This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cluster_node_deletion in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_node_deletion by this push: new 970f582 add time intervals 970f582 is described below commit 970f5824479050b840455f40a72e3715dcc0b79c Author: jt2594838 <jt2594...@163.com> AuthorDate: Thu Feb 13 17:46:38 2020 +0800 add time intervals --- .../handlers/caller/AppendNodeEntryHandler.java | 1 + .../iotdb/cluster/server/member/RaftMember.java | 70 +++---- .../apache/iotdb/cluster/utils/PartitionUtils.java | 225 ++++++++++++++++++++- .../org/apache/iotdb/db/service/TSServiceImpl.java | 4 +- .../iotdb/tsfile/read/filter/GroupByFilter.java | 8 + .../iotdb/tsfile/read/filter/operator/In.java | 4 + 6 files changed, 268 insertions(+), 44 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java index dad3493..db81efd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java @@ -48,6 +48,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> { @Override public void onComplete(Long response) { + logger.debug("Append response {} from {}", response, receiver); if (leaderShipStale.get()) { // someone has rejected this log because the leadership is stale return; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 214ae19..335a721 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -76,6 +76,7 @@ public abstract class RaftMember implements RaftService.AsyncIface { ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig(); private static 
final Logger logger = LoggerFactory.getLogger(RaftMember.class); + private static final int SEND_LOG_RETRY = 3; String name; @@ -198,7 +199,6 @@ public abstract class RaftMember implements RaftService.AsyncIface { return; } - long thisTerm = term.get(); if (character != NodeCharacter.ELECTOR) { // only elector votes resultHandler.onComplete(Response.RESPONSE_LEADER_STILL_ONLINE); @@ -295,6 +295,7 @@ public abstract class RaftMember implements RaftService.AsyncIface { try { Log log = LogParser.getINSTANCE().parse(request.entry); resultHandler.onComplete(appendEntry(log)); + logger.debug("{} AppendEntryRequest completed", name); } catch (UnknownLogTypeException e) { resultHandler.onError(e); } @@ -322,7 +323,6 @@ public abstract class RaftMember implements RaftService.AsyncIface { } // synchronized: logs are serialized - //TODO why synchronized? private synchronized AppendLogResult sendLogToFollowers(Log log, AtomicInteger quorum) { if (allNodes.size() == 1) { // single node group, does not need the agreement of others @@ -342,12 +342,9 @@ public abstract class RaftMember implements RaftService.AsyncIface { } synchronized (quorum) {//this synchronized codes are just for calling quorum.wait. - //TODO As we have used synchronized (), do we really need to use AtomicInteger? // synchronized: avoid concurrent modification synchronized (allNodes) { - //TODO allNodes.sync is only used here. Is that needed? - //By the way, readLock is ok for this case. 
for (Node node : allNodes) { AsyncClient client = connectNode(node); if (client != null) { @@ -359,6 +356,7 @@ public abstract class RaftMember implements RaftService.AsyncIface { handler.setReceiverTerm(newLeaderTerm); try { client.appendEntry(request, handler); + logger.debug("{} sending a log to {}: {}", name, node, log); } catch (Exception e) { logger.warn("{} cannot append log to node {}", name, node, e); } @@ -408,12 +406,7 @@ public abstract class RaftMember implements RaftService.AsyncIface { allNodes.add(thisNode); } - public void setLastCatchUpResponseTime( - Map<Node, Long> lastCatchUpResponseTime) { - this.lastCatchUpResponseTime = lastCatchUpResponseTime; - } - - public void /**/setCharacter(NodeCharacter character) { + public void setCharacter(NodeCharacter character) { logger.info("{} has become a {}", name, character); this.character = character; } @@ -672,29 +665,34 @@ public abstract class RaftMember implements RaftService.AsyncIface { log.setPlan(plan); logManager.appendLog(log); - logger.debug("{}: Send plan {} to other nodes", name, plan); - AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2); - - switch (result) { - case OK: - logger.debug("{}: Plan {} is accepted", name, plan); - try { - logManager.commitLog(log); - } catch (QueryProcessException e) { - logger.info("{}: The log {} is not successfully applied, reverting", name, log, e); + retry: + for (int i = SEND_LOG_RETRY; i >= 0; i--) { + logger.debug("{}: Send plan {} to other nodes, retry remaining {}", name, plan, i); + AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2); + switch (result) { + case OK: + logger.debug("{}: Plan {} is accepted", name, plan); + try { + logManager.commitLog(log); + } catch (QueryProcessException e) { + logger.info("{}: The log {} is not successfully applied, reverting", name, log, e); + logManager.removeLastLog(); + TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy(); +
status.getStatusType().setMessage(e.getMessage()); + return status; + } + return StatusUtils.OK; + case TIME_OUT: + logger.debug("{}: Plan {} timed out", name, plan); + if (i == 0) { // no retry left: revert the local append, as the other failure paths do + logManager.removeLastLog(); return StatusUtils.TIME_OUT; + } + break; + case LEADERSHIP_STALE: + default: logManager.removeLastLog(); - TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy(); - status.getStatusType().setMessage(e.getMessage()); - return status; - } - return StatusUtils.OK; - case TIME_OUT: - logger.debug("{}: Plan {} timed out", name, plan); - logManager.removeLastLog(); - return StatusUtils.TIME_OUT; - case LEADERSHIP_STALE: - default: - logManager.removeLastLog(); + break retry; + } } } return null; @@ -798,11 +796,7 @@ public abstract class RaftMember implements RaftService.AsyncIface { byte[] bytes = new byte[length]; ByteBuffer result = ByteBuffer.wrap(bytes); int len = bufferedInputStream.read(bytes); - if (len > 0) { - result.limit(len); - } else { - result.limit(0); - } + result.limit(Math.max(len, 0)); resultHandler.onComplete(result); } catch (IOException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java index 6bf8b89..c700931 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java @@ -21,16 +21,14 @@ package org.apache.iotdb.cluster.utils; import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT; +import java.util.ArrayList; import org.apache.iotdb.cluster.config.ClusterConstant; -import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import 
org.apache.iotdb.db.qp.physical.crud.InsertPlan; import
org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; -import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan; @@ -41,6 +39,18 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; import org.apache.iotdb.db.qp.physical.sys.ShowPlan; import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType; import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq; +import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; +import org.apache.iotdb.tsfile.read.filter.operator.NotFilter; +import org.apache.iotdb.tsfile.read.filter.operator.OrFilter; import org.apache.iotdb.tsfile.utils.Murmur128Hash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,5 +124,214 @@ public class PartitionUtils { return newPlan; } + public static TimeIntervals extractTimeInterval(Filter filter) { + // and, or, not, value, time, group by + // eq, neq, gt, gteq, lt, lteq, in + if (filter instanceof AndFilter) { + AndFilter andFilter = ((AndFilter) filter); + TimeIntervals leftIntervals = extractTimeInterval(andFilter.getLeft()); + TimeIntervals rightIntervals = extractTimeInterval(andFilter.getRight()); + return leftIntervals.intersection(rightIntervals); + } else if 
(filter instanceof OrFilter) { + OrFilter orFilter = ((OrFilter) filter); + TimeIntervals leftIntervals = extractTimeInterval(orFilter.getLeft()); + TimeIntervals rightIntervals = extractTimeInterval(orFilter.getRight()); + return leftIntervals.union(rightIntervals); + } else if (filter instanceof NotFilter) { + NotFilter notFilter = ((NotFilter) filter); + return extractTimeInterval(notFilter.getFilter()).not(); + } else if (filter instanceof TimeGt) { + TimeGt timeGt = ((TimeGt) filter); + return new TimeIntervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE); + } else if (filter instanceof TimeGtEq) { + TimeGtEq timeGtEq = ((TimeGtEq) filter); + return new TimeIntervals(((long) timeGtEq.getValue()), Long.MAX_VALUE); + } else if (filter instanceof TimeEq) { + TimeEq timeEq = ((TimeEq) filter); + return new TimeIntervals(((long) timeEq.getValue()), ((long) timeEq.getValue())); + } else if (filter instanceof TimeNotEq) { + TimeNotEq timeNotEq = ((TimeNotEq) filter); + TimeIntervals intervals = new TimeIntervals(); + intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1); + intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE); + return intervals; + } else if (filter instanceof TimeLt) { + TimeLt timeLt = ((TimeLt) filter); + return new TimeIntervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1); + } else if (filter instanceof TimeLtEq) { + TimeLtEq timeLtEq = ((TimeLtEq) filter); + return new TimeIntervals(Long.MIN_VALUE, (long) timeLtEq.getValue()); + } else if (filter instanceof TimeIn) { + TimeIn timeIn = ((TimeIn) filter); + TimeIntervals intervals = new TimeIntervals(); + for (Object value : new java.util.TreeSet<Object>(timeIn.getValues())) { // sorted: union()/not() need ordered intervals + long time = ((long) value); + intervals.addInterval(time, time); + } + return intervals; + } else if (filter instanceof GroupByFilter) { + GroupByFilter groupByFilter = ((GroupByFilter) filter); + return new TimeIntervals(groupByFilter.getStartTime(), 
groupByFilter.getEndTime() + 1); + } + logger.warn("Unrecognized 
filter class: {}", filter.getClass()); + return new TimeIntervals(Long.MIN_VALUE, Long.MAX_VALUE); + } + + public static class TimeIntervals extends ArrayList<Long> { + + public static final TimeIntervals ALL_INTERVAL = new TimeIntervals(Long.MIN_VALUE, + Long.MAX_VALUE); + + public TimeIntervals() { + super(); + } + + public TimeIntervals(long lowerBound, long upperBound) { + super(); + addInterval(lowerBound, upperBound); + } + + public int getIntervalSize() { + return size() / 2; + } + + public long getLowerBound(int index) { + return get(index * 2); + } + + public long getUpperBound(int index) { + return get(index * 2 + 1); + } + public void addInterval(long lowerBound, long upperBound) { + add(lowerBound); + add(upperBound); + } + + public TimeIntervals intersection(TimeIntervals that) { + TimeIntervals result = new TimeIntervals(); + int thisSize = this.getIntervalSize(); + int thatSize = that.getIntervalSize(); + for (int i = 0; i < thisSize; i++) { + for (int j = 0; j < thatSize; j++) { + long thisLB = this.getLowerBound(i); + long thisUB = this.getUpperBound(i); + long thatLB = that.getLowerBound(j); + long thatUB = that.getUpperBound(j); + if (thisUB >= thatLB) { + if (thisUB <= thatUB) { + result.addInterval(Math.max(thisLB, thatLB), thisUB); + } else if (thisLB <= thatUB) { + result.addInterval(Math.max(thisLB, thatLB), thatUB); + } + } + } + } + return result; + } + + /** + * Returns the union of two interval lists. It is implemented by merge, so both lists must be sorted by lower bound and non-overlapping. 
+ * @param that the other sorted, non-overlapping interval list + * @return the sorted union with overlapping or adjacent intervals merged; if either side is empty the other side itself is returned + */ + public TimeIntervals union(TimeIntervals that) { + TimeIntervals result = new TimeIntervals(); + if (this.isEmpty()) { + return that; + } else if (that.isEmpty()) { + return this; + } + + int thisSize = this.getIntervalSize(); + int thatSize = that.getIntervalSize(); + int thisIndex = 0; + int thatIndex = 0; + long lastLowerBound = 0; + long lastUpperBound = 0; + boolean lastBoundSet = false; + // merge the heads of the two intervals + while (thisIndex < thisSize && thatIndex < thatSize) { + long thisLB = this.getLowerBound(thisIndex); + long thisUB = this.getUpperBound(thisIndex); + long thatLB = that.getLowerBound(thatIndex); + long thatUB = that.getUpperBound(thatIndex); + if (!lastBoundSet) { + lastBoundSet = true; + if (thisLB <= thatLB) { + lastLowerBound = thisLB; + lastUpperBound = thisUB; + thisIndex ++; + } else { + lastLowerBound = thatLB; + lastUpperBound = thatUB; + thatIndex ++; + } + } else { + if (lastUpperBound == Long.MAX_VALUE || thisLB <= lastUpperBound + 1) { + // the next interval from this overlaps or touches the last one (heads never start before it) + lastLowerBound = Math.min(thisLB, lastLowerBound); + lastUpperBound = Math.max(thisUB, lastUpperBound); + thisIndex ++; + } else if (lastUpperBound == Long.MAX_VALUE || thatLB <= lastUpperBound + 1) { + // the next interval from that overlaps or touches the last one (heads never start before it) + lastLowerBound = Math.min(thatLB, lastLowerBound); + lastUpperBound = Math.max(thatUB, lastUpperBound); + thatIndex ++; + } else { + // neither intervals can merge, add the last interval to the result and select a new + // one as base + result.addInterval(lastLowerBound, lastUpperBound); + lastBoundSet = false; + } + } + } + // merge the remaining intervals + TimeIntervals remainingIntervals = thisIndex < thisSize ? 
this : that; + int remainingIndex = thisIndex < thisSize ? 
thisIndex : thatIndex; + for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) { + long lb = remainingIntervals.getLowerBound(i); + long ub = remainingIntervals.getUpperBound(i); + if (lastUpperBound == Long.MAX_VALUE || lb <= lastUpperBound + 1) { + // the next interval overlaps or touches the last interval, merge them + lastLowerBound = Math.min(lb, lastLowerBound); + lastUpperBound = Math.max(ub, lastUpperBound); + } else { + // the two intervals neither overlap nor touch, add the previous interval to the result + result.addInterval(lastLowerBound, lastUpperBound); + lastLowerBound = lb; + lastUpperBound = ub; + } + } + // add the last interval + result.addInterval(lastLowerBound, lastUpperBound); + return result; + } + + public TimeIntervals not() { + if (isEmpty()) { + return new TimeIntervals(Long.MIN_VALUE, Long.MAX_VALUE); // fresh copy: ALL_INTERVAL is shared and mutable + } + TimeIntervals result = new TimeIntervals(); + long firstLB = getLowerBound(0); + if (firstLB != Long.MIN_VALUE) { + result.addInterval(Long.MIN_VALUE, firstLB - 1); + } + + int intervalSize = getIntervalSize(); + for (int i = 0; i < intervalSize - 1; i++) { + long currentUB = getUpperBound(i); + long nextLB = getLowerBound(i + 1); + if (currentUB + 1 <= nextLB -1) { + result.addInterval(currentUB + 1, nextLB -1); + } + } + + long lastUB = getUpperBound(intervalSize - 1); + if (lastUB != Long.MAX_VALUE) { + result.addInterval(lastUB + 1, Long.MAX_VALUE); + } + return result; + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 9dcb5c9..c832da8 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -869,15 +869,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } - - /** * create QueryDataSet and buffer it for fetchResults */ private QueryDataSet 
createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws QueryProcessException, 
QueryFilterOptimizationException, StorageEngineException, IOException, MetadataException, SQLException { - QueryContext context = new QueryContext(queryId); + QueryContext context = genQueryContext(queryId); QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan, context); queryId2DataSet.put(queryId, queryDataSet); return queryDataSet; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java index b1cf764..8de0118 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java @@ -146,4 +146,12 @@ public class GroupByFilter implements Filter, Serializable { public int hashCode() { return Objects.hash(unit, slidingStep, startTime, endTime); } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java index 1bb76d4..3767f01 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java @@ -118,4 +118,8 @@ public class In<T extends Comparable<T>> implements Filter { public FilterSerializeId getSerializeId() { return FilterSerializeId.IN; } + + public Set<T> getValues() { + return values; + } }