This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e4f19e52c55ba2171ed939ce2b40da13c5c299fa
Author: Ring-k <[email protected]>
AuthorDate: Thu Jun 18 15:50:29 2020 +0800

    auto schema creation
---
 .../cluster/client/sync/SyncClientAdaptor.java     |  10 ++
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  21 ++--
 .../iotdb/cluster/server/DataClusterServer.java    |   7 ++
 .../cluster/server/member/DataGroupMember.java     |  37 ++++--
 .../cluster/server/member/MetaGroupMember.java     | 132 ++++++++++++++++++++-
 .../iotdb/cluster/server/member/RaftMember.java    |   2 +-
 service-rpc/src/main/thrift/cluster.thrift         |   2 +
 7 files changed, 188 insertions(+), 23 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index be44fc3..a00fbc1 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -242,6 +242,16 @@ public class SyncClientAdaptor {
     return resultReference.get();
   }
 
+  public static Map<String, Boolean> getUnregisteredMeasurements(DataClient 
client, Node header, List<String> seriesPaths) throws TException, 
InterruptedException {
+    AtomicReference<Map<String, Boolean>> remoteResult = new 
AtomicReference<>();
+    GenericHandler<Map<String, Boolean>> handler = new 
GenericHandler<>(client.getNode(), remoteResult);
+    synchronized (remoteResult) {
+      client.isMeasurementsRegistered(header, seriesPaths, handler);
+      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+    }
+    return remoteResult.get();
+  }
+
   public static List<String> getAllPaths(DataClient client, Node header, 
List<String> pathsToQuery)
       throws InterruptedException, TException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 94b3693..8376fe2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -97,8 +97,8 @@ public class ClusterPlanExecutor extends PlanExecutor {
 
   @Override
   public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext 
context)
-          throws IOException, StorageEngineException, 
QueryFilterOptimizationException, QueryProcessException,
-          MetadataException {
+      throws IOException, StorageEngineException, 
QueryFilterOptimizationException, QueryProcessException,
+      MetadataException {
     if (queryPlan instanceof QueryPlan) {
       logger.debug("Executing a query: {}", queryPlan);
       return processDataQuery((QueryPlan) queryPlan, context);
@@ -155,7 +155,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
    *
    * @param sgPathMap the key is the storage group name and the value is the 
path to be queried with
    *                  storage group added
-   * @param level the max depth to match the pattern, -1 means matching the 
whole pattern
+   * @param level     the max depth to match the pattern, -1 means matching 
the whole pattern
    * @return the number of paths that match the pattern at given level
    * @throws MetadataException
    */
@@ -168,7 +168,8 @@ public class ClusterPlanExecutor extends PlanExecutor {
       String storageGroupName = sgPathEntry.getKey();
       String pathUnderSG = sgPathEntry.getValue();
       // find the data group that should hold the timeseries schemas of the 
storage group
-      PartitionGroup partitionGroup = 
metaGroupMember.getPartitionTable().route(storageGroupName, 0);
+      PartitionGroup partitionGroup = metaGroupMember.getPartitionTable()
+          .route(storageGroupName, 0);
       if (partitionGroup.contains(metaGroupMember.getThisNode())) {
         // this node is a member of the group, perform a local query after 
synchronizing with the
         // leader
@@ -234,7 +235,8 @@ public class ClusterPlanExecutor extends PlanExecutor {
     return localResult;
   }
 
-  private int getRemotePathCount(PartitionGroup partitionGroup, List<String> 
pathsToQuery, int level)
+  private int getRemotePathCount(PartitionGroup partitionGroup, List<String> 
pathsToQuery,
+      int level)
       throws MetadataException {
     // choose the node with lowest latency or highest throughput
     List<Node> coordinatedNodes = 
QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
@@ -308,7 +310,8 @@ public class ClusterPlanExecutor extends PlanExecutor {
       return MManager.getInstance().getNodesList(schemaPattern, level,
           new 
SlotSgFilter(metaGroupMember.getPartitionTable().getNodeSlots(header)));
     } catch (MetadataException e) {
-      logger.error("Cannot not get node list of {}@{} from {} locally", 
schemaPattern, level, group);
+      logger
+          .error("Cannot not get node list of {}@{} from {} locally", 
schemaPattern, level, group);
       return Collections.emptyList();
     }
   }
@@ -570,7 +573,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
 
   @Override
   protected AlignByDeviceDataSet getAlignByDeviceDataSet(AlignByDevicePlan 
plan,
-                                                         QueryContext context, 
IQueryRouter router)
+      QueryContext context, IQueryRouter router)
       throws MetadataException {
     return new ClusterAlignByDeviceDataSet(plan, context, router, 
metaGroupMember);
   }
@@ -588,8 +591,8 @@ public class ClusterPlanExecutor extends PlanExecutor {
         break;
       default:
         throw new QueryProcessException(String
-                .format("Unrecognized load configuration plan type: %s",
-                        plan.getLoadConfigurationPlanType()));
+            .format("Unrecognized load configuration plan type: %s",
+                plan.getLoadConfigurationPlanType()));
     }
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 705ecb4..4876e2f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -384,6 +384,13 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   }
 
   @Override
+  public void isMeasurementsRegistered(Node header, List<String> measurements, 
AsyncMethodCallback<Map<String, Boolean>> resultHandler) throws TException {
+    DataGroupMember dataMember = getDataMember(header, resultHandler,
+            "Check if measurements are registered");
+    dataMember.isMeasurementsRegistered(header,measurements, resultHandler);
+  }
+
+  @Override
   public void getGroupByExecutor(GroupByRequest request, 
AsyncMethodCallback<Long> resultHandler) {
     DataGroupMember dataMember = getDataMember(request.getHeader(), 
resultHandler, request);
     dataMember.getGroupByExecutor(request, resultHandler);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index a2a174b..91e92f2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -98,7 +98,9 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -120,6 +122,7 @@ import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -998,7 +1001,6 @@ public class DataGroupMember extends RaftMember implements 
TSDataService.AsyncIf
         return status;
       }
     }
-
     return forwardPlan(plan, leader, getHeader());
   }
 
@@ -1062,8 +1064,8 @@ public class DataGroupMember extends RaftMember 
implements TSDataService.AsyncIf
 
   /**
    * Create an IPointReader of "path" with “timeFilter” and "valueFilter". A 
synchronization with
-   * the leader will be performed first to preserve strong consistency.
-   * TODO-Cluster: also support weak consistency
+   * the leader will be performed first to preserve strong consistency. 
TODO-Cluster: also support
+   * weak consistency
    *
    * @param path
    * @param dataType
@@ -1089,8 +1091,8 @@ public class DataGroupMember extends RaftMember 
implements TSDataService.AsyncIf
 
   /**
    * Create an IBatchReader of "path" with “timeFilter” and "valueFilter". A 
synchronization with
-   * the leader will be performed first to preserve strong consistency.
-   * TODO-Cluster: also support weak consistency
+   * the leader will be performed first to preserve strong consistency. 
TODO-Cluster: also support
+   * weak consistency
    *
    * @param path
    * @param dataType
@@ -1144,8 +1146,7 @@ public class DataGroupMember extends RaftMember 
implements TSDataService.AsyncIf
 
   /**
    * Create an IReaderByTimestamp of "path". A synchronization with the leader 
will be performed
-   * first to preserve strong consistency.
-   * TODO-Cluster: also support weak consistency
+   * first to preserve strong consistency. TODO-Cluster: also support weak 
consistency
    *
    * @param path
    * @param dataType
@@ -1583,6 +1584,28 @@ public class DataGroupMember extends RaftMember 
implements TSDataService.AsyncIf
     resultHandler.onComplete(resultBuffers);
   }
 
+  @Override
+  public void isMeasurementsRegistered(Node header, List<String> measurements,
+      AsyncMethodCallback<Map<String, Boolean>> resultHandler) throws 
TException {
+    if (!syncLeader()) {
+      resultHandler.onError(new LeaderUnknownException(getAllNodes()));
+      return;
+    }
+    Map<String, Boolean> result = new HashMap<>();
+    for (String seriesPath : measurements) {
+      try {
+        List<String> path = 
MManager.getInstance().getAllTimeseriesName(seriesPath);
+        if (path.size() != 1) {
+          throw new MetadataException("Size of the path is not 1.");
+        }
+        result.put(seriesPath, true);
+      } catch (MetadataException e) {
+        result.put(seriesPath, false);
+      }
+    }
+    resultHandler.onComplete(result);
+  }
+
   /**
    * Execute "aggregation" over "path" with "timeFilter". This method 
currently requires strong
    * consistency. Only data managed by this group will be used for aggregation.
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 8edd6ac..ffd0c6c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -126,9 +126,11 @@ import 
org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.cluster.utils.nodetool.function.Partition;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -139,9 +141,13 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.MetaUtils;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -152,15 +158,22 @@ import 
org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.StringContainer;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -1520,8 +1533,27 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
     } catch (MetadataException e) {
       logger.error("Cannot route plan {}", plan, e);
     }
-    // the storage group is not found locally, forward it to the leader
+    // the storage group is not found locally
     if (planGroupMap == null || planGroupMap.isEmpty()) {
+      if (plan instanceof InsertPlan && 
IoTDBDescriptor.getInstance().getConfig()
+          .isAutoCreateSchemaEnabled()) {
+        System.out.println("try to set storage group");
+        String deviceId = ((InsertPlan) plan).getDeviceId();
+        try {
+          String storageGroupName = MetaUtils
+              .getStorageGroupNameByLevel(deviceId, 
IoTDBDescriptor.getInstance()
+                  .getConfig().getDefaultStorageGroupLevel());
+          SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(
+              new Path(storageGroupName));
+          TSStatus setStorageGroupResult = 
executeNonQuery(setStorageGroupPlan);
+          if (setStorageGroupResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            throw new MetadataException("Failed to set storage group " + 
storageGroupName);
+          }
+          return executeNonQuery(plan);
+        } catch (MetadataException e) {
+          logger.info("Failed to set storage group of device id {}", deviceId);
+        }
+      }
       logger.debug("{}: Cannot found storage groups for {}", name, plan);
       return StatusUtils.NO_STORAGE_GROUP;
     }
@@ -1551,6 +1583,19 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
         subStatus = forwardPlan(entry.getKey(), entry.getValue());
       }
       if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        if (entry.getKey() instanceof InsertPlan
+            && subStatus.getCode() == 
TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode()
+            && 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+          boolean hasCreate = autoCreateTimeseries((InsertPlan) 
entry.getKey(), entry.getValue());
+          if (hasCreate) {
+            Map<PhysicalPlan, PartitionGroup> subPlan = new HashMap<>();
+            subPlan.put(entry.getKey(), entry.getValue());
+            subStatus = forwardPlan(subPlan);
+            continue;
+          } else {
+            logger.error("{}, Cannot auto create timeseries.", thisNode);
+          }
+        }
         // execution failed, record the error message
         errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
             subStatus.getCode(), entry.getValue().getHeader(),
@@ -1569,6 +1614,77 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
     return status;
   }
 
+  boolean autoCreateTimeseries(InsertPlan insertPlan, PartitionGroup 
partitionGroup) {
+    List<String> seriesList = new ArrayList<>();
+    String deviceId = insertPlan.getDeviceId();
+    for (String measurementId : insertPlan.getMeasurements()) {
+      seriesList.add(
+          new StringContainer(new String[]{deviceId, measurementId}, 
TsFileConstant.PATH_SEPARATOR)
+              .toString());
+    }
+    List<String> unregistedSeriesList = getUnregisteredSeriesList(seriesList, 
partitionGroup);
+    for (String seriesPath : unregistedSeriesList) {
+      int index = seriesList.indexOf(seriesPath);
+      TSDataType dataType = TypeInferenceUtils
+          .getPredictedDataType(insertPlan.getValues()[index], true);
+      TSEncoding encoding = getDefaultEncoding(dataType);
+      CompressionType compressionType = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+      CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new 
Path(seriesPath),
+          dataType, encoding, compressionType, null, null, null, null);
+      TSStatus result = executeNonQuery(createTimeSeriesPlan);
+      if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("{} failed to execute create timeseries {}", thisNode, 
seriesPath);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private TSEncoding getDefaultEncoding(TSDataType dataType) {
+    IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+    switch (dataType) {
+      case BOOLEAN:
+        return conf.getDefaultBooleanEncoding();
+      case INT32:
+        return conf.getDefaultInt32Encoding();
+      case INT64:
+        return conf.getDefaultInt64Encoding();
+      case FLOAT:
+        return conf.getDefaultFloatEncoding();
+      case DOUBLE:
+        return conf.getDefaultDoubleEncoding();
+      case TEXT:
+        return conf.getDefaultTextEncoding();
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", 
dataType.toString()));
+    }
+  }
+
+  List<String> getUnregisteredSeriesList(List<String> seriesList, 
PartitionGroup partitionGroup) {
+    Set<String> unregistered = new HashSet<>();
+    for (Node node : partitionGroup) {
+      try {
+        DataClient client = getDataClient(node);
+        Map<String, Boolean> result = SyncClientAdaptor
+            .getUnregisteredMeasurements(client, partitionGroup.getHeader(), 
seriesList);
+        for (Map.Entry<String, Boolean> entry : result.entrySet()) {
+          if (!entry.getValue()) {
+            unregistered.add(entry.getKey());
+          }
+        }
+      } catch (TException | IOException e) {
+        logger.error("{}: cannot getting unregistered series list {} from {}", 
name,
+            Arrays.toString(seriesList.toArray(new String[0])), node, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.error("{}: getting unregistered series list {} is interrupted 
from {}", name,
+            Arrays.toString(seriesList.toArray(new String[0])), node, e);
+      }
+    }
+    return new ArrayList<String>(unregistered);
+  }
+
   /**
    * Forward a plan to the DataGroupMember of one node in the group. Only when 
all nodes time out,
    * will a TIME_OUT be returned.
@@ -1700,7 +1816,8 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
         if (logger.isDebugEnabled()) {
           logger.debug("{}: Pulled {} timeseries schemas of {} and other {} 
paths from {} of {}",
               name,
-              schemas.size(), prefixPaths.get(0), prefixPaths.size() - 1, 
node, partitionGroup.getHeader());
+              schemas.size(), prefixPaths.get(0), prefixPaths.size() - 1, node,
+              partitionGroup.getHeader());
         }
         results.addAll(schemas);
         break;
@@ -1718,7 +1835,8 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
    * @return
    * @throws MetadataException
    */
-  public Pair<List<TSDataType>, List<TSDataType>> 
getSeriesTypesByPath(List<Path> paths, List<String> aggregations) throws
+  public Pair<List<TSDataType>, List<TSDataType>> 
getSeriesTypesByPath(List<Path> paths,
+      List<String> aggregations) throws
       MetadataException {
     try {
       // try locally first
@@ -1761,7 +1879,7 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
           MeasurementSchema schema = schemas.get(i);
           columnType.add(schema.getType());
           MManager.getInstance().cacheSchema(paths.get(i).getDevice() +
-                  IoTDBConstant.PATH_SEPARATOR + schema.getMeasurementId(), 
schema);
+              IoTDBConstant.PATH_SEPARATOR + schema.getMeasurementId(), 
schema);
         } else {
           columnType.add(dataType);
         }
@@ -1781,7 +1899,8 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
    * @return
    * @throws MetadataException
    */
-  public Pair<List<TSDataType>, List<TSDataType>> 
getSeriesTypesByString(List<String> pathStrs, String aggregation) throws
+  public Pair<List<TSDataType>, List<TSDataType>> 
getSeriesTypesByString(List<String> pathStrs,
+      String aggregation) throws
       MetadataException {
     try {
       // try locally first
@@ -1793,7 +1912,8 @@ public class MetaGroupMember extends RaftMember 
implements TSMetaService.AsyncIf
       } else {
         // if the aggregation function is not null,
         // we should recalculate the type of column in result set
-        List<TSDataType> columnDataTypes = 
SchemaUtils.getSeriesTypesByString(pathStrs, aggregation);
+        List<TSDataType> columnDataTypes = SchemaUtils
+            .getSeriesTypesByString(pathStrs, aggregation);
         return new Pair<>(columnDataTypes, measurementDataTypes);
       }
     } catch (PathNotExistException e) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5fd6189..14ec490 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -78,6 +78,7 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
@@ -980,7 +981,6 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
     }
-
     PhysicalPlanLog log = new PhysicalPlanLog();
     // assign term and index to the new log and append it
     synchronized (logManager) {
diff --git a/service-rpc/src/main/thrift/cluster.thrift 
b/service-rpc/src/main/thrift/cluster.thrift
index b8b7daf..39d59ae 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -350,6 +350,8 @@ service TSDataService extends RaftService {
 
   list<binary> getAggrResult(1:GetAggrResultRequest request)
 
+  map<string, bool> isMeasurementsRegistered(1: Node header, 2: list<string> 
measurements)
+
   PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request)
 
   /**

Reply via email to