This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2245ea5  [IOTDB-1144]support the show devices plan in distributed 
version (#2619)
2245ea5 is described below

commit 2245ea5b98ecfb185bfc656cb93d350eef9bb5b3
Author: HouliangQi <[email protected]>
AuthorDate: Thu Feb 25 15:55:58 2021 +0800

    [IOTDB-1144]support the show devices plan in distributed version (#2619)
---
 .../cluster/client/sync/SyncClientAdaptor.java     |  21 +++
 .../apache/iotdb/cluster/metadata/CMManager.java   | 159 ++++++++++++++++++++-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  18 +++
 .../iotdb/cluster/server/DataClusterServer.java    |  13 ++
 .../server/handlers/caller/GetDevicesHandler.java  |  60 ++++++++
 .../cluster/server/service/DataAsyncService.java   |  10 ++
 .../cluster/server/service/DataSyncService.java    |   9 ++
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   8 +-
 .../iotdb/db/qp/physical/sys/ShowDevicesPlan.java  |  26 ++++
 .../db/qp/physical/sys/ShowTimeSeriesPlan.java     |   2 +-
 .../iotdb/db/query/dataset/ShowDevicesResult.java  |  44 ++++++
 .../db/query/dataset/ShowDevicesResultTest.java}   |  29 ++--
 thrift/src/main/thrift/cluster.thrift              |   5 +
 13 files changed, 387 insertions(+), 17 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 809a140..d518c62 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import 
org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelPathHandler;
+import org.apache.iotdb.cluster.server.handlers.caller.GetDevicesHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GetNodesListHandler;
 import 
org.apache.iotdb.cluster.server.handlers.caller.GetTimeseriesSchemaHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.JoinClusterHandler;
@@ -51,6 +52,7 @@ import 
org.apache.iotdb.cluster.server.handlers.forwarder.ForwardPlanHandler;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.SerializeUtils;
@@ -359,6 +361,25 @@ public class SyncClientAdaptor {
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
+  public static ByteBuffer getDevices(AsyncDataClient client, Node header, 
ShowDevicesPlan plan)
+      throws InterruptedException, TException, IOException {
+    GetDevicesHandler handler = new GetDevicesHandler();
+    AtomicReference<ByteBuffer> response = new AtomicReference<>(null);
+    handler.setResponse(response);
+    handler.setContact(client.getNode());
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
+    plan.serialize(dataOutputStream);
+
+    client.getDevices(header, 
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()), handler);
+    synchronized (handler) {
+      if (response.get() == null) {
+        response.wait(RaftServer.getReadOperationTimeoutMS());
+      }
+    }
+    return response.get();
+  }
+
   public static Long getGroupByExecutor(AsyncDataClient client, GroupByRequest 
request)
       throws TException, InterruptedException {
     AtomicReference<Long> result = new AtomicReference<>();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index c5094c8..ee26656 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -57,8 +57,10 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
@@ -1436,6 +1438,53 @@ public class CMManager extends MManager {
     return super.showTimeseries(plan, context);
   }
 
+  public List<ShowDevicesResult> getLocalDevices(ShowDevicesPlan plan) throws 
MetadataException {
+    return super.getDevices(plan);
+  }
+
+  @Override
+  public List<ShowDevicesResult> getDevices(ShowDevicesPlan plan) throws 
MetadataException {
+    ConcurrentSkipListSet<ShowDevicesResult> resultSet = new 
ConcurrentSkipListSet<>();
+    ExecutorService pool =
+        new ThreadPoolExecutor(
+            THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new 
LinkedBlockingDeque<>());
+    List<PartitionGroup> globalGroups = 
metaGroupMember.getPartitionTable().getGlobalGroups();
+
+    int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
+    int offset = plan.getOffset();
+    // do not use limit and offset in sub-queries unless offset is 0, 
otherwise the results are
+    // not combinable
+    if (offset != 0) {
+      plan.setLimit(0);
+      plan.setOffset(0);
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Fetch devices schemas of {} from {} groups", plan.getPath(), 
globalGroups.size());
+    }
+
+    List<Future<Void>> futureList = new ArrayList<>();
+    for (PartitionGroup group : globalGroups) {
+      futureList.add(
+          pool.submit(
+              () -> {
+                try {
+                  getDevices(group, plan, resultSet);
+                } catch (CheckConsistencyException e) {
+                  logger.error("Cannot get show devices result of {} from {}", 
plan, group);
+                }
+                return null;
+              }));
+    }
+
+    waitForThreadPool(futureList, pool, "getDevices()");
+    List<ShowDevicesResult> showDevicesResults =
+        applyShowDevicesLimitOffset(resultSet, limit, offset);
+    logger.debug("show devices {} has {} results", plan.getPath(), 
showDevicesResults.size());
+    return showDevicesResults;
+  }
+
   @Override
   public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, 
QueryContext context)
       throws MetadataException {
@@ -1497,6 +1546,22 @@ public class CMManager extends MManager {
     return showTimeSeriesResults;
   }
 
+  private List<ShowDevicesResult> applyShowDevicesLimitOffset(
+      Set<ShowDevicesResult> resultSet, int limit, int offset) {
+    List<ShowDevicesResult> showDevicesResults = new ArrayList<>();
+    Iterator<ShowDevicesResult> iterator = resultSet.iterator();
+    while (iterator.hasNext() && limit > 0) {
+      if (offset > 0) {
+        offset--;
+        iterator.next();
+      } else {
+        limit--;
+        showDevicesResults.add(iterator.next());
+      }
+    }
+    return showDevicesResults;
+  }
+
   private void showTimeseries(
       PartitionGroup group,
       ShowTimeSeriesPlan plan,
@@ -1510,6 +1575,32 @@ public class CMManager extends MManager {
     }
   }
 
+  private void getDevices(
+      PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> 
resultSet)
+      throws CheckConsistencyException, MetadataException {
+    if (group.contains(metaGroupMember.getThisNode())) {
+      getLocalDevices(group, plan, resultSet);
+    } else {
+      getRemoteDevices(group, plan, resultSet);
+    }
+  }
+
+  private void getLocalDevices(
+      PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> 
resultSet)
+      throws CheckConsistencyException, MetadataException {
+    Node header = group.getHeader();
+    DataGroupMember localDataMember = 
metaGroupMember.getLocalDataMember(header);
+    localDataMember.syncLeaderWithConsistencyCheck(false);
+    try {
+      List<ShowDevicesResult> localResult = super.getDevices(plan);
+      resultSet.addAll(localResult);
+      logger.debug("Fetched {} devices of {} from {}", localResult.size(), 
plan.getPath(), group);
+    } catch (MetadataException e) {
+      logger.error("Cannot execute show devices plan {} from {} locally.", 
plan, group);
+      throw e;
+    }
+  }
+
   private void showLocalTimeseries(
       PartitionGroup group,
       ShowTimeSeriesPlan plan,
@@ -1522,7 +1613,11 @@ public class CMManager extends MManager {
     try {
       List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, 
context);
       resultSet.addAll(localResult);
-      logger.debug("Fetched {} schemas of {} from {}", localResult.size(), 
plan.getPath(), group);
+      logger.debug(
+          "Fetched local timeseries {} schemas of {} from {}",
+          localResult.size(),
+          plan.getPath(),
+          group);
     } catch (MetadataException e) {
       logger.error("Cannot execute show timeseries plan  {} from {} locally.", 
plan, group);
       throw e;
@@ -1550,7 +1645,8 @@ public class CMManager extends MManager {
 
     if (resultBinary != null) {
       int size = resultBinary.getInt();
-      logger.debug("Fetched {} schemas of {} from {}", size, plan.getPath(), 
group);
+      logger.debug(
+          "Fetched remote timeseries {} schemas of {} from {}", size, 
plan.getPath(), group);
       for (int i = 0; i < size; i++) {
         resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary));
       }
@@ -1559,6 +1655,36 @@ public class CMManager extends MManager {
     }
   }
 
+  private void getRemoteDevices(
+      PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> 
resultSet) {
+    ByteBuffer resultBinary = null;
+    for (Node node : group) {
+      try {
+        resultBinary = getRemoteDevices(node, group, plan);
+        if (resultBinary != null) {
+          break;
+        }
+      } catch (IOException e) {
+        logger.error(LOG_FAIL_CONNECT, node, e);
+      } catch (TException e) {
+        logger.error("Error occurs when getting devices schemas in node {}.", 
node, e);
+      } catch (InterruptedException e) {
+        logger.error("Interrupted when getting devices schemas in node {}.", 
node, e);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    if (resultBinary != null) {
+      int size = resultBinary.getInt();
+      logger.debug("Fetched remote devices {} schemas of {} from {}", size, 
plan.getPath(), group);
+      for (int i = 0; i < size; i++) {
+        resultSet.add(ShowDevicesResult.deserialize(resultBinary));
+      }
+    } else {
+      logger.error("Failed to execute show devices {} in group: {}.", plan, 
group);
+    }
+  }
+
   private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, 
ShowTimeSeriesPlan plan)
       throws IOException, TException, InterruptedException {
     ByteBuffer resultBinary;
@@ -1588,6 +1714,35 @@ public class CMManager extends MManager {
     return resultBinary;
   }
 
+  private ByteBuffer getRemoteDevices(Node node, PartitionGroup group, 
ShowDevicesPlan plan)
+      throws IOException, TException, InterruptedException {
+    ByteBuffer resultBinary;
+    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+      AsyncDataClient client =
+          metaGroupMember
+              .getClientProvider()
+              .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+      resultBinary = SyncClientAdaptor.getDevices(client, group.getHeader(), 
plan);
+    } else {
+      SyncDataClient syncDataClient = null;
+      try {
+        syncDataClient =
+            metaGroupMember
+                .getClientProvider()
+                .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
+        plan.serialize(dataOutputStream);
+        resultBinary =
+            syncDataClient.getDevices(
+                group.getHeader(), 
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+      } finally {
+        ClientUtils.putBackSyncClient(syncDataClient);
+      }
+    }
+    return resultBinary;
+  }
+
   public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
       throws MetadataException {
     List<String> retPaths = new ArrayList<>();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index f48b398..028b3fc 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -42,10 +42,12 @@ import 
org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
 import org.apache.iotdb.db.query.dataset.groupby.LocalGroupByExecutor;
@@ -410,6 +412,22 @@ public class LocalQueryExecutor {
     return ByteBuffer.wrap(outputStream.toByteArray());
   }
 
+  public ByteBuffer getDevices(ByteBuffer planBuffer)
+      throws CheckConsistencyException, IOException, MetadataException {
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
+    ShowDevicesPlan plan = (ShowDevicesPlan) 
PhysicalPlan.Factory.create(planBuffer);
+    List<ShowDevicesResult> allDevicesResult = 
getCMManager().getLocalDevices(plan);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new 
DataOutputStream(outputStream)) {
+      dataOutputStream.writeInt(allDevicesResult.size());
+      for (ShowDevicesResult result : allDevicesResult) {
+        result.serialize(outputStream);
+      }
+    }
+    return ByteBuffer.wrap(outputStream.toByteArray());
+  }
+
   /**
    * Execute aggregations over the given path and return the results to the 
requester.
    *
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 630fb8e..9256f37 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -419,6 +419,14 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public void getDevices(
+      Node header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> 
resultHandler)
+      throws TException {
+    DataAsyncService service = getDataAsyncService(header, resultHandler, "Get 
devices");
+    service.getDevices(header, planBytes, resultHandler);
+  }
+
+  @Override
   public void getNodeList(
       Node header, String path, int nodeLevel, 
AsyncMethodCallback<List<String>> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, resultHandler, "Get 
node list");
@@ -757,6 +765,11 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public ByteBuffer getDevices(Node header, ByteBuffer planBinary) throws 
TException {
+    return getDataSyncService(header).getDevices(header, planBinary);
+  }
+
+  @Override
   public List<ByteBuffer> getAggrResult(GetAggrResultRequest request) throws 
TException {
     return getDataSyncService(request.getHeader()).getAggrResult(request);
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetDevicesHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetDevicesHandler.java
new file mode 100644
index 0000000..7a4d717
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetDevicesHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+
+@SuppressWarnings("common-java:DuplicatedBlocks")
+public class GetDevicesHandler implements AsyncMethodCallback<ByteBuffer> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(GetDevicesHandler.class);
+
+  private Node contact;
+  private AtomicReference<ByteBuffer> result;
+
+  @Override
+  public void onComplete(ByteBuffer resp) {
+    logger.debug("Received devices schema from {}", contact);
+    synchronized (result) {
+      result.set(resp);
+      result.notifyAll();
+    }
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    logger.warn("Cannot get devices schema from {}, because ", contact, 
exception);
+  }
+
+  public void setResponse(AtomicReference<ByteBuffer> response) {
+    this.result = response;
+  }
+
+  public void setContact(Node contact) {
+    this.contact = contact;
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 5bc07c7..41a2689 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -245,6 +245,16 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
+  public void getDevices(
+      Node header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> 
resultHandler) {
+    try {
+      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getDevices(planBinary));
+    } catch (CheckConsistencyException | IOException | MetadataException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  @Override
   public void getNodeList(
       Node header, String path, int nodeLevel, 
AsyncMethodCallback<List<String>> resultHandler) {
     try {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index f4f4bee..61ae338 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -242,6 +242,15 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
+  public ByteBuffer getDevices(Node header, ByteBuffer planBinary) throws 
TException {
+    try {
+      return dataGroupMember.getLocalQueryExecutor().getDevices(planBinary);
+    } catch (CheckConsistencyException | IOException | MetadataException e) {
+      throw new TException(e);
+    }
+  }
+
+  @Override
   public List<String> getNodeList(Node header, String path, int nodeLevel) 
throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index b329812..5a5ad27 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -313,6 +314,10 @@ public abstract class PhysicalPlan {
           plan = new ShowTimeSeriesPlan();
           plan.deserialize(buffer);
           break;
+        case SHOW_DEVICES:
+          plan = new ShowDevicesPlan();
+          plan.deserialize(buffer);
+          break;
         case LOAD_CONFIGURATION:
           plan = new LoadConfigurationPlan();
           plan.deserialize(buffer);
@@ -396,7 +401,8 @@ public abstract class PhysicalPlan {
     STORAGE_GROUP_MNODE,
     BATCH_INSERT_ONE_DEVICE,
     MULTI_BATCH_INSERT,
-    BATCH_INSERT_ROWS
+    BATCH_INSERT_ROWS,
+    SHOW_DEVICES
   }
 
   public long getIndex() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
index 89345ea..c641e32 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
@@ -18,10 +18,19 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 public class ShowDevicesPlan extends ShowPlan {
 
+  public ShowDevicesPlan() {
+    super(ShowContentType.DEVICES);
+  }
+
   private boolean hasSgCol;
 
   public ShowDevicesPlan(PartialPath path) {
@@ -33,6 +42,23 @@ public class ShowDevicesPlan extends ShowPlan {
     this.hasSgCol = hasSgCol;
   }
 
+  @Override
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    outputStream.write(PhysicalPlanType.SHOW_DEVICES.ordinal());
+    putString(outputStream, path.getFullPath());
+    outputStream.writeInt(limit);
+    outputStream.writeInt(offset);
+    outputStream.writeLong(index);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    path = new PartialPath(readString(buffer));
+    limit = buffer.getInt();
+    offset = buffer.getInt();
+    this.index = buffer.getLong();
+  }
+
   public boolean hasSgCol() {
     return hasSgCol;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
index e52f619..5d5e382 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
@@ -116,7 +116,7 @@ public class ShowTimeSeriesPlan extends ShowPlan {
     key = readString(buffer);
     value = readString(buffer);
     limit = buffer.getInt();
-    limit = buffer.getInt();
+    offset = buffer.getInt();
     orderByHeat = buffer.get() == 1;
     this.index = buffer.getLong();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
index 8c3ec9a..b11e434 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
@@ -18,7 +18,17 @@
  */
 package org.apache.iotdb.db.query.dataset;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
 public class ShowDevicesResult extends ShowResult {
+  public ShowDevicesResult() {
+    super();
+  }
 
   public ShowDevicesResult(String name, String sgName) {
     super(name, sgName);
@@ -27,4 +37,38 @@ public class ShowDevicesResult extends ShowResult {
   public ShowDevicesResult(String name) {
     super(name);
   }
+
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(name, outputStream);
+    ReadWriteIOUtils.write(sgName, outputStream);
+  }
+
+  public static ShowDevicesResult deserialize(ByteBuffer buffer) {
+    ShowDevicesResult result = new ShowDevicesResult();
+    result.name = ReadWriteIOUtils.readString(buffer);
+    result.sgName = ReadWriteIOUtils.readString(buffer);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ShowDevicesResult{" + " name='" + name + '\'' + ", sgName='" + 
sgName + '\'' + "}";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ShowDevicesResult result = (ShowDevicesResult) o;
+    return Objects.equals(name, result.name) && Objects.equals(sgName, 
result.sgName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, sgName);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java 
b/server/src/test/java/org/apache/iotdb/db/query/dataset/ShowDevicesResultTest.java
similarity index 52%
copy from 
server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
copy to 
server/src/test/java/org/apache/iotdb/db/query/dataset/ShowDevicesResultTest.java
index 89345ea..aa5845b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/dataset/ShowDevicesResultTest.java
@@ -16,24 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.qp.physical.sys;
+package org.apache.iotdb.db.query.dataset;
 
-import org.apache.iotdb.db.metadata.PartialPath;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class ShowDevicesPlan extends ShowPlan {
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
-  private boolean hasSgCol;
+public class ShowDevicesResultTest {
 
-  public ShowDevicesPlan(PartialPath path) {
-    super(ShowContentType.DEVICES, path);
-  }
+  @Test
+  public void serializeTest() throws IOException {
+    ShowDevicesResult showDevicesResult = new ShowDevicesResult("root.sg1.d1", 
"root.sg1");
 
-  public ShowDevicesPlan(PartialPath path, int limit, int offset, int 
fetchSize, boolean hasSgCol) {
-    super(ShowContentType.DEVICES, path, limit, offset, fetchSize);
-    this.hasSgCol = hasSgCol;
-  }
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    showDevicesResult.serialize(outputStream);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+    ShowDevicesResult result = ShowDevicesResult.deserialize(byteBuffer);
 
-  public boolean hasSgCol() {
-    return hasSgCol;
+    Assert.assertEquals("root.sg1.d1", result.getName());
+    Assert.assertEquals("root.sg1", result.getSgName());
   }
 }
diff --git a/thrift/src/main/thrift/cluster.thrift 
b/thrift/src/main/thrift/cluster.thrift
index 1008e85..44d2b48 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -362,6 +362,11 @@ service TSDataService extends RaftService {
    **/
   set<string> getAllDevices(1:Node header, 2:list<string> path)
 
+  /**
+   * Get the devices from the header according to the showDevicesPlan
+   **/
+  binary getDevices(1:Node header, 2: binary planBinary)
+
   list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel)
 
   set<string> getChildNodePathInNextLevel(1: Node header, 2: string path)

Reply via email to