This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 28c6635  add null-read metadata group before applying create ts
28c6635 is described below

commit 28c6635b6aa92c470d73ebc9ebd10c5b039417af
Author: lta <[email protected]>
AuthorDate: Mon Apr 1 19:37:45 2019 +0800

    add null-read metadata group before applying create ts
---
 .../org/apache/iotdb/cluster/entity/Server.java    |  8 +-
 .../cluster/entity/raft/DataStateMachine.java      | 86 ++++++++++++++++++----
 .../cluster/entity/raft/MetadataStateManchine.java |  6 +-
 .../cluster/qp/executor/NonQueryExecutor.java      | 24 +++---
 .../cluster/qp/executor/QueryMetadataExecutor.java | 10 +--
 .../org/apache/iotdb/cluster/rpc/MetadataType.java | 23 ------
 ...r.java => DataGroupNonQueryAsyncProcessor.java} | 28 ++++---
 ...r.java => MetaGroupNonQueryAsyncProcessor.java} | 25 +++----
 ...yRequest.java => DataGroupNonQueryRequest.java} | 10 ++-
 ...yRequest.java => MetaGroupNonQueryRequest.java} |  4 +-
 ...esponse.java => DataGroupNonQueryResponse.java} |  6 +-
 ...esponse.java => MetaGroupNonQueryResponse.java} |  6 +-
 12 files changed, 138 insertions(+), 98 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 0c7d568..d489c11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -29,8 +29,8 @@ import 
org.apache.iotdb.cluster.entity.data.DataPartitionHolder;
 import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
-import org.apache.iotdb.cluster.rpc.processor.DataNonQueryAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.processor.MetadataNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.DataGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.MetaGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.processor.QueryTimeSeriesAsyncProcessor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -63,8 +63,8 @@ public class Server {
     RpcServer rpcServer = new RpcServer(serverId.getPort());
     RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
 
-    rpcServer.registerUserProcessor(new DataNonQueryAsyncProcessor(this));
-    rpcServer.registerUserProcessor(new MetadataNonQueryAsyncProcessor(this));
+    rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor(this));
+    rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor(this));
     rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor(this));
 
     metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, 
true);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index ff26108..ee7bded 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -23,20 +23,27 @@ import com.alipay.remoting.serialization.SerializerManager;
 import com.alipay.sofa.jraft.Closure;
 import com.alipay.sofa.jraft.Iterator;
 import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import com.alipay.sofa.jraft.core.StateMachineAdapter;
 import com.alipay.sofa.jraft.entity.LeaderChangeContext;
 import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.util.Bits;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.SingleQPTask;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.rpc.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
 import org.slf4j.Logger;
@@ -46,11 +53,13 @@ public class DataStateMachine extends StateMachineAdapter {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataStateMachine.class);
 
+  private Server server = Server.getInstance();
   private OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
   private PeerId peerId;
   private String groupId;
   private AtomicLong leaderTerm = new AtomicLong(-1);
   private MManager mManager = MManager.getInstance();
+  private final AtomicInteger requestId = new AtomicInteger(0);
 
   public DataStateMachine(String groupId, PeerId peerId) {
     this.peerId = peerId;
@@ -62,32 +71,81 @@ public class DataStateMachine extends StateMachineAdapter {
     while (iterator.hasNext()) {
 
       Closure closure = null;
-      DataNonQueryRequest request = null;
-      if (iterator.done() != null) {
-        closure = iterator.done();
-      }
+      DataGroupNonQueryRequest request = null;
       final ByteBuffer data = iterator.getData();
       try {
         request = SerializerManager.getSerializer(SerializerManager.Hessian2)
-            .deserialize(data.array(), DataNonQueryRequest.class.getName());
+            .deserialize(data.array(), 
DataGroupNonQueryRequest.class.getName());
       } catch (final CodecException e) {
         LOGGER.error("Fail to decode IncrementAndGetRequest", e);
       }
+      if (iterator.done() != null) {
+        /** It's leader to apply task **/
+        closure = iterator.done();
+      }
       Status status = Status.OK();
+      /** If the request is to set path, it needs to run null-read in meta 
group to avoid out of data sync **/
+      if (request.getOperatorType() == OperatorType.SET_STORAGE_GROUP) {
+        SingleQPTask nullReadTask = new SingleQPTask(false, null);
+        try {
+          LOGGER.info("handle add path request.");
+          handleNullReadToMetaGroup(nullReadTask, status);
+          nullReadTask.await();
+        } catch (InterruptedException e) {
+          status.setCode(1);
+          status.setErrorMsg(e.toString());
+        }
+      }
+      LOGGER.info("handle physical plan");
+      handlePhysicalPlan(status, request, closure);
+      iterator.next();
+    }
+  }
+
+  /**
+   * Handle Physical plan in state machine and return message if this node is 
leader of the group
+   */
+  private void handlePhysicalPlan(Status status, DataGroupNonQueryRequest 
request,
+      Closure closure) {
+    if (status.isOk()) {
       try {
         assert request != null;
-          PhysicalPlan plan = PhysicalPlanLogTransfer
-              .logToOperator(request.getPhysicalPlanBytes());
-          qpExecutor.processNonQuery(plan);
+        PhysicalPlan plan = PhysicalPlanLogTransfer
+            .logToOperator(request.getPhysicalPlanBytes());
+        qpExecutor.processNonQuery(plan);
       } catch (ProcessorException | IOException e) {
         LOGGER.error("Execute physical plan error", e);
         status = new Status(1, e.toString());
       }
-      if (closure != null) {
-        closure.run(status);
-      }
-      iterator.next();
     }
+    if (closure != null) {
+      closure.run(status);
+    }
+  }
+
+  /**
+   * Handle null-read process in metadata group if the request is to set path.
+   *
+   * @param qpTask null-read task
+   * @param originStatus status to return result if this node is leader of the 
data group
+   */
+  private void handleNullReadToMetaGroup(QPTask qpTask, Status originStatus) {
+    final byte[] reqContext = new byte[4];
+    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+    MetadataRaftHolder metadataRaftHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
+    ((RaftService) metadataRaftHolder.getService()).getNode()
+        .readIndex(reqContext, new ReadIndexClosure() {
+          @Override
+          public void run(Status status, long index, byte[] reqCtx) {
+            BasicResponse response = new BasicResponse(false, true, null, 
null) {
+            };
+            if (!status.isOk()) {
+              originStatus.setCode(1);
+              originStatus.setErrorMsg(status.getErrorMsg());
+            }
+            qpTask.run(response);
+          }
+        });
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 004aefa..dab5049 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -30,7 +30,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetaGroupNonQueryRequest;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -80,14 +80,14 @@ public class MetadataStateManchine extends 
StateMachineAdapter {
     while (iterator.hasNext()) {
 
       Closure closure = null;
-      MetadataNonQueryRequest request = null;
+      MetaGroupNonQueryRequest request = null;
       if (iterator.done() != null) {
         closure = iterator.done();
       }
       final ByteBuffer data = iterator.getData();
       try {
         request = SerializerManager.getSerializer(SerializerManager.Hessian2)
-            .deserialize(data.array(), 
MetadataNonQueryRequest.class.getName());
+            .deserialize(data.array(), 
MetaGroupNonQueryRequest.class.getName());
       } catch (final CodecException e) {
         LOGGER.error("Fail to decode IncrementAndGetRequest", e);
       }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index c333f38..2c354cc 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.qp.executor;
 
 import com.alipay.remoting.exception.CodecException;
 import com.alipay.remoting.serialization.SerializerManager;
+import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.entity.Task;
 import com.alipay.sofa.jraft.option.CliOptions;
@@ -33,10 +34,10 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetaGroupNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.response.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -192,7 +193,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
       throws IOException, RaftConnectionException, InterruptedException {
     String groupId = getGroupIdBySG(storageGroup);
     PeerId leader = RaftUtils.getTargetPeerID(groupId);
-    DataNonQueryRequest request = new DataNonQueryRequest(groupId, plan);
+    DataGroupNonQueryRequest request = new DataGroupNonQueryRequest(groupId, 
plan);
     SingleQPTask qpTask = new SingleQPTask(false, request);
     /** Check if the plan can be executed locally. **/
     if (canHandleNonQuery(storageGroup)) {
@@ -207,13 +208,17 @@ public class NonQueryExecutor extends ClusterQPExecutor {
    */
   private boolean handleDataGroupRequestLocally(String groupId, QPTask qpTask, 
BasicRequest request)
       throws InterruptedException {
-    final byte[] reqContext = new byte[4];
-    Task task = null;
+    Task task = new Task();
+    task.setDone((Status status) -> {
+      BasicResponse response = new DataGroupNonQueryResponse(false, 
status.isOk(), null,
+          status.getErrorMsg());
+      qpTask.run(response);
+    });
     /** Apply qpTask to Raft Node **/
     try {
       task.setData(ByteBuffer
           .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-              .serialize(reqContext)));
+              .serialize(request)));
     } catch (final CodecException e) {
       return false;
     }
@@ -222,7 +227,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     RaftService service = (RaftService) dataRaftHolder.getService();
     service.getNode().apply(task);
     qpTask.await();
-    DataNonQueryResponse response = (DataNonQueryResponse) 
qpTask.getResponse();
+    DataGroupNonQueryResponse response = (DataGroupNonQueryResponse) 
qpTask.getResponse();
     return response.isSuccess();
   }
 
@@ -247,7 +252,8 @@ public class NonQueryExecutor extends ClusterQPExecutor {
    */
   public boolean redirectMetadataGroupLeader(PhysicalPlan plan)
       throws IOException, RaftConnectionException, InterruptedException {
-    MetadataNonQueryRequest request = new 
MetadataNonQueryRequest(CLUSTER_CONFIG.METADATA_GROUP_ID,
+    MetaGroupNonQueryRequest request = new MetaGroupNonQueryRequest(
+        CLUSTER_CONFIG.METADATA_GROUP_ID,
         plan);
     PeerId leader = 
RaftUtils.getTargetPeerID(CLUSTER_CONFIG.METADATA_GROUP_ID);
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 7967536..bc15218 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -45,7 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Handle show all storage group logic
+ * Handle < show timeseries <path> > logic
  */
 public class QueryMetadataExecutor extends ClusterQPExecutor {
 
@@ -70,7 +70,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     String storageGroup = getStroageGroupByDevice(path);
     String groupId = getGroupIdBySG(storageGroup);
     QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId, path);
-    PeerId leader = RaftUtils.getRandomPeerID(groupId);
+    PeerId holder = RaftUtils.getRandomPeerID(groupId);
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.info("Execute show timeseries {} statement.", path);
@@ -80,7 +80,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
       return queryTimeSeriesLocally(path, groupId, task);
     } else {
       try {
-        return queryTimeSeries(task, leader);
+        return queryTimeSeries(task, holder);
       } catch (RaftConnectionException e) {
         LOGGER.error(e.getMessage());
         throw new ProcessorException("Raft connection occurs error.", e);
@@ -114,12 +114,8 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
                 response = new QueryTimeSeriesResponse(false, false, null, 
e.toString());
               }
             } else {
-
-              System.out.println("false");
               response = new QueryTimeSeriesResponse(false, false, null, null);
             }
-            System.out.println(status.isOk());
-            System.out.println();
             task.run(response);
           }
         });
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
deleted file mode 100644
index 9205241..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.rpc;
-
-public enum MetadataType {
-  STORAGE_GROUP, TIME_SERIES
-}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
similarity index 77%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index f0134f1..5e756fa 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -31,34 +31,32 @@ import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
-import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.response.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
-import org.apache.iotdb.db.qp.logical.Operator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Async handle those requests which need to be applied in data group.
  */
-public class DataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<DataNonQueryRequest> {
+public class DataGroupNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<DataGroupNonQueryRequest> {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNonQueryAsyncProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataGroupNonQueryAsyncProcessor.class);
   private Server server;
 
-  public DataNonQueryAsyncProcessor(Server server) {
+  public DataGroupNonQueryAsyncProcessor(Server server) {
     this.server = server;
   }
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      DataNonQueryRequest dataNonQueryRequest) {
+      DataGroupNonQueryRequest dataGroupNonQueryRequest) {
     LOGGER.info("Handle data non query request.");
 
     /** Check if it's the leader **/
-    String groupId = dataNonQueryRequest.getGroupID();
+    String groupId = dataGroupNonQueryRequest.getGroupID();
     DataPartitionRaftHolder dataPartitionRaftHolder = 
(DataPartitionRaftHolder) server
         .getDataPartitionHolderMap().get(groupId);
     if (!dataPartitionRaftHolder.getFsm().isLeader()) {
@@ -67,7 +65,7 @@ public class DataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<DataNonQ
       BoltCliClientService cliClientService = new BoltCliClientService();
       cliClientService.init(new CliOptions());
       LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
-      MetadataNonQueryResponse response = new MetadataNonQueryResponse(true, 
false,
+      MetaGroupNonQueryResponse response = new MetaGroupNonQueryResponse(true, 
false,
           leader.toString(), null);
       asyncContext.sendResponse(response);
     } else {
@@ -77,14 +75,14 @@ public class DataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<DataNonQ
       final Task task = new Task();
       task.setDone((Status status) -> {
         asyncContext.sendResponse(
-            new DataNonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
+            new DataGroupNonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
       });
       try {
         task.setData(ByteBuffer
             .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-                .serialize(dataNonQueryRequest)));
+                .serialize(dataGroupNonQueryRequest)));
       } catch (final CodecException e) {
-        asyncContext.sendResponse(new MetadataNonQueryResponse(false, false, 
null, e.toString()));
+        asyncContext.sendResponse(new MetaGroupNonQueryResponse(false, false, 
null, e.toString()));
       }
       DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) server
           .getDataPartitionHolderMap().get(groupId);
@@ -95,6 +93,6 @@ public class DataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<DataNonQ
 
   @Override
   public String interest() {
-    return DataNonQueryRequest.class.getName();
+    return DataGroupNonQueryRequest.class.getName();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
similarity index 77%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 31e83ce..1a31f0d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -31,32 +31,31 @@ import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
-import org.apache.iotdb.db.qp.logical.Operator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Async handle those requests which need to be applied in metadata group.
  */
-public class MetadataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<MetadataNonQueryRequest> {
+public class MetaGroupNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<MetaGroupNonQueryRequest> {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataNonQueryAsyncProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetaGroupNonQueryAsyncProcessor.class);
   private Server server;
 
-  public MetadataNonQueryAsyncProcessor(Server server) {
+  public MetaGroupNonQueryAsyncProcessor(Server server) {
     this.server = server;
   }
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      MetadataNonQueryRequest metadataNonQueryRequest) {
+      MetaGroupNonQueryRequest metaGroupNonQueryRequest) {
     LOGGER.info("Handle metadata non query query request.");
 
     /** Check if it's the leader **/
-    String groupId = metadataNonQueryRequest.getGroupID();
+    String groupId = metaGroupNonQueryRequest.getGroupID();
     MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
     if (!metadataHolder.getFsm().isLeader()) {
       PeerId leader = RaftUtils.getTargetPeerID(groupId);
@@ -64,7 +63,7 @@ public class MetadataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<Meta
       BoltCliClientService cliClientService = new BoltCliClientService();
       cliClientService.init(new CliOptions());
       LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
-      MetadataNonQueryResponse response = new MetadataNonQueryResponse(true, 
false,
+      MetaGroupNonQueryResponse response = new MetaGroupNonQueryResponse(true, 
false,
           leader.toString(), null);
       asyncContext.sendResponse(response);
     } else {
@@ -74,14 +73,14 @@ public class MetadataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<Meta
       final Task task = new Task();
       task.setDone((Status status) -> {
         asyncContext.sendResponse(
-            new MetadataNonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
+            new MetaGroupNonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
       });
       try {
         task.setData(ByteBuffer
             .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-                .serialize(metadataNonQueryRequest)));
+                .serialize(metaGroupNonQueryRequest)));
       } catch (final CodecException e) {
-        asyncContext.sendResponse(new MetadataNonQueryResponse(false, false, 
null, e.toString()));
+        asyncContext.sendResponse(new MetaGroupNonQueryResponse(false, false, 
null, e.toString()));
       }
 
       RaftService service = (RaftService) metadataHolder.getService();
@@ -91,6 +90,6 @@ public class MetadataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<Meta
 
   @Override
   public String interest() {
-    return MetadataNonQueryRequest.class.getName();
+    return MetaGroupNonQueryRequest.class.getName();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataGroupNonQueryRequest.java
similarity index 78%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataGroupNonQueryRequest.java
index c5fb573..e9ec9d9 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataGroupNonQueryRequest.java
@@ -21,27 +21,33 @@ package org.apache.iotdb.cluster.rpc.request;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
 
 /**
  * Handle request to data group
  */
-public class DataNonQueryRequest extends BasicRequest implements Serializable {
+public class DataGroupNonQueryRequest extends BasicRequest implements 
Serializable {
 
   /**
    * Serialized physical plan
    */
   private byte[] physicalPlanBytes;
+  private Operator.OperatorType operatorType;
 
-  public DataNonQueryRequest(String groupID, PhysicalPlan plan)
+  public DataGroupNonQueryRequest(String groupID, PhysicalPlan plan)
       throws IOException {
     super(groupID);
     this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
+    this.operatorType = plan.getOperatorType();
   }
 
   public byte[] getPhysicalPlanBytes() {
     return physicalPlanBytes;
   }
 
+  public OperatorType getOperatorType() {
+    return operatorType;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetaGroupNonQueryRequest.java
similarity index 90%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetaGroupNonQueryRequest.java
index dd91e09..fba5734 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetaGroupNonQueryRequest.java
@@ -27,14 +27,14 @@ import 
org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
 /**
  * Handle request to metadata group leader
  */
-public class MetadataNonQueryRequest extends BasicRequest implements 
Serializable {
+public class MetaGroupNonQueryRequest extends BasicRequest implements 
Serializable {
 
   /**
    * Serialized physical plan
    */
   private byte[] physicalPlanBytes;
 
-  public MetadataNonQueryRequest(String groupID, PhysicalPlan plan)
+  public MetaGroupNonQueryRequest(String groupID, PhysicalPlan plan)
       throws IOException {
     super(groupID);
     this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
similarity index 80%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
index 6f430d0..11bf173 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.cluster.rpc.response;
 /**
  * Handle response from data group leader
  */
-public class DataNonQueryResponse extends BasicResponse {
+public class DataGroupNonQueryResponse extends BasicResponse {
 
-  public DataNonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
+  public DataGroupNonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
     super(redirected, success, leaderStr, errorMsg);
   }
 
-  public DataNonQueryResponse(boolean redirected, boolean success) {
+  public DataGroupNonQueryResponse(boolean redirected, boolean success) {
     super(redirected, success, null, null);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
similarity index 80%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
index f23fa1c..60b4171 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.cluster.rpc.response;
 /**
  * Handle response from metadata group leader
  */
-public class MetadataNonQueryResponse extends BasicResponse {
+public class MetaGroupNonQueryResponse extends BasicResponse {
 
-  public MetadataNonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
+  public MetaGroupNonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
     super(redirected, success, leaderStr, errorMsg);
   }
 
-  public MetadataNonQueryResponse(boolean redirected, boolean success) {
+  public MetaGroupNonQueryResponse(boolean redirected, boolean success) {
     super(redirected, success, null, null);
   }
 

Reply via email to