This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch cp_master_cluster_bugfixs_to_0.12 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 103a018e33cda27a913968fe69df0c573d202b2e Author: LebronAl <[email protected]> AuthorDate: Mon Jun 28 11:47:44 2021 +0800 cherry pick master cluster bugfixs to 0.12 --- .../cluster/log/manage/CommittedEntryManager.java | 4 ++ .../query/reader/mult/RemoteMultSeriesReader.java | 4 +- .../cluster/server/member/DataGroupMember.java | 43 ++++++++++++++++++---- .../cluster/server/member/MetaGroupMember.java | 2 +- .../iotdb/cluster/server/member/RaftMember.java | 23 +++++++++--- 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java index e86df85..d8d511b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java @@ -242,6 +242,10 @@ public class CommittedEntryManager { } entries.addAll(appendingEntries); } else if (entries.size() - offset > 0) { + logger.error( + "committed entries cannot be truncated: current entries:{}, appendingEntries {}", + entries, + appendingEntries); throw new TruncateCommittedEntryException( appendingEntries.get(0).getCurrLogIndex(), getLastIndex()); } else { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java index 9c09bcb..bf20b35 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java @@ -80,7 +80,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader { } @Override - public boolean hasNextTimeValuePair(String fullPath) throws IOException { + public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException { BatchData batchData = currentBatchDatas.get(fullPath); if (batchData != null && batchData.hasCurrent()) { return true; @@ -89,7 +89,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader { return checkPathBatchData(fullPath); } - private boolean checkPathBatchData(String fullPath) { + private synchronized boolean checkPathBatchData(String fullPath) { BatchData batchData = cachedBatchs.get(fullPath).peek(); if (batchData != null && !batchData.isEmpty()) { return true; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index ba943a9..2b2b646 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.exception.CheckConsistencyException; import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.SnapshotInstallationException; import org.apache.iotdb.cluster.log.LogApplier; @@ -65,6 +66,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager; import org.apache.iotdb.cluster.server.monitor.Peer; import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; +import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -75,6 +77,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.TestOnly; @@ -692,16 +695,40 @@ public class DataGroupMember extends RaftMember { */ @Override public TSStatus executeNonQueryPlan(PhysicalPlan plan) { - TSStatus status = executeNonQueryPlanWithKnownLeader(plan); - if (!StatusUtils.NO_LEADER.equals(status)) { - return status; - } + if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) { + try { + getLocalExecutor().processNonQuery(plan); + return StatusUtils.OK; + } catch (Exception e) { + Throwable cause = IOUtils.getRootCause(e); + if (cause instanceof StorageGroupNotSetException) { + try { + metaGroupMember.syncLeaderWithConsistencyCheck(true); + if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) { + ((InsertPlan) plan).recoverFromFailure(); + } + getLocalExecutor().processNonQuery(plan); + return StatusUtils.OK; + } catch (CheckConsistencyException ce) { + return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage()); + } catch (Exception ne) { + return handleLogExecutionException(plan, IOUtils.getRootCause(ne)); + } + } + return handleLogExecutionException(plan, cause); + } + } else { + TSStatus status = executeNonQueryPlanWithKnownLeader(plan); + if (!StatusUtils.NO_LEADER.equals(status)) { + return status; + } - long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime(); - waitLeader(); - Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime); + long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime(); + waitLeader(); + Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime); - return executeNonQueryPlanWithKnownLeader(plan); + return executeNonQueryPlanWithKnownLeader(plan); + } } private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index c477d5f..58884e1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -466,7 +466,7 @@ public class MetaGroupMember extends RaftMember { try { if (logger.isInfoEnabled()) { NodeReport report = genNodeReport(); - logger.info(report.toString()); + logger.debug(report.toString()); } } catch (Exception e) { logger.error("{} exception occurred when generating node report", name, e); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 426c370..9890f6b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -66,7 +66,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; -import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.rpc.RpcUtils; @@ -224,6 +225,11 @@ public abstract class RaftMember { */ private LogDispatcher logDispatcher; + /** + * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB + */ + protected PlanExecutor localExecutor; + protected RaftMember() {} protected RaftMember( @@ -563,6 +569,13 @@ public abstract class RaftMember { return response; } + public PlanExecutor getLocalExecutor() throws QueryProcessException { + if (localExecutor == null) { + localExecutor = new PlanExecutor(); + } + return localExecutor; + } + /** * Get an asynchronous heartbeat thrift client to the given node. * @@ -972,7 +985,7 @@ public abstract class RaftMember { return StatusUtils.OK; } } catch (LogExecutionException e) { - return handleLogExecutionException(log, e); + return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e)); } return StatusUtils.TIME_OUT; } @@ -1036,7 +1049,7 @@ public abstract class RaftMember { break; } } catch (LogExecutionException e) { - return handleLogExecutionException(log, e); + return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e)); } return StatusUtils.TIME_OUT; } @@ -1468,8 +1481,7 @@ public abstract class RaftMember { } } - private TSStatus handleLogExecutionException(PhysicalPlanLog log, LogExecutionException e) { - Throwable cause = IOUtils.getRootCause(e); + protected TSStatus handleLogExecutionException(PhysicalPlan log, Throwable cause) { if (cause instanceof BatchProcessException) { return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus())); } @@ -1482,7 +1494,6 @@ public abstract class RaftMember { tsStatus.setCode(((IoTDBException) cause).getErrorCode()); } if (!(cause instanceof PathNotExistException) - && !(cause instanceof StorageGroupNotSetException) && !(cause instanceof PathAlreadyExistException) && !(cause instanceof StorageGroupAlreadySetException)) { logger.debug("{} cannot be executed because ", log, cause);
