This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2245ea5 [IOTDB-1144]support the show devices plan in distributed
version (#2619)
2245ea5 is described below
commit 2245ea5b98ecfb185bfc656cb93d350eef9bb5b3
Author: HouliangQi <[email protected]>
AuthorDate: Thu Feb 25 15:55:58 2021 +0800
[IOTDB-1144]support the show devices plan in distributed version (#2619)
---
.../cluster/client/sync/SyncClientAdaptor.java | 21 +++
.../apache/iotdb/cluster/metadata/CMManager.java | 159 ++++++++++++++++++++-
.../iotdb/cluster/query/LocalQueryExecutor.java | 18 +++
.../iotdb/cluster/server/DataClusterServer.java | 13 ++
.../server/handlers/caller/GetDevicesHandler.java | 60 ++++++++
.../cluster/server/service/DataAsyncService.java | 10 ++
.../cluster/server/service/DataSyncService.java | 9 ++
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 8 +-
.../iotdb/db/qp/physical/sys/ShowDevicesPlan.java | 26 ++++
.../db/qp/physical/sys/ShowTimeSeriesPlan.java | 2 +-
.../iotdb/db/query/dataset/ShowDevicesResult.java | 44 ++++++
.../db/query/dataset/ShowDevicesResultTest.java} | 29 ++--
thrift/src/main/thrift/cluster.thrift | 5 +
13 files changed, 387 insertions(+), 17 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 809a140..d518c62 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import
org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelPathHandler;
+import org.apache.iotdb.cluster.server.handlers.caller.GetDevicesHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GetNodesListHandler;
import
org.apache.iotdb.cluster.server.handlers.caller.GetTimeseriesSchemaHandler;
import org.apache.iotdb.cluster.server.handlers.caller.JoinClusterHandler;
@@ -51,6 +52,7 @@ import
org.apache.iotdb.cluster.server.handlers.forwarder.ForwardPlanHandler;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.SerializeUtils;
@@ -359,6 +361,25 @@ public class SyncClientAdaptor {
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
+ /**
+  * Requests the devices matching {@code plan} from the data group identified by {@code header}
+  * through an asynchronous client, then blocks until the reply arrives or the read-operation
+  * timeout elapses.
+  *
+  * @param client asynchronous client connected to the node to query
+  * @param header header node identifying the target data group
+  * @param plan the show-devices plan; it is sent in its serialized form
+  * @return the serialized result, or {@code null} if no reply was stored before the wait ended
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  * @throws TException if the RPC cannot be issued
+  * @throws IOException if the plan cannot be serialized
+  */
+ public static ByteBuffer getDevices(AsyncDataClient client, Node header, ShowDevicesPlan plan)
+     throws InterruptedException, TException, IOException {
+   GetDevicesHandler handler = new GetDevicesHandler();
+   AtomicReference<ByteBuffer> response = new AtomicReference<>(null);
+   handler.setResponse(response);
+   handler.setContact(client.getNode());
+   ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+   DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+   plan.serialize(dataOutputStream);
+
+   client.getDevices(header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()), handler);
+   // The handler publishes its result and calls notifyAll() while holding the monitor of the
+   // response reference, so that is the monitor to wait on. Object.wait() throws
+   // IllegalMonitorStateException when the caller does not own the monitor it waits on.
+   synchronized (response) {
+     if (response.get() == null) {
+       response.wait(RaftServer.getReadOperationTimeoutMS());
+     }
+   }
+   return response.get();
+ }
+
public static Long getGroupByExecutor(AsyncDataClient client, GroupByRequest
request)
throws TException, InterruptedException {
AtomicReference<Long> result = new AtomicReference<>();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index c5094c8..ee26656 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -57,8 +57,10 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -1436,6 +1438,53 @@ public class CMManager extends MManager {
return super.showTimeseries(plan, context);
}
+ /**
+  * Returns the devices matching {@code plan} by delegating directly to the superclass
+  * implementation of {@code getDevices}, i.e. without the fan-out to partition groups that
+  * this class's own {@code getDevices(ShowDevicesPlan)} override performs.
+  *
+  * @param plan the show-devices plan to evaluate
+  * @return the result of {@code super.getDevices(plan)}
+  * @throws MetadataException propagated from the superclass call
+  */
+ public List<ShowDevicesResult> getLocalDevices(ShowDevicesPlan plan) throws
MetadataException {
+ return super.getDevices(plan);
+ }
+
+ /**
+  * Collects the devices matching {@code plan} from every global partition group in parallel,
+  * merges them into one sorted set and applies the plan's limit and offset to the merged
+  * result.
+  *
+  * <p>A limit of 0 is treated as "no limit". When the offset is non-zero, the limit and offset
+  * of {@code plan} are reset to 0 before the sub-queries run (and are not restored afterwards),
+  * because per-group paging would make the partial results impossible to combine.
+  *
+  * @param plan the show-devices plan; may be mutated as described above
+  * @return the merged devices after limit/offset have been applied
+  * @throws MetadataException declared by the overridden method
+  */
+ @Override
+ public List<ShowDevicesResult> getDevices(ShowDevicesPlan plan) throws MetadataException {
+   ConcurrentSkipListSet<ShowDevicesResult> resultSet = new ConcurrentSkipListSet<>();
+   ExecutorService pool =
+       new ThreadPoolExecutor(
+           THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
+   List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+
+   int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
+   int offset = plan.getOffset();
+   // do not use limit and offset in sub-queries unless offset is 0, otherwise the results are
+   // not combinable
+   if (offset != 0) {
+     plan.setLimit(0);
+     plan.setOffset(0);
+   }
+
+   if (logger.isDebugEnabled()) {
+     logger.debug(
+         "Fetch devices schemas of {} from {} groups", plan.getPath(), globalGroups.size());
+   }
+
+   List<Future<Void>> futureList = new ArrayList<>();
+   for (PartitionGroup group : globalGroups) {
+     futureList.add(
+         pool.submit(
+             () -> {
+               try {
+                 getDevices(group, plan, resultSet);
+               } catch (CheckConsistencyException e) {
+                 // Pass the exception as the last argument so the cause and stack trace are
+                 // logged instead of being silently dropped.
+                 logger.error("Cannot get show devices result of {} from {}", plan, group, e);
+               }
+               return null;
+             }));
+   }
+
+   waitForThreadPool(futureList, pool, "getDevices()");
+   List<ShowDevicesResult> showDevicesResults =
+       applyShowDevicesLimitOffset(resultSet, limit, offset);
+   logger.debug("show devices {} has {} results", plan.getPath(), showDevicesResults.size());
+   return showDevicesResults;
+ }
+
@Override
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan,
QueryContext context)
throws MetadataException {
@@ -1497,6 +1546,22 @@ public class CMManager extends MManager {
return showTimeSeriesResults;
}
+ /**
+  * Pages through {@code resultSet} in its iteration order: the first {@code offset} entries
+  * are skipped and at most {@code limit} of the following entries are returned.
+  *
+  * @param resultSet the merged devices to page through
+  * @param limit maximum number of entries to return; nothing is returned when it is not positive
+  * @param offset number of leading entries to skip
+  * @return a new list holding the selected entries
+  */
+ private List<ShowDevicesResult> applyShowDevicesLimitOffset(
+     Set<ShowDevicesResult> resultSet, int limit, int offset) {
+   List<ShowDevicesResult> page = new ArrayList<>();
+   int toSkip = offset;
+   int remaining = limit;
+   for (ShowDevicesResult device : resultSet) {
+     if (remaining <= 0) {
+       break;
+     }
+     if (toSkip > 0) {
+       toSkip--;
+       continue;
+     }
+     remaining--;
+     page.add(device);
+   }
+   return page;
+ }
+
private void showTimeseries(
PartitionGroup group,
ShowTimeSeriesPlan plan,
@@ -1510,6 +1575,32 @@ public class CMManager extends MManager {
}
}
+ /**
+  * Adds the devices of one partition group to {@code resultSet}: the local path is taken when
+  * this node is a member of {@code group}, otherwise the group is queried remotely.
+  *
+  * @param group the partition group to query
+  * @param plan the show-devices plan to evaluate
+  * @param resultSet the set that receives the devices found
+  */
+ private void getDevices(
+ PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult>
resultSet)
+ throws CheckConsistencyException, MetadataException {
+ if (group.contains(metaGroupMember.getThisNode())) {
+ getLocalDevices(group, plan, resultSet);
+ } else {
+ getRemoteDevices(group, plan, resultSet);
+ }
+ }
+
+ /**
+  * Evaluates {@code plan} against the local metadata on behalf of {@code group} and adds the
+  * devices found to {@code resultSet}. Before reading, the local data member of the group's
+  * header is asked to sync with its leader via {@code syncLeaderWithConsistencyCheck(false)}.
+  *
+  * @throws CheckConsistencyException propagated from the leader sync
+  * @throws MetadataException if the local query fails; it is logged and rethrown
+  */
+ private void getLocalDevices(
+ PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult>
resultSet)
+ throws CheckConsistencyException, MetadataException {
+ Node header = group.getHeader();
+ DataGroupMember localDataMember =
metaGroupMember.getLocalDataMember(header);
+ localDataMember.syncLeaderWithConsistencyCheck(false);
+ try {
+ // super.getDevices: the non-distributed lookup, not this class's fan-out override
+ List<ShowDevicesResult> localResult = super.getDevices(plan);
+ resultSet.addAll(localResult);
+ logger.debug("Fetched {} devices of {} from {}", localResult.size(),
plan.getPath(), group);
+ } catch (MetadataException e) {
+ logger.error("Cannot execute show devices plan {} from {} locally.",
plan, group);
+ throw e;
+ }
+ }
+
private void showLocalTimeseries(
PartitionGroup group,
ShowTimeSeriesPlan plan,
@@ -1522,7 +1613,11 @@ public class CMManager extends MManager {
try {
List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan,
context);
resultSet.addAll(localResult);
- logger.debug("Fetched {} schemas of {} from {}", localResult.size(),
plan.getPath(), group);
+ logger.debug(
+ "Fetched local timeseries {} schemas of {} from {}",
+ localResult.size(),
+ plan.getPath(),
+ group);
} catch (MetadataException e) {
logger.error("Cannot execute show timeseries plan {} from {} locally.",
plan, group);
throw e;
@@ -1550,7 +1645,8 @@ public class CMManager extends MManager {
if (resultBinary != null) {
int size = resultBinary.getInt();
- logger.debug("Fetched {} schemas of {} from {}", size, plan.getPath(),
group);
+ logger.debug(
+ "Fetched remote timeseries {} schemas of {} from {}", size,
plan.getPath(), group);
for (int i = 0; i < size; i++) {
resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary));
}
@@ -1559,6 +1655,36 @@ public class CMManager extends MManager {
}
}
+ /**
+  * Queries the members of a remote {@code group} one after another until one of them returns
+  * a result, then deserializes that result into {@code resultSet}.
+  *
+  * <p>Connection and RPC failures are logged and the next node is tried. If the calling thread
+  * is interrupted, the interrupt flag is restored and no further node is contacted. When no
+  * node yields a result an error is logged and {@code resultSet} is left unchanged.
+  *
+  * @param group the remote partition group to query
+  * @param plan the show-devices plan to evaluate
+  * @param resultSet the set that receives the deserialized devices
+  */
+ private void getRemoteDevices(
+     PartitionGroup group, ShowDevicesPlan plan, Set<ShowDevicesResult> resultSet) {
+   ByteBuffer resultBinary = null;
+   for (Node node : group) {
+     try {
+       resultBinary = getRemoteDevices(node, group, plan);
+       if (resultBinary != null) {
+         break;
+       }
+     } catch (IOException e) {
+       logger.error(LOG_FAIL_CONNECT, node, e);
+     } catch (TException e) {
+       logger.error("Error occurs when getting devices schemas in node {}.", node, e);
+     } catch (InterruptedException e) {
+       logger.error("Interrupted when getting devices schemas in node {}.", node, e);
+       Thread.currentThread().interrupt();
+       // An interrupt is a request to stop: do not keep issuing requests to the remaining
+       // nodes with the interrupt flag set.
+       break;
+     }
+   }
+
+   if (resultBinary != null) {
+     // Wire format: an int count followed by that many serialized ShowDevicesResult entries.
+     int size = resultBinary.getInt();
+     logger.debug("Fetched remote devices {} schemas of {} from {}", size, plan.getPath(), group);
+     for (int i = 0; i < size; i++) {
+       resultSet.add(ShowDevicesResult.deserialize(resultBinary));
+     }
+   } else {
+     logger.error("Failed to execute show devices {} in group: {}.", plan, group);
+   }
+ }
+
private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group,
ShowTimeSeriesPlan plan)
throws IOException, TException, InterruptedException {
ByteBuffer resultBinary;
@@ -1588,6 +1714,35 @@ public class CMManager extends MManager {
return resultBinary;
}
+ /**
+  * Sends {@code plan} to a single remote {@code node} and returns the raw reply. An
+  * asynchronous client (through {@code SyncClientAdaptor.getDevices}) is used when the cluster
+  * is configured with {@code isUseAsyncServer()}, otherwise a synchronous client is borrowed
+  * and handed back via {@code ClientUtils.putBackSyncClient} in a {@code finally} block.
+  *
+  * @param node the node to contact
+  * @param group the partition group being queried; its header is sent with the request
+  * @param plan the show-devices plan, sent in serialized form
+  * @return the serialized result as returned by the client call
+  */
+ private ByteBuffer getRemoteDevices(Node node, PartitionGroup group,
ShowDevicesPlan plan)
+ throws IOException, TException, InterruptedException {
+ ByteBuffer resultBinary;
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ AsyncDataClient client =
+ metaGroupMember
+ .getClientProvider()
+ .getAsyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
+ resultBinary = SyncClientAdaptor.getDevices(client, group.getHeader(),
plan);
+ } else {
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ metaGroupMember
+ .getClientProvider()
+ .getSyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
+ // NOTE(review): syncDataClient is used without a null check - confirm that
+ // getSyncDataClient cannot return null.
+ ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ plan.serialize(dataOutputStream);
+ resultBinary =
+ syncDataClient.getDevices(
+ group.getHeader(),
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ } finally {
+ ClientUtils.putBackSyncClient(syncDataClient);
+ }
+ }
+ return resultBinary;
+ }
+
public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
throws MetadataException {
List<String> retPaths = new ArrayList<>();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index f48b398..028b3fc 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -42,10 +42,12 @@ import
org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
import org.apache.iotdb.db.query.dataset.groupby.LocalGroupByExecutor;
@@ -410,6 +412,22 @@ public class LocalQueryExecutor {
return ByteBuffer.wrap(outputStream.toByteArray());
}
+ /**
+  * Executes a serialized show-devices plan against the local metadata and returns the
+  * serialized result: an int count followed by each {@code ShowDevicesResult}.
+  *
+  * @param planBuffer the serialized {@code ShowDevicesPlan}
+  * @return a buffer wrapping the serialized result
+  * @throws CheckConsistencyException propagated from the leader sync performed first
+  * @throws IOException if writing the result fails
+  * @throws MetadataException propagated from the local device lookup
+  */
+ public ByteBuffer getDevices(ByteBuffer planBuffer)
+ throws CheckConsistencyException, IOException, MetadataException {
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
+ ShowDevicesPlan plan = (ShowDevicesPlan)
PhysicalPlan.Factory.create(planBuffer);
+ List<ShowDevicesResult> allDevicesResult =
getCMManager().getLocalDevices(plan);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (DataOutputStream dataOutputStream = new
DataOutputStream(outputStream)) {
+ dataOutputStream.writeInt(allDevicesResult.size());
+ for (ShowDevicesResult result : allDevicesResult) {
+ // Entries are written to the underlying stream rather than to dataOutputStream;
+ // both feed the same byte array.
+ result.serialize(outputStream);
+ }
+ }
+ return ByteBuffer.wrap(outputStream.toByteArray());
+ }
+
/**
* Execute aggregations over the given path and return the results to the
requester.
*
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 630fb8e..9256f37 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -419,6 +419,14 @@ public class DataClusterServer extends RaftServer
}
@Override
+ public void getDevices(
+ Node header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer>
resultHandler)
+ throws TException {
+ // Asynchronous entry point: look up the data service for the given header and forward
+ // the request to it.
+ // NOTE(review): the returned service is used without a null check - confirm that
+ // getDataAsyncService cannot return null here.
+ DataAsyncService service = getDataAsyncService(header, resultHandler, "Get
devices");
+ service.getDevices(header, planBytes, resultHandler);
+ }
+
+ @Override
public void getNodeList(
Node header, String path, int nodeLevel,
AsyncMethodCallback<List<String>> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Get
node list");
@@ -757,6 +765,11 @@ public class DataClusterServer extends RaftServer
}
@Override
+ public ByteBuffer getDevices(Node header, ByteBuffer planBinary) throws
TException {
+ // Synchronous entry point: forward the request to the data service looked up by header.
+ return getDataSyncService(header).getDevices(header, planBinary);
+ }
+
+ @Override
public List<ByteBuffer> getAggrResult(GetAggrResultRequest request) throws
TException {
return getDataSyncService(request.getHeader()).getAggrResult(request);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetDevicesHandler.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetDevicesHandler.java
new file mode 100644
index 0000000..7a4d717
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/GetDevicesHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Callback for the asynchronous {@code getDevices} RPC. The reply is stored in the
+ * {@link AtomicReference} supplied through {@link #setResponse(AtomicReference)}, and threads
+ * waiting on that reference's monitor are woken both on success and on failure. After a failure
+ * the reference keeps its previous value.
+ */
+@SuppressWarnings("common-java:DuplicatedBlocks")
+public class GetDevicesHandler implements AsyncMethodCallback<ByteBuffer> {
+
+  private static final Logger logger = LoggerFactory.getLogger(GetDevicesHandler.class);
+
+  /** The node the request was sent to; only used in log messages. */
+  private Node contact;
+
+  /** Holder for the reply; also the monitor that waiters block on. */
+  private AtomicReference<ByteBuffer> result;
+
+  /** Stores the reply and wakes every thread waiting on the result holder. */
+  @Override
+  public void onComplete(ByteBuffer resp) {
+    logger.debug("Received devices schema from {}", contact);
+    synchronized (result) {
+      result.set(resp);
+      result.notifyAll();
+    }
+  }
+
+  /**
+   * Logs the failure and wakes the waiters without storing a value, so that a failed request
+   * does not keep the caller blocked until its timeout expires.
+   */
+  @Override
+  public void onError(Exception exception) {
+    logger.warn("Cannot get devices schema from {}, because ", contact, exception);
+    synchronized (result) {
+      result.notifyAll();
+    }
+  }
+
+  /** Sets the holder that receives the reply; must be called before the RPC is issued. */
+  public void setResponse(AtomicReference<ByteBuffer> response) {
+    this.result = response;
+  }
+
+  /** Sets the node the request is sent to, for logging. */
+  public void setContact(Node contact) {
+    this.contact = contact;
+  }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 5bc07c7..41a2689 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -245,6 +245,16 @@ public class DataAsyncService extends BaseAsyncService
implements TSDataService.
}
@Override
+ public void getDevices(
+ Node header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer>
resultHandler) {
+ // Runs the plan through the local query executor and reports the outcome to the
+ // callback: onComplete with the serialized result, onError with the caught exception.
+ // The header argument is not read in this method.
+ try {
+
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getDevices(planBinary));
+ } catch (CheckConsistencyException | IOException | MetadataException e) {
+ resultHandler.onError(e);
+ }
+ }
+
+ @Override
public void getNodeList(
Node header, String path, int nodeLevel,
AsyncMethodCallback<List<String>> resultHandler) {
try {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index f4f4bee..61ae338 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -242,6 +242,15 @@ public class DataSyncService extends BaseSyncService
implements TSDataService.If
}
@Override
+ public ByteBuffer getDevices(Node header, ByteBuffer planBinary) throws
TException {
+ // Runs the plan through the local query executor; checked failures are wrapped in a
+ // TException (with the original exception as cause). The header argument is not read
+ // in this method.
+ try {
+ return dataGroupMember.getLocalQueryExecutor().getDevices(planBinary);
+ } catch (CheckConsistencyException | IOException | MetadataException e) {
+ throw new TException(e);
+ }
+ }
+
+ @Override
public List<String> getNodeList(Node header, String path, int nodeLevel)
throws TException {
try {
dataGroupMember.syncLeaderWithConsistencyCheck(false);
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index b329812..5a5ad27 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -313,6 +314,10 @@ public abstract class PhysicalPlan {
plan = new ShowTimeSeriesPlan();
plan.deserialize(buffer);
break;
+ case SHOW_DEVICES:
+ plan = new ShowDevicesPlan();
+ plan.deserialize(buffer);
+ break;
case LOAD_CONFIGURATION:
plan = new LoadConfigurationPlan();
plan.deserialize(buffer);
@@ -396,7 +401,8 @@ public abstract class PhysicalPlan {
STORAGE_GROUP_MNODE,
BATCH_INSERT_ONE_DEVICE,
MULTI_BATCH_INSERT,
- BATCH_INSERT_ROWS
+ BATCH_INSERT_ROWS,
+ SHOW_DEVICES
}
public long getIndex() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
index 89345ea..c641e32 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
@@ -18,10 +18,19 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class ShowDevicesPlan extends ShowPlan {
+ /**
+  * Creates a plan of content type {@code DEVICES} with no path set; the remaining fields are
+  * expected to be filled in afterwards by {@code deserialize}.
+  */
+ public ShowDevicesPlan() {
+ super(ShowContentType.DEVICES);
+ }
+
private boolean hasSgCol;
public ShowDevicesPlan(PartialPath path) {
@@ -33,6 +42,23 @@ public class ShowDevicesPlan extends ShowPlan {
this.hasSgCol = hasSgCol;
}
+ /**
+  * Writes this plan as: one byte holding the ordinal of {@code SHOW_DEVICES}, the full path,
+  * limit, offset, index and finally the {@code hasSgCol} flag.
+  *
+  * @param outputStream the stream to write to
+  * @throws IOException if writing fails
+  */
+ @Override
+ public void serialize(DataOutputStream outputStream) throws IOException {
+   outputStream.write(PhysicalPlanType.SHOW_DEVICES.ordinal());
+   putString(outputStream, path.getFullPath());
+   outputStream.writeInt(limit);
+   outputStream.writeInt(offset);
+   outputStream.writeLong(index);
+   // Without this flag a plan rebuilt on another node always has hasSgCol == false.
+   outputStream.writeBoolean(hasSgCol);
+ }
+
+ /**
+  * Restores the fields written by {@code serialize}, starting after the leading type byte
+  * (this method does not read it).
+  *
+  * @param buffer the buffer positioned at the serialized path
+  * @throws IllegalPathException if the stored path is not a legal path
+  */
+ @Override
+ public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+   path = new PartialPath(readString(buffer));
+   limit = buffer.getInt();
+   offset = buffer.getInt();
+   this.index = buffer.getLong();
+   // writeBoolean emits a single byte: 1 for true, 0 for false
+   hasSgCol = buffer.get() == 1;
+ }
+
public boolean hasSgCol() {
return hasSgCol;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
index e52f619..5d5e382 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java
@@ -116,7 +116,7 @@ public class ShowTimeSeriesPlan extends ShowPlan {
key = readString(buffer);
value = readString(buffer);
limit = buffer.getInt();
- limit = buffer.getInt();
+ offset = buffer.getInt();
orderByHeat = buffer.get() == 1;
this.index = buffer.getLong();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
index 8c3ec9a..b11e434 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowDevicesResult.java
@@ -18,7 +18,17 @@
*/
package org.apache.iotdb.db.query.dataset;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
public class ShowDevicesResult extends ShowResult {
+ /** Creates an empty result; {@code deserialize} uses it and then assigns the fields. */
+ public ShowDevicesResult() {
+ super();
+ }
public ShowDevicesResult(String name, String sgName) {
super(name, sgName);
@@ -27,4 +37,38 @@ public class ShowDevicesResult extends ShowResult {
public ShowDevicesResult(String name) {
super(name);
}
+
+ /**
+  * Writes {@code name} followed by {@code sgName} to {@code outputStream}.
+  *
+  * @param outputStream the stream to write to
+  * @throws IOException if writing fails
+  */
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(name, outputStream);
+ // NOTE(review): sgName may be unset when the single-argument constructor was used -
+ // confirm that ReadWriteIOUtils.write accepts a null string.
+ ReadWriteIOUtils.write(sgName, outputStream);
+ }
+
+ /**
+  * Reads one result from {@code buffer} in the order written by {@code serialize}:
+  * {@code name} first, then {@code sgName}.
+  *
+  * @param buffer the buffer positioned at a serialized result
+  * @return a new result holding the two strings read
+  */
+ public static ShowDevicesResult deserialize(ByteBuffer buffer) {
+ ShowDevicesResult result = new ShowDevicesResult();
+ result.name = ReadWriteIOUtils.readString(buffer);
+ result.sgName = ReadWriteIOUtils.readString(buffer);
+ return result;
+ }
+
+ /** Returns a debug representation containing {@code name} and {@code sgName}. */
+ @Override
+ public String toString() {
+ return "ShowDevicesResult{" + " name='" + name + '\'' + ", sgName='" +
sgName + '\'' + "}";
+ }
+
+ /**
+  * Two results are equal when they are of exactly the same class and both {@code name} and
+  * {@code sgName} are equal (null-safe comparison).
+  */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ShowDevicesResult result = (ShowDevicesResult) o;
+ return Objects.equals(name, result.name) && Objects.equals(sgName,
result.sgName);
+ }
+
+ /** Hash over the same two fields that {@code equals} compares: {@code name}, {@code sgName}. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, sgName);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
b/server/src/test/java/org/apache/iotdb/db/query/dataset/ShowDevicesResultTest.java
similarity index 52%
copy from
server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
copy to
server/src/test/java/org/apache/iotdb/db/query/dataset/ShowDevicesResultTest.java
index 89345ea..aa5845b 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowDevicesPlan.java
+++
b/server/src/test/java/org/apache/iotdb/db/query/dataset/ShowDevicesResultTest.java
@@ -16,24 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.qp.physical.sys;
+package org.apache.iotdb.db.query.dataset;
-import org.apache.iotdb.db.metadata.PartialPath;
+import org.junit.Assert;
+import org.junit.Test;
-public class ShowDevicesPlan extends ShowPlan {
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
- private boolean hasSgCol;
+public class ShowDevicesResultTest {
- public ShowDevicesPlan(PartialPath path) {
- super(ShowContentType.DEVICES, path);
- }
+ @Test
+ public void serializeTest() throws IOException {
+ ShowDevicesResult showDevicesResult = new ShowDevicesResult("root.sg1.d1",
"root.sg1");
- public ShowDevicesPlan(PartialPath path, int limit, int offset, int
fetchSize, boolean hasSgCol) {
- super(ShowContentType.DEVICES, path, limit, offset, fetchSize);
- this.hasSgCol = hasSgCol;
- }
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ showDevicesResult.serialize(outputStream);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+ ShowDevicesResult result = ShowDevicesResult.deserialize(byteBuffer);
- public boolean hasSgCol() {
- return hasSgCol;
+ Assert.assertEquals("root.sg1.d1", result.getName());
+ Assert.assertEquals("root.sg1", result.getSgName());
}
}
diff --git a/thrift/src/main/thrift/cluster.thrift
b/thrift/src/main/thrift/cluster.thrift
index 1008e85..44d2b48 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -362,6 +362,11 @@ service TSDataService extends RaftService {
**/
set<string> getAllDevices(1:Node header, 2:list<string> path)
+ /**
+ * Get the devices from the header according to the showDevicesPlan
+ **/
+ binary getDevices(1:Node header, 2: binary planBinary)
+
list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel)
set<string> getChildNodePathInNextLevel(1: Node header, 2: string path)