This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_catch_up by this push:
new e2e2959a7f replace physical plan with IConsensusRequest
e2e2959a7f is described below
commit e2e2959a7f1888010af4ca14c8234a4d52d7358c
Author: jt <[email protected]>
AuthorDate: Wed Jun 15 10:17:20 2022 +0800
replace physical plan with IConsensusRequest
---
.../cluster/client/sync/SyncClientAdaptor.java | 5 +-
.../iotdb/cluster/coordinator/Coordinator.java | 37 ++++---
.../cluster/impl/NativeSingleRaftConsensus.java | 6 +-
.../org/apache/iotdb/cluster/log/LogParser.java | 8 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 8 +-
.../iotdb/cluster/log/applier/BaseApplier.java | 18 ++--
.../iotdb/cluster/log/applier/DataLogApplier.java | 41 ++++----
.../iotdb/cluster/log/applier/MetaLogApplier.java | 6 +-
.../iotdb/cluster/log/logtypes/RequestLog.java | 43 ++++----
.../log/sequencing/AsynchronousSequencer.java | 6 +-
.../log/sequencing/SynchronousSequencer.java | 8 +-
.../handlers/forwarder/ForwardPlanHandler.java | 12 +--
.../cluster/server/member/DataGroupMember.java | 77 ++++++++------
.../cluster/server/member/MetaGroupMember.java | 19 +++-
.../iotdb/cluster/server/member/RaftMember.java | 116 +++++++++++----------
.../cluster/server/service/BaseAsyncService.java | 11 +-
.../cluster/server/service/BaseSyncService.java | 8 +-
.../apache/iotdb/cluster/utils/PartitionUtils.java | 7 +-
.../iotdb/cluster/common/TestLogApplier.java | 8 +-
.../apache/iotdb/cluster/log/LogParserTest.java | 6 +-
.../log/applier/AsyncDataLogApplierTest.java | 18 ++--
.../cluster/log/applier/DataLogApplierTest.java | 32 +++---
.../cluster/log/applier/MetaLogApplierTest.java | 12 +--
.../cluster/log/logtypes/SerializeLogTest.java | 8 +-
.../cluster/server/member/DataGroupMemberTest.java | 4 +-
.../iotdb/cluster/utils/SerializeUtilTest.java | 6 +-
26 files changed, 298 insertions(+), 232 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index b066bc86a0..29eee1ea21 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.SerializeUtils;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -423,11 +424,11 @@ public class SyncClientAdaptor {
}
public static TSStatus executeNonQuery(
- AsyncClient client, PhysicalPlan plan, RaftNode header, Node receiver)
+ AsyncClient client, IConsensusRequest plan, RaftNode header, Node
receiver)
throws IOException, TException, InterruptedException {
AtomicReference<TSStatus> status = new AtomicReference<>();
ExecutNonQueryReq req = new ExecutNonQueryReq();
- req.planBytes =
ByteBuffer.wrap(PlanSerializer.getInstance().serialize(plan));
+ req.planBytes = plan.serializeToByteBuffer();
if (header != null) {
req.setHeader(header);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index bfd48894b5..c03b1c0a80 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -116,37 +117,45 @@ public class Coordinator {
* nodes (like timeseries deletion) or the nodes that belong to certain
groups (like data
* ingestion).
*
- * @param plan a non-query plan.
+ * @param request a non-query plan.
*/
- public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ public TSStatus executeNonQueryPlan(IConsensusRequest request) {
TSStatus result;
long startTime =
Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.getOperationStartTime();
- if (PartitionUtils.isLocalNonQueryPlan(plan)) {
+ if (PartitionUtils.isLocalNonQueryPlan(request)) {
// run locally
- result = executeNonQueryLocally(plan);
- } else if (PartitionUtils.isGlobalMetaPlan(plan)) {
+ result = executeNonQueryLocally(request);
+ } else if (PartitionUtils.isGlobalMetaPlan(request)) {
// forward the plan to all meta group nodes
- result = metaGroupMember.processNonPartitionedMetaPlan(plan);
- } else if (PartitionUtils.isGlobalDataPlan(plan)) {
+ result = metaGroupMember.processNonPartitionedMetaPlan(request);
+ } else if (PartitionUtils.isGlobalDataPlan(request)) {
// forward the plan to all data group nodes
- result = processNonPartitionedDataPlan(plan);
- } else {
+ result = processNonPartitionedDataPlan(((PhysicalPlan) request));
+ } else if (request instanceof PhysicalPlan) {
// split the plan and forward them to some PartitionGroups
try {
- result = processPartitionedPlan(plan);
+ result = processPartitionedPlan(((PhysicalPlan) request));
} catch (UnsupportedPlanException e) {
return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION,
e.getMessage());
}
+ } else {
+ result = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ "Unsupported request: " + request);
}
Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
return result;
}
/** execute a non-query plan that is not necessary to be executed on other
nodes. */
- private TSStatus executeNonQueryLocally(PhysicalPlan plan) {
+ private TSStatus executeNonQueryLocally(IConsensusRequest request) {
boolean execRet;
try {
- execRet = metaGroupMember.getLocalExecutor().processNonQuery(plan);
+ if (request instanceof PhysicalPlan) {
+ execRet =
metaGroupMember.getLocalExecutor().processNonQuery(((PhysicalPlan) request));
+ } else {
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ "Unsupported request: " + request);
+ }
} catch (QueryProcessException e) {
if (e.getErrorCode() !=
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
logger.debug("meet error while processing non-query. ", e);
@@ -299,7 +308,7 @@ public class Coordinator {
status =
metaGroupMember
.getLocalDataMember(partitionGroup.getHeader())
- .executeNonQueryPlan(plan);
+ .executeRequest(plan).getStatus();
logger.debug(
"Execute {} in a local group of {} with status {}",
plan,
@@ -479,7 +488,7 @@ public class Coordinator {
result =
metaGroupMember
.getLocalDataMember(entry.getValue().getHeader())
- .executeNonQueryPlan(entry.getKey());
+ .executeRequest(entry.getKey()).getStatus();
logger.debug(
"Execute {} in a local group of {}, {}",
entry.getKey(),
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
index eac51c293a..622287d7a9 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/impl/NativeSingleRaftConsensus.java
@@ -46,7 +46,7 @@ public class NativeSingleRaftConsensus implements
ISingleConsensus {
@Override
public ConsensusWriteResponse write(IConsensusRequest request) {
- return null;
+ return raftMember.executeRequest(request);
}
@Override
@@ -81,11 +81,11 @@ public class NativeSingleRaftConsensus implements
ISingleConsensus {
@Override
public boolean isLeader() {
- return false;
+ return raftMember.isLeader();
}
@Override
public Peer getLeader() {
- return null;
+ return raftMember.getLeaderPeer();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 26f8a42192..b434873f1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.slf4j.Logger;
@@ -72,9 +72,9 @@ public class LogParser {
log = addNodeLog;
break;
case PHYSICAL_PLAN:
- PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog();
- physicalPlanLog.deserialize(buffer);
- log = physicalPlanLog;
+ RequestLog requestLog = new RequestLog();
+ requestLog.deserialize(buffer);
+ log = requestLog;
break;
case CLOSE_FILE:
CloseFileLog closeFileLog = new CloseFileLog();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index ebcc885564..127a590b68 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.log.applier;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -116,9 +116,9 @@ public class AsyncDataLogApplier implements LogApplier {
private PartialPath getLogKey(Log log) throws StorageGroupNotSetException {
// we can only apply some kinds of plans in parallel, for other logs, we
must wait until all
// previous logs are applied, or the order of deletions and insertions may
get wrong
- if (log instanceof PhysicalPlanLog) {
- PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- PhysicalPlan plan = physicalPlanLog.getPlan();
+ if (log instanceof RequestLog) {
+ RequestLog requestLog = (RequestLog) log;
+ PhysicalPlan plan = requestLog.getRequest();
// this plan only affects one sg, so we can run it with other plans in
parallel
return getPlanKey(plan);
} else if (log instanceof CloseFileLog) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 996d083d77..34767fa6e9 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -63,20 +64,21 @@ abstract class BaseApplier implements LogApplier {
}
/**
- * @param plan
+ * @param request
* @param dataGroupMember the data group member that is applying the log,
null if the log is
* applied by a meta group member
* @throws QueryProcessException
* @throws StorageGroupNotSetException
* @throws StorageEngineException
*/
- void applyPhysicalPlan(PhysicalPlan plan, DataGroupMember dataGroupMember)
+ void applyRequest(IConsensusRequest request, DataGroupMember dataGroupMember)
throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- if (plan instanceof InsertPlan) {
- processPlanWithTolerance((InsertPlan) plan, dataGroupMember);
- } else if (plan != null && !plan.isQuery()) {
+ if (request instanceof InsertPlan) {
+ processPlanWithTolerance((InsertPlan) request, dataGroupMember);
+ } else if (request instanceof PhysicalPlan && !((PhysicalPlan)
request).isQuery()) {
+ PhysicalPlan plan = ((PhysicalPlan) request);
try {
- getQueryExecutor().processNonQuery(plan);
+ getQueryExecutor().processNonQuery(((PhysicalPlan) request));
} catch (BatchProcessException e) {
handleBatchProcessException(e, plan);
} catch (QueryProcessException e) {
@@ -89,8 +91,8 @@ abstract class BaseApplier implements LogApplier {
} catch (StorageGroupNotSetException e) {
executeAfterSync(plan);
}
- } else if (plan != null) {
- logger.error("Unsupported physical plan: {}", plan);
+ } else if (request != null) {
+ throw new QueryProcessException("Unsupported request: " + request);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index b2f138f71e..95d1f126c1 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -24,12 +24,13 @@ import
org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -79,10 +80,10 @@ public class DataLogApplier extends BaseApplier {
.preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember);
dataGroupMember.setAndSaveLastAppliedPartitionTableVersion(
((RemoveNodeLog) log).getMetaLogIndex());
- } else if (log instanceof PhysicalPlanLog) {
- PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- PhysicalPlan plan = physicalPlanLog.getPlan();
- applyPhysicalPlan(plan);
+ } else if (log instanceof RequestLog) {
+ RequestLog requestLog = (RequestLog) log;
+ IConsensusRequest request = requestLog.getRequest();
+ applyRequest(request);
} else if (log instanceof CloseFileLog) {
CloseFileLog closeFileLog = ((CloseFileLog) log);
StorageEngine.getInstance()
@@ -105,21 +106,21 @@ public class DataLogApplier extends BaseApplier {
}
}
- public void applyPhysicalPlan(PhysicalPlan plan)
+ public void applyRequest(IConsensusRequest request)
throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
- if (plan instanceof DeletePlan) {
- ((DeletePlan)
plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
- } else if (plan instanceof DeleteTimeSeriesPlan) {
- ((DeleteTimeSeriesPlan)
plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+ if (request instanceof DeletePlan) {
+ ((DeletePlan)
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+ } else if (request instanceof DeleteTimeSeriesPlan) {
+ ((DeleteTimeSeriesPlan)
request).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
}
- if (plan instanceof InsertMultiTabletsPlan) {
- applyInsert((InsertMultiTabletsPlan) plan);
- } else if (plan instanceof InsertRowsPlan) {
- applyInsert((InsertRowsPlan) plan);
- } else if (plan instanceof InsertPlan) {
- applyInsert((InsertPlan) plan);
+ if (request instanceof InsertMultiTabletsPlan) {
+ applyInsert((InsertMultiTabletsPlan) request);
+ } else if (request instanceof InsertRowsPlan) {
+ applyInsert((InsertRowsPlan) request);
+ } else if (request instanceof InsertPlan) {
+ applyInsert((InsertPlan) request);
} else {
- applyPhysicalPlan(plan, dataGroupMember);
+ applyRequest(request, dataGroupMember);
}
}
@@ -142,7 +143,7 @@ public class DataLogApplier extends BaseApplier {
}
}
}
- applyPhysicalPlan(plan, dataGroupMember);
+ applyRequest(plan, dataGroupMember);
}
private void applyInsert(InsertRowsPlan plan)
@@ -164,7 +165,7 @@ public class DataLogApplier extends BaseApplier {
}
}
}
- applyPhysicalPlan(plan, dataGroupMember);
+ applyRequest(plan, dataGroupMember);
}
private void applyInsert(InsertPlan plan)
@@ -180,6 +181,6 @@ public class DataLogApplier extends BaseApplier {
throw new QueryProcessException(ce.getMessage());
}
}
- applyPhysicalPlan(plan, dataGroupMember);
+ applyRequest(plan, dataGroupMember);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index af3163cdd4..c75c15a914 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -49,8 +49,8 @@ public class MetaLogApplier extends BaseApplier {
logger.debug("MetaMember [{}] starts applying Log {}",
metaGroupMember.getName(), log);
if (log instanceof AddNodeLog) {
applyAddNodeLog((AddNodeLog) log);
- } else if (log instanceof PhysicalPlanLog) {
- applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
+ } else if (log instanceof RequestLog) {
+ applyRequest(((RequestLog) log).getRequest(), null);
} else if (log instanceof RemoveNodeLog) {
applyRemoveNodeLog((RemoveNodeLog) log);
} else if (log instanceof EmptyContentLog || log instanceof
FragmentedLog) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
index 5815ecc8b8..4f797d459f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log.logtypes;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -35,23 +36,23 @@ import java.util.Objects;
import static org.apache.iotdb.cluster.log.Log.Types.PHYSICAL_PLAN;
-/** PhysicalPlanLog contains a non-partitioned physical plan like set storage
group. */
-public class PhysicalPlanLog extends Log {
+/** RequestLog contains a non-partitioned request like set storage group. */
+public class RequestLog extends Log {
- private static final Logger logger =
LoggerFactory.getLogger(PhysicalPlanLog.class);
- private PhysicalPlan plan;
+ private static final Logger logger =
LoggerFactory.getLogger(RequestLog.class);
+ private IConsensusRequest request;
- public PhysicalPlanLog() {}
+ public RequestLog() {}
- public PhysicalPlanLog(PhysicalPlan plan) {
- this.plan = plan;
+ public RequestLog(IConsensusRequest request) {
+ this.request = request;
}
@Override
public int getDefaultBufferSize() {
- if (plan instanceof DummyPlan) {
+ if (request instanceof DummyPlan) {
int workloadSize =
- ((DummyPlan) plan).getWorkload() == null ? 0 : ((DummyPlan)
plan).getWorkload().length;
+ ((DummyPlan) request).getWorkload() == null ? 0 : ((DummyPlan)
request).getWorkload().length;
return workloadSize + 512;
}
return DEFAULT_BUFFER_SIZE;
@@ -66,7 +67,9 @@ public class PhysicalPlanLog extends Log {
dataOutputStream.writeLong(getCurrLogIndex());
dataOutputStream.writeLong(getCurrLogTerm());
- plan.serialize(dataOutputStream);
+ ByteBuffer byteBuffer = request.serializeToByteBuffer();
+ dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(),
+ byteBuffer.limit() - byteBuffer.position());
} catch (IOException e) {
// unreachable
}
@@ -79,7 +82,7 @@ public class PhysicalPlanLog extends Log {
buffer.put((byte) PHYSICAL_PLAN.ordinal());
buffer.putLong(getCurrLogIndex());
buffer.putLong(getCurrLogTerm());
- plan.serialize(buffer);
+ buffer.put(request.serializeToByteBuffer());
}
@Override
@@ -88,7 +91,7 @@ public class PhysicalPlanLog extends Log {
setCurrLogTerm(buffer.getLong());
try {
- plan = PhysicalPlan.Factory.create(buffer);
+ request = PhysicalPlan.Factory.create(buffer);
} catch (IOException | IllegalPathException e) {
logger.error(
"Cannot parse a physical {}:{} plan {}",
@@ -99,17 +102,17 @@ public class PhysicalPlanLog extends Log {
}
}
- public PhysicalPlan getPlan() {
- return plan;
+ public IConsensusRequest getRequest() {
+ return request;
}
- public void setPlan(PhysicalPlan plan) {
- this.plan = plan;
+ public void setRequest(IConsensusRequest request) {
+ this.request = request;
}
@Override
public String toString() {
- return plan + ",term:" + getCurrLogTerm() + ",index:" + getCurrLogIndex();
+ return request + ",term:" + getCurrLogTerm() + ",index:" +
getCurrLogIndex();
}
@Override
@@ -123,12 +126,12 @@ public class PhysicalPlanLog extends Log {
if (!super.equals(o)) {
return false;
}
- PhysicalPlanLog that = (PhysicalPlanLog) o;
- return Objects.equals(plan, that.plan);
+ RequestLog that = (RequestLog) o;
+ return Objects.equals(request, that.request);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), plan);
+ return Objects.hash(super.hashCode(), request);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 5b0cfc85b1..b3fdb6d0af 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -104,8 +104,8 @@ public class AsynchronousSequencer implements LogSequencer {
log.setSequenceStartTime(sequenceStartTime);
log.setCurrLogTerm(member.getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
- if (log instanceof PhysicalPlanLog) {
- ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
+ if (log instanceof RequestLog) {
+ ((RequestLog) log).getRequest().setIndex(log.getCurrLogIndex());
}
startTime =
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index ddb96d18be..bbaf8770ce 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -70,9 +70,9 @@ public class SynchronousSequencer implements LogSequencer {
// if the log contains a physical plan which is not a LogPlan,
assign the same index to
// the plan so the state machine can be bridged with the consensus
- if (log instanceof PhysicalPlanLog
- && !(((PhysicalPlanLog) log).getPlan() instanceof LogPlan)) {
- ((PhysicalPlanLog)
log).getPlan().setIndex(logManager.getLastLogIndex() + 1);
+ if (log instanceof RequestLog
+ && !(((RequestLog) log).getRequest() instanceof LogPlan)) {
+ ((RequestLog)
log).getRequest().setIndex(logManager.getLastLogIndex() + 1);
}
log.setCurrLogTerm(member.getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
index 0ca430d12e..44b317d505 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.server.handlers.forwarder;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -34,13 +34,13 @@ import java.util.concurrent.atomic.AtomicReference;
public class ForwardPlanHandler implements AsyncMethodCallback<TSStatus> {
private static final Logger logger =
LoggerFactory.getLogger(ForwardPlanHandler.class);
- private PhysicalPlan plan;
+ private IConsensusRequest request;
private AtomicReference<TSStatus> result;
private Node node;
- public ForwardPlanHandler(AtomicReference<TSStatus> result, PhysicalPlan
plan, Node node) {
+ public ForwardPlanHandler(AtomicReference<TSStatus> result,
IConsensusRequest request, Node node) {
this.result = result;
- this.plan = plan;
+ this.request = request;
this.node = node;
}
@@ -55,9 +55,9 @@ public class ForwardPlanHandler implements
AsyncMethodCallback<TSStatus> {
@Override
public void onError(Exception exception) {
if (exception instanceof IOException) {
- logger.warn("Cannot send plan {} to node {}: {}", plan, node,
exception.getMessage());
+ logger.warn("Cannot send plan {} to node {}: {}", request, node,
exception.getMessage());
} else {
- logger.error("Cannot send plan {} to node {}", plan, node, exception);
+ logger.error("Cannot send plan {} to node {}", request, node, exception);
}
synchronized (result) {
TSStatus status = StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR,
exception.getMessage());
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 6e03281060..173b7bcc9c 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -72,15 +72,20 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion.TimePartitionFilter;
@@ -186,6 +191,7 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
+ "-raftId-"
+ nodes.getRaftId()
+ "";
+ groupId = new DataRegionId(0);
setThisNode(thisNode);
setAllNodes(nodes);
mbeanName =
@@ -218,6 +224,7 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
new ClientManager(
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
ClientManager.Type.DataGroupClient));
+ groupId = new DataRegionId(nodes.getHeader().node.nodeIdentifier +
nodes.getHeader().raftId);
this.metaGroupMember = metaGroupMember;
setThisNode(thisNode);
setAllNodes(nodes);
@@ -694,65 +701,66 @@ public class DataGroupMember extends RaftMember
implements DataGroupMemberMBean
* Execute a non-query plan. If the member is a leader, a log for the plan
will be created and
* process through the raft procedure, otherwise the plan will be forwarded
to the leader.
*
- * @param plan a non-query plan.
+ * @param request a non-query plan.
*/
@Override
- public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ public ConsensusWriteResponse executeRequest(IConsensusRequest request) {
if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
try {
- if (plan instanceof LogPlan) {
+ if (request instanceof LogPlan) {
Log log;
try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ log = LogParser.getINSTANCE().parse(((LogPlan) request).getLog());
} catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
+ logger.error("Can not parse LogPlan {}", request, e);
+ return new ConsensusWriteResponse(null,
StatusUtils.PARSE_LOG_ERROR);
}
handleChangeMembershipLogWithoutRaft(log);
} else {
- ((DataLogApplier) dataLogApplier).applyPhysicalPlan(plan);
+ ((DataLogApplier) dataLogApplier).applyRequest(request);
}
- return StatusUtils.OK;
+ return new ConsensusWriteResponse(null, StatusUtils.OK);
} catch (Exception e) {
Throwable cause = IOUtils.getRootCause(e);
boolean hasCreated = false;
try {
- if (plan instanceof InsertPlan
+ if (request instanceof InsertPlan
&&
ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
- if (plan instanceof InsertRowsPlan || plan instanceof
InsertMultiTabletsPlan) {
+ if (request instanceof InsertRowsPlan || request instanceof
InsertMultiTabletsPlan) {
if (e instanceof BatchProcessException) {
for (TSStatus status : ((BatchProcessException)
e).getFailingStatus()) {
if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
- hasCreated =
createTimeseriesForFailedInsertion(((InsertPlan) plan));
- ((BatchPlan) plan).getResults().clear();
+ hasCreated =
createTimeseriesForFailedInsertion(((InsertPlan) request));
+ ((BatchPlan) request).getResults().clear();
break;
}
}
}
} else if (cause instanceof PathNotExistException) {
- hasCreated = createTimeseriesForFailedInsertion(((InsertPlan)
plan));
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan)
request));
}
}
} catch (MetadataException | CheckConsistencyException ex) {
- logger.error("{}: Cannot auto-create timeseries for {}", name, plan,
e);
- return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
ex.getMessage());
+ logger.error("{}: Cannot auto-create timeseries for {}", name,
request, e);
+ return new ConsensusWriteResponse(null,
+ StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
ex.getMessage()));
}
if (hasCreated) {
- return executeNonQueryPlan(plan);
+ return executeRequest(request);
}
- return handleLogExecutionException(plan, cause);
+ return new ConsensusWriteResponse(null,
handleLogExecutionException(request, cause));
}
} else {
- TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
+ TSStatus status = executeNonQueryPlanWithKnownLeader(request);
if (!StatusUtils.NO_LEADER.equals(status)) {
- return status;
+ return new ConsensusWriteResponse(null, status);
}
long startTime =
Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
waitLeader();
Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
- return executeNonQueryPlanWithKnownLeader(plan);
+ return new ConsensusWriteResponse(null,
executeNonQueryPlanWithKnownLeader(request));
}
}
@@ -788,42 +796,44 @@ public class DataGroupMember extends RaftMember
implements DataGroupMemberMBean
}
}
- private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
+ private TSStatus executeNonQueryPlanWithKnownLeader(IConsensusRequest
request) {
if (character == NodeCharacter.LEADER) {
- if (plan.getTargetedTerm() > 0 && plan.getTargetedTerm() != term.get()) {
+ if ((request instanceof PhysicalPlan)
+ && ((PhysicalPlan) request).getTargetedTerm() > 0
+ && ((PhysicalPlan) request).getTargetedTerm() != term.get()) {
return StatusUtils.getStatus(TSStatusCode.LEADER_CHANGED)
.setMessage(getRaftGroupFullId() + "-" + term.get());
}
long startTime =
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.getOperationStartTime();
- TSStatus status = processPlanLocally(plan);
+ TSStatus status = processPlanLocally(request);
boolean hasCreated = false;
try {
- if (plan instanceof InsertPlan
+ if (request instanceof InsertPlan
&&
ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
- if (plan instanceof InsertRowsPlan || plan instanceof
InsertMultiTabletsPlan) {
+ if (request instanceof InsertRowsPlan || request instanceof
InsertMultiTabletsPlan) {
if (status.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus tmpStatus : status.getSubStatus()) {
if (tmpStatus.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
- hasCreated =
createTimeseriesForFailedInsertion(((InsertPlan) plan));
- ((BatchPlan) plan).getResults().clear();
+ hasCreated =
createTimeseriesForFailedInsertion(((InsertPlan) request));
+ ((BatchPlan) request).getResults().clear();
break;
}
}
}
} else {
if (status.getCode() ==
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
- hasCreated = createTimeseriesForFailedInsertion(((InsertPlan)
plan));
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan)
request));
}
}
}
} catch (MetadataException | CheckConsistencyException e) {
- logger.error("{}: Cannot auto-create timeseries for {}", name, plan,
e);
+ logger.error("{}: Cannot auto-create timeseries for {}", name,
request, e);
return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
e.getMessage());
}
if (hasCreated) {
- status = processPlanLocally(plan);
+ status = processPlanLocally(request);
}
Statistic.DATA_GROUP_MEMBER_LOCAL_EXECUTION.calOperationCostTimeFromStart(startTime);
if (status != null) {
@@ -831,7 +841,7 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
}
} else if (leader.get() != null &&
!ClusterConstant.EMPTY_NODE.equals(leader.get())) {
long startTime =
Timer.Statistic.DATA_GROUP_MEMBER_FORWARD_PLAN.getOperationStartTime();
- TSStatus result = forwardPlan(plan, leader.get(), getHeader());
+ TSStatus result = forwardPlan(request, leader.get(), getHeader());
Timer.Statistic.DATA_GROUP_MEMBER_FORWARD_PLAN.calOperationCostTimeFromStart(startTime);
if (!StatusUtils.NO_LEADER.equals(result)) {
result.setRedirectNode(
@@ -1161,4 +1171,9 @@ public class DataGroupMember extends RaftMember
implements DataGroupMemberMBean
this.version = version;
}
}
+
+ @Override
+ public int getPort(Node node) {
+ return node.getDataPort();
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 65c767b0de..d2931ed34d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -76,11 +76,14 @@ import
org.apache.iotdb.cluster.utils.nodetool.function.Status;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -206,7 +209,9 @@ public class MetaGroupMember extends RaftMember implements
IService, MetaGroupMe
boolean ready = false;
@TestOnly
- public MetaGroupMember() {}
+ public MetaGroupMember() {
+ groupId = new PartitionRegionId(0);
+ }
public MetaGroupMember(Node thisNode, Coordinator coordinator) {
super(
@@ -214,6 +219,7 @@ public class MetaGroupMember extends RaftMember implements
IService, MetaGroupMe
new ClientManager(
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
ClientManager.Type.MetaGroupClient));
+ groupId = new PartitionRegionId(0);
setThisNode(thisNode);
setAllNodes(new PartitionGroup());
initPeerMap();
@@ -1308,7 +1314,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
* @param plan a non-query plan.
*/
@Override
- public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ public ConsensusWriteResponse executeRequest(IConsensusRequest plan) {
TSStatus result;
long startTime =
Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.getOperationStartTime();
if (PartitionUtils.isGlobalMetaPlan(plan)) {
@@ -1322,7 +1328,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
result = coordinator.executeNonQueryPlan(plan);
}
Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
- return result;
+ return new ConsensusWriteResponse(null, result);
}
@Override
@@ -1341,7 +1347,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
* Thus the plan will be processed locally only by the MetaLeader and
forwarded by non-leader
* nodes.
*/
- public TSStatus processNonPartitionedMetaPlan(PhysicalPlan plan) {
+ public TSStatus processNonPartitionedMetaPlan(IConsensusRequest plan) {
if (character == NodeCharacter.LEADER) {
TSStatus status = processPlanLocally(plan);
if (status != null) {
@@ -1926,4 +1932,9 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
public String getIdNodeMapAsString() {
return idNodeMap.toString();
}
+
+ @Override
+ public int getPort(Node node) {
+ return node.getMetaPort();
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 1debf68cb5..0b06c31af5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.cluster.log.appender.LogAppender;
import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
@@ -81,13 +81,17 @@ import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
@@ -183,6 +187,7 @@ public abstract class RaftMember implements RaftMemberMBean
{
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
/** the name of the member, to distinguish several members in the logs. */
+ ConsensusGroupId groupId;
String name;
/** to choose nodes to send request of joining cluster randomly. */
Random random = new Random();
@@ -852,28 +857,13 @@ public abstract class RaftMember implements
RaftMemberMBean {
}
}
- /**
- * If the node is not a leader, the request will be sent to the leader or
reports an error if
- * there is no leader. Otherwise execute the plan locally (whether to send
it to followers depends
- * on the type of the plan).
- */
- public TSStatus executeNonQueryPlan(ExecutNonQueryReq request)
- throws IOException, IllegalPathException {
- // process the plan locally
- PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
-
- TSStatus answer = executeNonQueryPlan(plan);
- logger.debug("{}: Received a plan {}, executed answer: {}", name, plan,
answer);
- return answer;
- }
-
/**
* Execute a non-query plan. Subclass may have their individual implements.
*
* @param plan a non-query plan.
* @return A TSStatus indicating the execution result.
*/
- protected abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
+ public abstract ConsensusWriteResponse executeRequest(IConsensusRequest
plan);
abstract ClientCategory getClientCategory();
@@ -1109,29 +1099,29 @@ public abstract class RaftMember implements
RaftMemberMBean {
* @return OK if over half of the followers accept the log or null if the
leadership is lost
* during the appending
*/
- public TSStatus processPlanLocally(PhysicalPlan plan) {
+ public TSStatus processPlanLocally(IConsensusRequest request) {
if (USE_LOG_DISPATCHER) {
- return processPlanLocallyV2(plan);
+ return processPlanLocallyV2(request);
}
- logger.debug("{}: Processing plan {}", name, plan);
- if (readOnly && !(plan instanceof LogPlan)) {
+ logger.debug("{}: Processing plan {}", name, request);
+ if (readOnly && !(request instanceof LogPlan)) {
return StatusUtils.NODE_READ_ONLY;
}
long startTime =
Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
Log log;
- if (plan instanceof LogPlan) {
+ if (request instanceof LogPlan) {
try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ log = LogParser.getINSTANCE().parse(((LogPlan) request).getLog());
} catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
+ logger.error("Can not parse LogPlan {}", request, e);
return StatusUtils.PARSE_LOG_ERROR;
}
} else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
+ log = new RequestLog();
+ ((RequestLog) log).setRequest(request);
}
// if a single log exceeds the threshold
@@ -1152,8 +1142,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
synchronized (logManager) {
if (logManager.getLastLogIndex() - logManager.getCommitLogIndex()
<= config.getUnCommittedRaftLogNumForRejectThreshold()) {
- if (!(plan instanceof LogPlan)) {
- plan.setIndex(logManager.getLastLogIndex() + 1);
+ if (!(request instanceof LogPlan) && (request instanceof
PhysicalPlan)) {
+ ((PhysicalPlan) request).setIndex(logManager.getLastLogIndex() +
1);
}
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
@@ -1187,7 +1177,7 @@ public abstract class RaftMember implements
RaftMemberMBean {
}
}
- protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+ protected TSStatus processPlanLocallyV2(IConsensusRequest plan) {
long totalStartTime = System.nanoTime();
logger.debug("{}: Processing plan {}", name, plan);
if (readOnly) {
@@ -1203,8 +1193,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
return StatusUtils.PARSE_LOG_ERROR;
}
} else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
+ log = new RequestLog();
+ ((RequestLog) log).setRequest(plan);
}
if (USE_CRAFT && allNodes.size() > 2) {
@@ -1473,7 +1463,7 @@ public abstract class RaftMember implements
RaftMemberMBean {
* communication
* @return a TSStatus indicating if the forwarding is successful.
*/
- public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
+ public TSStatus forwardPlan(IConsensusRequest plan, Node node, RaftNode
header) {
if (node == null || node.equals(thisNode)) {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
return StatusUtils.NO_LEADER;
@@ -1505,7 +1495,7 @@ public abstract class RaftMember implements
RaftMemberMBean {
* @param header to determine which DataGroupMember of "receiver" will
process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
- private TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, RaftNode
header) {
+ private TSStatus forwardPlanAsync(IConsensusRequest plan, Node receiver,
RaftNode header) {
AsyncClient client = getAsyncClient(receiver);
if (client == null) {
logger.debug("{}: can not get client for node={}", name, receiver);
@@ -1517,38 +1507,38 @@ public abstract class RaftMember implements
RaftMemberMBean {
}
public TSStatus forwardPlanAsync(
- PhysicalPlan plan, Node receiver, RaftNode header, AsyncClient client) {
+ IConsensusRequest request, Node receiver, RaftNode header, AsyncClient
client) {
try {
- TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan,
header, receiver);
+ TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, request,
header, receiver);
if (tsStatus == null) {
tsStatus = StatusUtils.TIME_OUT;
- logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+ logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
}
return tsStatus;
} catch (IOException | TException e) {
- logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
+ logger.error(MSG_FORWARD_ERROR, name, request, receiver, e);
return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.warn("{}: forward {} to {} interrupted", name, plan, receiver);
+ logger.warn("{}: forward {} to {} interrupted", name, request, receiver);
return StatusUtils.TIME_OUT;
}
}
- private TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, RaftNode
header) {
+ private TSStatus forwardPlanSync(IConsensusRequest request, Node receiver,
RaftNode header) {
Client client = getSyncClient(receiver);
if (client == null) {
- logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+ logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
return StatusUtils.TIME_OUT;
}
- return forwardPlanSync(plan, receiver, header, client);
+ return forwardPlanSync(request, receiver, header, client);
}
public TSStatus forwardPlanSync(
- PhysicalPlan plan, Node receiver, RaftNode header, Client client) {
+ IConsensusRequest request, Node receiver, RaftNode header, Client
client) {
try {
ExecutNonQueryReq req = new ExecutNonQueryReq();
- req.setPlanBytes(PlanSerializer.getInstance().serialize(plan));
+ req.setPlanBytes(request.serializeToByteBuffer());
if (header != null) {
req.setHeader(header);
}
@@ -1556,19 +1546,16 @@ public abstract class RaftMember implements
RaftMemberMBean {
TSStatus tsStatus = client.executeNonQueryPlan(req);
if (tsStatus == null) {
tsStatus = StatusUtils.TIME_OUT;
- logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+ logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
}
return tsStatus;
- } catch (IOException e) {
- logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
- return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
} catch (TException e) {
TSStatus status;
if (e.getCause() instanceof SocketTimeoutException) {
status = StatusUtils.TIME_OUT;
- logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
+ logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
} else {
- logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
+ logger.error(MSG_FORWARD_ERROR, name, request, receiver, e);
status = StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR,
e.getMessage());
}
// the connection may be broken, close it to avoid it being reused
@@ -1694,12 +1681,12 @@ public abstract class RaftMember implements
RaftMemberMBean {
}
private boolean canBeWeaklyAccepted(Log log) {
- if (!(log instanceof PhysicalPlanLog)) {
+ if (!(log instanceof RequestLog)) {
return false;
}
- PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- return physicalPlanLog.getPlan() instanceof InsertPlan
- || physicalPlanLog.getPlan() instanceof DummyPlan;
+ RequestLog requestLog = (RequestLog) log;
+ return requestLog.getRequest() instanceof InsertPlan
+ || requestLog.getRequest() instanceof DummyPlan;
}
/**
@@ -2380,4 +2367,25 @@ public abstract class RaftMember implements
RaftMemberMBean {
public LogRelay getLogRelay() {
return logRelay;
}
+
+ public ConsensusGroupId getGroupId() {
+ return groupId;
+ }
+
+ public abstract int getPort(Node node);
+
+ public Peer getThisPeer() {
+ return new Peer(groupId, new TEndPoint(thisNode.internalIp,
getPort(thisNode)));
+ }
+
+ public Peer getLeaderPeer() {
+ if (leader.get() == null) {
+ return null;
+ }
+ return new Peer(groupId, new TEndPoint(leader.get().internalIp,
getPort(leader.get())));
+ }
+
+ public boolean isLeader() {
+ return ClusterUtils.nodeEqual(leader.get(), thisNode);
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index bb87809543..2a83adc28e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -46,9 +47,13 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class BaseAsyncService implements RaftService.AsyncIface {
+ private static final Logger logger =
LoggerFactory.getLogger(BaseAsyncService.class);
+
RaftMember member;
String name;
@@ -166,7 +171,11 @@ public abstract class BaseAsyncService implements
RaftService.AsyncIface {
}
try {
- TSStatus status = member.executeNonQueryPlan(request);
+ // process the plan locally
+ PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
+
+ TSStatus status = member.executeRequest(plan);
+ logger.debug("{}: Received a plan {}, executed answer: {}", name, plan,
status);
resultHandler.onComplete(
StatusUtils.getStatus(
status,
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index a438d3cb54..8119eceda5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,7 +154,12 @@ public abstract class BaseSyncService implements
RaftService.Iface {
@Override
public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws
TException {
try {
- return member.executeNonQueryPlan(request);
+ // process the plan locally
+ PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
+
+ TSStatus answer = member.executeRequest(plan);
+ logger.debug("{}: Received a plan {}, executed answer: {}", name, plan,
answer);
+ return answer;
} catch (Exception e) {
throw new TException(e);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index b8e452657c..55761cd1eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.utils;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -69,7 +70,7 @@ public class PartitionUtils {
* @param plan
* @return
*/
- public static boolean isLocalNonQueryPlan(PhysicalPlan plan) {
+ public static boolean isLocalNonQueryPlan(IConsensusRequest plan) {
return plan instanceof LoadDataPlan
|| plan instanceof OperateFilePlan
|| plan instanceof KillQueryPlan
@@ -85,7 +86,7 @@ public class PartitionUtils {
* @param plan
* @return
*/
- public static boolean isGlobalMetaPlan(PhysicalPlan plan) {
+ public static boolean isGlobalMetaPlan(IConsensusRequest plan) {
return plan instanceof SetStorageGroupPlan
|| plan instanceof SetTTLPlan
|| plan instanceof ShowTTLPlan
@@ -113,7 +114,7 @@ public class PartitionUtils {
* @param plan the plan to check
* @return is globalDataPlan or not
*/
- public static boolean isGlobalDataPlan(PhysicalPlan plan) {
+ public static boolean isGlobalDataPlan(IConsensusRequest plan) {
return
// because deletePlan has an infinite time range.
plan instanceof DeletePlan
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
index 56ca05c2ef..663164dcf4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestLogApplier.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.common;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -37,9 +37,9 @@ public class TestLogApplier implements LogApplier {
@Override
public void apply(Log log) {
try {
- if (log instanceof PhysicalPlanLog) {
- PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- getPlanExecutor().processNonQuery(physicalPlanLog.getPlan());
+ if (log instanceof RequestLog) {
+ RequestLog requestLog = (RequestLog) log;
+ getPlanExecutor().processNonQuery(requestLog.getRequest());
} else if (log instanceof CloseFileLog) {
CloseFileLog closeFileLog = ((CloseFileLog) log);
try {
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 6969448090..768ea092ca 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -60,10 +60,10 @@ public class LogParserTest {
@Test
public void testPhysicalPlanLog() throws UnknownLogTypeException,
IllegalPathException {
- PhysicalPlanLog log = new PhysicalPlanLog();
+ RequestLog log = new RequestLog();
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath(TestUtils.getTestSg(5)));
- log.setPlan(setStorageGroupPlan);
+ log.setRequest(setStorageGroupPlan);
log.setCurrLogIndex(8);
log.setCurrLogTerm(8);
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
index af86c48b57..2866927b4f 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -68,9 +68,9 @@ public class AsyncDataLogApplierTest {
public void test() throws IllegalPathException, InterruptedException {
LogApplier dummyApplier =
log -> {
- if (log instanceof PhysicalPlanLog) {
- PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- PhysicalPlan plan = physicalPlanLog.getPlan();
+ if (log instanceof RequestLog) {
+ RequestLog requestLog = (RequestLog) log;
+ PhysicalPlan plan = requestLog.getRequest();
if (plan instanceof InsertRowPlan) {
appliedLogs.add(log);
log.setApplied(true);
@@ -89,7 +89,7 @@ public class AsyncDataLogApplierTest {
PhysicalPlan plan =
new InsertRowPlan(
new PartialPath(TestUtils.getTestSg(i)), i, new String[0], new
String[0]);
- PhysicalPlanLog log = new PhysicalPlanLog(plan);
+ RequestLog log = new RequestLog(plan);
log.setCurrLogIndex(i);
logsToApply.add(log);
}
@@ -116,9 +116,9 @@ public class AsyncDataLogApplierTest {
public void testParallel() {
LogApplier dummyApplier =
log -> {
- if (log instanceof PhysicalPlanLog) {
- PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- PhysicalPlan plan = physicalPlanLog.getPlan();
+ if (log instanceof RequestLog) {
+ RequestLog requestLog = (RequestLog) log;
+ PhysicalPlan plan = requestLog.getRequest();
if (plan instanceof InsertRowPlan) {
appliedLogs.add(log);
log.setApplied(true);
@@ -149,7 +149,7 @@ public class AsyncDataLogApplierTest {
} catch (IllegalPathException e) {
// ignore
}
- PhysicalPlanLog log = new PhysicalPlanLog(plan);
+ RequestLog log = new RequestLog(plan);
log.setCurrLogIndex(finalI * 11 + j);
threadLogsToApply.add(log);
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 8e739462e7..728ce7dc20 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.metadata.CSchemaProcessor;
import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -301,8 +301,8 @@ public class DataLogApplierTest extends IoTDBTest {
throws QueryProcessException, IOException,
QueryFilterOptimizationException,
StorageEngineException, MetadataException, InterruptedException {
InsertRowPlan insertPlan = new InsertRowPlan();
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(insertPlan);
+ RequestLog log = new RequestLog();
+ log.setRequest(insertPlan);
// this series is already created
insertPlan.setDevicePath(new PartialPath(TestUtils.getTestSg(1)));
@@ -357,8 +357,8 @@ public class DataLogApplierTest extends IoTDBTest {
throws MetadataException, QueryProcessException, StorageEngineException,
IOException,
InterruptedException, QueryFilterOptimizationException {
InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(insertRowsPlan);
+ RequestLog log = new RequestLog();
+ log.setRequest(insertRowsPlan);
for (int i = 1; i <= 4; i++) {
InsertRowPlan insertPlan = new InsertRowPlan();
@@ -394,7 +394,7 @@ public class DataLogApplierTest extends IoTDBTest {
DeletePlan deletePlan = new DeletePlan();
deletePlan.setPaths(Collections.singletonList(new
PartialPath(TestUtils.getTestSeries(0, 0))));
deletePlan.setDeleteEndTime(50);
- applier.apply(new PhysicalPlanLog(deletePlan));
+ applier.apply(new RequestLog(deletePlan));
QueryDataSet dataSet =
query(Collections.singletonList(TestUtils.getTestSeries(0, 0)), null);
int cnt = 0;
while (dataSet.hasNext()) {
@@ -422,7 +422,7 @@ public class DataLogApplierTest extends IoTDBTest {
// existing sg
FlushPlan flushPlan =
new FlushPlan(null, Collections.singletonList(new
PartialPath(TestUtils.getTestSg(0))));
- PhysicalPlanLog log = new PhysicalPlanLog(flushPlan);
+ RequestLog log = new RequestLog(flushPlan);
applier.apply(log);
assertNull(log.getException());
@@ -430,7 +430,7 @@ public class DataLogApplierTest extends IoTDBTest {
// non-existing sg
flushPlan =
new FlushPlan(null, Collections.singletonList(new
PartialPath(TestUtils.getTestSg(20))));
- log = new PhysicalPlanLog(flushPlan);
+ log = new RequestLog(flushPlan);
applier.apply(log);
assertEquals(
@@ -453,7 +453,7 @@ public class DataLogApplierTest extends IoTDBTest {
multiTimeSeriesPlan.setDataTypes(Arrays.asList(TSDataType.DOUBLE,
TSDataType.DOUBLE));
multiTimeSeriesPlan.setEncodings(Arrays.asList(TSEncoding.GORILLA,
TSEncoding.GORILLA));
- PhysicalPlanLog log = new PhysicalPlanLog(multiTimeSeriesPlan);
+ RequestLog log = new RequestLog(multiTimeSeriesPlan);
// the applier should sync meta leader to get root.sg2 and report no error
applier.apply(log);
assertTrue(
@@ -477,7 +477,7 @@ public class DataLogApplierTest extends IoTDBTest {
});
DeletePlan deletePlan = new DeletePlan();
- PhysicalPlanLog log = new PhysicalPlanLog(deletePlan);
+ RequestLog log = new RequestLog(deletePlan);
applier.apply(log);
assertNull(log.getException());
}
@@ -485,16 +485,16 @@ public class DataLogApplierTest extends IoTDBTest {
@Test
public void testApplyClearCache() {
ClearCachePlan clearCachePlan = new ClearCachePlan();
- PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog(clearCachePlan);
- applier.apply(physicalPlanLog);
- assertNull(physicalPlanLog.getException());
+ RequestLog requestLog = new RequestLog(clearCachePlan);
+ applier.apply(requestLog);
+ assertNull(requestLog.getException());
}
@Test
public void testApplyMerge() {
MergePlan mergePlan = new MergePlan();
- PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog(mergePlan);
- applier.apply(physicalPlanLog);
- assertNull(physicalPlanLog.getException());
+ RequestLog requestLog = new RequestLog(mergePlan);
+ applier.apply(requestLog);
+ assertNull(requestLog.getException());
}
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index ff50b10463..d8ebb4f093 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.Constants;
@@ -106,12 +106,12 @@ public class MetaLogApplierTest extends IoTDBTest {
@Test
public void testApplyMetadataCreation() throws MetadataException {
- PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog();
+ RequestLog requestLog = new RequestLog();
SetStorageGroupPlan setStorageGroupPlan =
new SetStorageGroupPlan(new PartialPath("root.applyMeta"));
- physicalPlanLog.setPlan(setStorageGroupPlan);
+ requestLog.setRequest(setStorageGroupPlan);
- applier.apply(physicalPlanLog);
+ applier.apply(requestLog);
assertTrue(IoTDB.schemaProcessor.isPathExist(new
PartialPath("root.applyMeta")));
CreateTimeSeriesPlan createTimeSeriesPlan =
@@ -124,8 +124,8 @@ public class MetaLogApplierTest extends IoTDBTest {
Collections.emptyMap(),
Collections.emptyMap(),
null);
- physicalPlanLog.setPlan(createTimeSeriesPlan);
- applier.apply(physicalPlanLog);
+ requestLog.setRequest(createTimeSeriesPlan);
+ applier.apply(requestLog);
assertTrue(IoTDB.schemaProcessor.isPathExist(new
PartialPath("root.applyMeta.s1")));
assertEquals(
TSDataType.DOUBLE,
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 35aae54f2d..0d6832b964 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -48,7 +48,7 @@ public class SerializeLogTest {
@Test
public void testPhysicalPlanLog() throws UnknownLogTypeException,
IllegalPathException {
- PhysicalPlanLog log = new PhysicalPlanLog();
+ RequestLog log = new RequestLog();
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
InsertRowPlan plan = new InsertRowPlan();
@@ -68,19 +68,19 @@ public class SerializeLogTest {
schemas[2].getSchema().setType(TSDataType.TEXT);
plan.setMeasurementMNodes(schemas);
plan.setTime(1);
- log.setPlan(plan);
+ log.setRequest(plan);
ByteBuffer byteBuffer = log.serialize();
Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
assertEquals(log, logPrime);
- log = new PhysicalPlanLog(new SetStorageGroupPlan(new
PartialPath("root.sg1")));
+ log = new RequestLog(new SetStorageGroupPlan(new PartialPath("root.sg1")));
byteBuffer = log.serialize();
logPrime = LogParser.getINSTANCE().parse(byteBuffer);
assertEquals(log, logPrime);
log =
- new PhysicalPlanLog(
+ new RequestLog(
new CreateTimeSeriesPlan(
new PartialPath("root.applyMeta" + ".s1"),
TSDataType.DOUBLE,
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index ab3c41ba5f..64b167f3df 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -605,7 +605,7 @@ public class DataGroupMemberTest extends BaseMember {
Collections.emptyMap(),
Collections.emptyMap(),
null);
- assertEquals(200,
dataGroupMember.executeNonQueryPlan(createTimeSeriesPlan).code);
+ assertEquals(200,
dataGroupMember.executeRequest(createTimeSeriesPlan).code);
assertTrue(IoTDB.schemaProcessor.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
}
@@ -633,7 +633,7 @@ public class DataGroupMemberTest extends BaseMember {
getLogManager(
partitionTable.getPartitionGroup(new
RaftNode(TestUtils.getNode(0), 0)),
dataGroupMember));
- assertEquals(200,
dataGroupMember.executeNonQueryPlan(createTimeSeriesPlan).code);
+ assertEquals(200,
dataGroupMember.executeRequest(createTimeSeriesPlan).code);
assertTrue(IoTDB.schemaProcessor.isPathExist(new
PartialPath(timeseriesSchema.getFullPath())));
testThreadPool.shutdownNow();
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
index 37c9abba12..a1f2be8d2d 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -116,7 +116,7 @@ public class SerializeUtilTest {
tabletPlan.setColumns(columns);
tabletPlan.setRowCount(times.length);
- Log log = new PhysicalPlanLog(tabletPlan);
+ Log log = new RequestLog(tabletPlan);
log.setCurrLogTerm(1);
log.setCurrLogIndex(2);
@@ -177,7 +177,7 @@ public class SerializeUtilTest {
plan.setAttributes(attributesList);
plan.setAlias(alias);
- Log log = new PhysicalPlanLog(plan);
+ Log log = new RequestLog(plan);
log.setCurrLogTerm(1);
log.setCurrLogIndex(2);