This is an automated email from the ASF dual-hosted git repository. chaow pushed a commit to branch support_redirect_query in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 68048a4ca1c979325b98a664ab3053d02dde3f57 Author: chaow <[email protected]> AuthorDate: Thu Mar 18 11:54:49 2021 +0800 IOTDB-1241 support redirect query for cluster --- .../resources/conf/iotdb-cluster.properties | 2 + .../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++ .../iotdb/cluster/config/ClusterDescriptor.java | 5 ++ .../apache/iotdb/cluster/metadata/CMManager.java | 7 +- .../cluster/query/ClusterDataQueryExecutor.java | 65 ++++++++++++++- .../cluster/query/reader/ClusterReaderFactory.java | 36 +++++++- .../cluster/query/reader/ClusterTimeGenerator.java | 37 +++++++-- .../cluster/query/reader/ManagedMergeReader.java | 13 +++ .../cluster/query/reader/MergedReaderByTime.java | 10 +++ .../query/ClusterDataQueryExecutorTest.java | 85 +++++++++++++++++++ .../main/java/org/apache/iotdb/SessionExample.java | 97 ++++++++++++++++++++++ .../iotdb/db/qp/physical/crud/QueryPlan.java | 10 +++ .../db/query/dataset/AlignByDeviceDataSet.java | 11 +++ .../dataset/RawQueryDataSetWithoutValueFilter.java | 11 +++ .../db/query/executor/RawDataQueryExecutor.java | 22 +++++ .../org/apache/iotdb/db/service/TSServiceImpl.java | 52 ++++++++++-- .../org/apache/iotdb/rpc/RedirectException.java | 3 +- .../java/org/apache/iotdb/session/Session.java | 81 +++++++++++++++++- .../apache/iotdb/session/SessionConnection.java | 32 ++++++- thrift/src/main/thrift/rpc.thrift | 3 + .../tsfile/read/query/dataset/QueryDataSet.java | 40 +++++++++ 21 files changed, 603 insertions(+), 29 deletions(-) diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties index 1627d43..87283a1 100644 --- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties +++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties @@ -166,3 +166,5 @@ max_client_pernode_permember_number=1000 # we need to wait so much time for other connections to be released until timeout, # or a new connection will be created. wait_client_timeout_ms=5000 + +enable_query_redirect=false \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 08f7608..e8a3953 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -161,6 +161,8 @@ public class ClusterConfig { private boolean openServerRpcPort = false; + private boolean enableQueryRedirect = false; + public int getSelectorNumOfClientPool() { return selectorNumOfClientPool; } @@ -456,4 +458,12 @@ public class ClusterConfig { public void setWaitClientTimeoutMS(long waitClientTimeoutMS) { this.waitClientTimeoutMS = waitClientTimeoutMS; } + + public boolean isEnableQueryRedirect() { + return enableQueryRedirect; + } + + public void setEnableQueryRedirect(boolean enableQueryRedirect) { + this.enableQueryRedirect = enableQueryRedirect; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java index d6bbf82..799a7a4 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java @@ -293,6 +293,11 @@ public class ClusterDescriptor { properties.getProperty( "wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS())))); + config.setEnableQueryRedirect( + Boolean.parseBoolean( + properties.getProperty( + "enable_query_redirect", String.valueOf(config.isEnableQueryRedirect())))); + String consistencyLevel = properties.getProperty("consistency_level"); if (consistencyLevel != null) { config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel)); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index 6320432..cd572b2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -1241,11 +1241,12 @@ public class CMManager extends MManager { try { Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery); logger.debug( - "{}: get matched paths of {} from {}, result {}", + "{}: get matched paths of {} from {}, result {} for {}", metaGroupMember.getName(), partitionGroup, node, - paths); + paths, + pathsToQuery); if (paths != null) { // query next group Set<PartialPath> partialPaths = new HashSet<>(); @@ -1397,7 +1398,7 @@ public class CMManager extends MManager { public Set<String> getAllDevices(List<String> paths) throws MetadataException { Set<String> results = new HashSet<>(); for (String path : paths) { - getAllTimeseriesPath(new PartialPath(path)).stream() + getDevices(new PartialPath(path)).stream() .map(PartialPath::getFullPath) .forEach(results::add); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java index fbf8963..d2e8737 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java @@ -23,12 +23,15 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException; import org.apache.iotdb.cluster.exception.EmptyIntervalException; import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory; import org.apache.iotdb.cluster.query.reader.ClusterTimeGenerator; +import org.apache.iotdb.cluster.query.reader.ManagedMergeReader; +import org.apache.iotdb.cluster.query.reader.MergedReaderByTime; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter; import org.apache.iotdb.db.query.executor.RawDataQueryExecutor; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; @@ -36,6 +39,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; import org.slf4j.Logger; @@ -51,6 +55,8 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { private static final Logger logger = LoggerFactory.getLogger(ClusterDataQueryExecutor.class); private MetaGroupMember metaGroupMember; private ClusterReaderFactory readerFactory; + private QueryDataSet.EndPoint endPoint = null; + private boolean hasLocalReader = false; ClusterDataQueryExecutor(RawDataQueryPlan plan, MetaGroupMember metaGroupMember) { super(plan); @@ -74,6 +80,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { } List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>(); + hasLocalReader = false; for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { PartialPath path = queryPlan.getDeduplicatedPaths().get(i); TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i); @@ -95,9 +102,22 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { } readersOfSelectedSeries.add(reader); + if (((ManagedMergeReader) reader).getEndPoint() != null) { + endPoint = ((ManagedMergeReader) reader).getEndPoint(); + } else { + hasLocalReader = true; + } } if (logger.isDebugEnabled()) { - logger.debug("Initialized {} readers for {}", readersOfSelectedSeries.size(), queryPlan); + logger.debug( + "Initialized {} readers for {} has localReader {}", + readersOfSelectedSeries.size(), + queryPlan, + hasLocalReader); + } + if (hasLocalReader) { + // no need to redirect query + endPoint = null; } return readersOfSelectedSeries; } @@ -106,8 +126,15 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { protected IReaderByTimestamp getReaderByTimestamp( PartialPath path, Set<String> deviceMeasurements, TSDataType dataType, QueryContext context) throws StorageEngineException, QueryProcessException { - return readerFactory.getReaderByTimestamp( - path, deviceMeasurements, dataType, context, queryPlan.isAscending()); + IReaderByTimestamp iReaderByTimestamp = + readerFactory.getReaderByTimestamp( + path, deviceMeasurements, dataType, context, queryPlan.isAscending()); + if (((MergedReaderByTime) iReaderByTimestamp).getEndPoint() == null) { + hasLocalReader = true; + } else if (!hasLocalReader && endPoint == null) { + endPoint = ((MergedReaderByTime) iReaderByTimestamp).getEndPoint(); + } + return iReaderByTimestamp; } @Override @@ -116,4 +143,36 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { throws StorageEngineException { return new ClusterTimeGenerator(queryExpression, context, metaGroupMember, rawDataQueryPlan); } + + public QueryDataSet.EndPoint getEndPoint() { + return endPoint; + } + + public void setEndPoint(QueryDataSet.EndPoint endPoint) { + this.endPoint = endPoint; + } + + @Override + protected QueryDataSet needRedirect(long queryId, TimeGenerator timeGenerator) { + ClusterTimeGenerator clusterTimeGenerator = (ClusterTimeGenerator) timeGenerator; + logger.debug( + "redirect queryId {}, {}, {}, {}, {}", + queryId, + hasLocalReader, + queryPlan.getOperatorType(), + endPoint, + clusterTimeGenerator); + if (hasLocalReader + || !queryPlan.isEnableRedirect() + || (clusterTimeGenerator != null && clusterTimeGenerator.isHasLocalReader())) { + endPoint = null; + } + if (!hasLocalReader && endPoint != null && queryPlan.isEnableRedirect()) { + // dummy dataSet + QueryDataSet dataSet = new RawQueryDataSetWithoutValueFilter(queryId); + dataSet.setEndPoint(endPoint); + return dataSet; + } + return null; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index 91819b1..a68881a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -61,6 +61,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.read.reader.IPointReader; @@ -119,14 +120,29 @@ public class ClusterReaderFactory { path, partitionGroups.size()); List<IReaderByTimestamp> readers = new ArrayList<>(partitionGroups.size()); + QueryDataSet.EndPoint endPoint = new QueryDataSet.EndPoint(); + boolean hasLocalReader = false; for (PartitionGroup partitionGroup : partitionGroups) { // query each group to get a reader in that group - readers.add( + IReaderByTimestamp iReaderByTimestamp = getSeriesReaderByTime( - partitionGroup, path, deviceMeasurements, context, dataType, ascending)); + partitionGroup, path, deviceMeasurements, context, dataType, ascending); + readers.add(iReaderByTimestamp); + if (iReaderByTimestamp instanceof SeriesReaderByTimestamp) { + hasLocalReader = true; + } else if (iReaderByTimestamp instanceof RemoteSeriesReaderByTimestamp) { + endPoint.setIp(partitionGroup.getHeader().getClientIp()); + endPoint.setPort(partitionGroup.getHeader().getClientPort()); + } + } + if (hasLocalReader) { + // no need redirect query to the endpoint + endPoint = null; } // merge the readers - return new MergedReaderByTime(readers); + MergedReaderByTime mergedReaderByTime = new MergedReaderByTime(readers); + mergedReaderByTime.setEndPoint(endPoint); + return mergedReaderByTime; } /** @@ -227,6 +243,8 @@ public class ClusterReaderFactory { path, partitionGroups.size()); ManagedMergeReader mergeReader = new ManagedMergeReader(dataType); + QueryDataSet.EndPoint endPoint = new QueryDataSet.EndPoint(); + boolean hasLocalReader = false; try { // build a reader for each group and merge them for (PartitionGroup partitionGroup : partitionGroups) { @@ -240,11 +258,23 @@ public class ClusterReaderFactory { context, dataType, ascending); + if (seriesReader instanceof SeriesRawDataPointReader + && seriesReader.hasNextTimeValuePair()) { + hasLocalReader = true; + } else if (seriesReader instanceof RemoteSimpleSeriesReader) { + endPoint.setIp(partitionGroup.getHeader().getClientIp()); + endPoint.setPort(partitionGroup.getHeader().getClientPort()); + } mergeReader.addReader(seriesReader, 0); } } catch (IOException | QueryProcessException e) { throw new StorageEngineException(e); } + if (hasLocalReader) { + // no need redirect query to the endpoint + endPoint = null; + } + mergeReader.setEndPoint(endPoint); return mergeReader; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java index b7b5f51..8f88a23 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -40,6 +41,7 @@ import java.util.Collections; public class ClusterTimeGenerator extends ServerTimeGenerator { private ClusterReaderFactory readerFactory; + private boolean hasLocalReader = false; /** Constructor of EngineTimeGenerator. */ public ClusterTimeGenerator( @@ -65,22 +67,41 @@ public class ClusterTimeGenerator extends ServerTimeGenerator { Filter filter = expression.getFilter(); PartialPath path = (PartialPath) expression.getSeriesPath(); TSDataType dataType; + ManagedSeriesReader mergeReader = null; try { dataType = ((CMManager) IoTDB.metaManager) .getSeriesTypesByPaths(Collections.singletonList(path), null) .left .get(0); - return readerFactory.getSeriesReader( - path, - queryPlan.getAllMeasurementsInDevice(path.getDevice()), - dataType, - null, - filter, - context, - queryPlan.isAscending()); + mergeReader = + readerFactory.getSeriesReader( + path, + queryPlan.getAllMeasurementsInDevice(path.getDevice()), + dataType, + null, + filter, + context, + queryPlan.isAscending()); } catch (Exception e) { throw new IOException(e); } + if (((ManagedMergeReader) mergeReader).getEndPoint() == null) { + hasLocalReader = true; + } + return mergeReader; + } + + public boolean isHasLocalReader() { + return hasLocalReader; + } + + public void setHasLocalReader(boolean hasLocalReader) { + this.hasLocalReader = hasLocalReader; + } + + @Override + public String toString() { + return super.toString() + ", has local reader:" + hasLocalReader; } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java index e54dede..acaffe4 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import java.io.IOException; import java.util.NoSuchElementException; @@ -38,6 +39,10 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe private BatchData batchData; private TSDataType dataType; + /* + * whether need to redirect node to the node + */ + private QueryDataSet.EndPoint endPoint = null; public ManagedMergeReader(TSDataType dataType) { this.dataType = dataType; @@ -91,4 +96,12 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe batchData = null; return ret; } + + public QueryDataSet.EndPoint getEndPoint() { + return endPoint; + } + + public void setEndPoint(QueryDataSet.EndPoint endPoint) { + this.endPoint = endPoint; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java index 2db6a09..ae215f8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java @@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.query.reader; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import java.io.IOException; import java.util.List; @@ -27,6 +28,7 @@ import java.util.List; public class MergedReaderByTime implements IReaderByTimestamp { private List<IReaderByTimestamp> innerReaders; + private QueryDataSet.EndPoint endPoint = null; public MergedReaderByTime(List<IReaderByTimestamp> innerReaders) { this.innerReaders = innerReaders; @@ -44,4 +46,12 @@ public class MergedReaderByTime implements IReaderByTimestamp { } return null; } + + public QueryDataSet.EndPoint getEndPoint() { + return endPoint; + } + + public void setEndPoint(QueryDataSet.EndPoint endPoint) { + this.endPoint = endPoint; + } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java index aa48624..bee1d3a 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.query; import org.apache.iotdb.cluster.common.TestUtils; +import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -27,18 +28,40 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; +import org.apache.iotdb.tsfile.read.filter.operator.AndFilter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.IOException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + public class ClusterDataQueryExecutorTest extends BaseQueryTest { private ClusterDataQueryExecutor queryExecutor; + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false); + } + @Test public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException { RawDataQueryPlan plan = new RawDataQueryPlan(); @@ -75,4 +98,66 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest { QueryResourceManager.getInstance().endQuery(context.getQueryId()); } } + + @Test + public void testNoFilterWithRedirect() + throws IOException, StorageEngineException, QueryProcessException { + RawDataQueryPlan plan = new RawDataQueryPlan(); + plan.setDeduplicatedPaths(pathList); + plan.setDeduplicatedDataTypes(dataTypes); + plan.setEnableRedirect(true); + queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember); + RemoteQueryContext context = + new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); + try { + QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context); + System.out.println(dataSet.getEndPoint()); + assertNull(dataSet.getEndPoint()); + } finally { + QueryResourceManager.getInstance().endQuery(context.getQueryId()); + } + } + + @Test + public void testFilterWithValueFilterRedirect() + throws IOException, StorageEngineException, QueryProcessException, IllegalPathException { + IExpression expression = + new SingleSeriesExpression( + new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)); + RawDataQueryPlan plan = new RawDataQueryPlan(); + plan.setDeduplicatedPaths(pathList); + plan.setDeduplicatedDataTypes(dataTypes); + plan.setExpression(expression); + plan.setEnableRedirect(true); + queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember); + RemoteQueryContext context = + new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); + try { + QueryDataSet dataSet = queryExecutor.executeWithValueFilter(context); + assertNull(dataSet.getEndPoint()); + } finally { + QueryResourceManager.getInstance().endQuery(context.getQueryId()); + } + } + + @Test + public void testFilterWithTimeFilterRedirect() + throws IOException, StorageEngineException, QueryProcessException, IllegalPathException { + IExpression expression = + new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10))); + RawDataQueryPlan plan = new RawDataQueryPlan(); + plan.setDeduplicatedPaths(pathList.subList(0, 1)); + plan.setDeduplicatedDataTypes(dataTypes.subList(0, 1)); + plan.setExpression(expression); + plan.setEnableRedirect(true); + queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember); + RemoteQueryContext context = + new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)); + try { + QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context); + assertEquals("ip:port=0.0.0.0:6667", dataSet.getEndPoint().toString()); + } finally { + QueryResourceManager.getInstance().endQuery(context.getQueryId()); + } + } } diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 20387de..9868e26 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -39,6 +39,7 @@ import java.util.Random; public class SessionExample { private static Session session; + private static Session sessionEnableRedirect; private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2"; private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3"; @@ -77,6 +78,17 @@ public class SessionExample { deleteTimeseries(); setTimeout(); session.close(); + + sessionEnableRedirect = new Session("127.0.0.1", 6667, "root", "root"); + sessionEnableRedirect.setEnableQueryRedirection(true); + sessionEnableRedirect.open(false); + + // set session fetchSize + sessionEnableRedirect.setFetchSize(10000); + + insertRecord4Redirect(); + query4Redirect(); + sessionEnableRedirect.close(); } private static void createTimeseries() @@ -193,6 +205,31 @@ public class SessionExample { } } + private static void insertRecord4Redirect() + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < 6; i++) { + for (int j = 0; j < 2; j++) { + String deviceId = "root.redirect" + i + ".d" + j; + List<String> measurements = new ArrayList<>(); + List<TSDataType> types = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + measurements.add("s3"); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + + for (long time = 0; time < 5; time++) { + List<Object> values = new ArrayList<>(); + values.add(1L + time); + values.add(2L + time); + values.add(3L + time); + session.insertRecord(deviceId, time, measurements, types, values); + } + } + } + } + private static void insertStrRecord() throws IoTDBConnectionException, StatementExecutionException { String deviceId = ROOT_SG1_D1; @@ -448,6 +485,66 @@ public class SessionExample { dataSet.closeOperationHandle(); } + private static void query4Redirect() + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < 6; i++) { + SessionDataSet dataSet = + sessionEnableRedirect.executeQueryStatement("select * from root.redirect" + i + ".d1"); + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); // default is 10000 + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + + dataSet.closeOperationHandle(); + } + + for (int i = 0; i < 6; i++) { + SessionDataSet dataSet = + sessionEnableRedirect.executeQueryStatement( + "select * from root.redirect" + i + ".d1 where time >= 1 and time < 10"); + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); // default is 10000 + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + + dataSet.closeOperationHandle(); + } + + for (int i = 0; i < 6; i++) { + SessionDataSet dataSet = + sessionEnableRedirect.executeQueryStatement( + "select * from root.redirect" + + i + + ".d1 where time >= 1 and time < 10 align by device"); + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); // default is 10000 + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + + dataSet.closeOperationHandle(); + } + + for (int i = 0; i < 6; i++) { + SessionDataSet dataSet = + sessionEnableRedirect.executeQueryStatement( + "select * from root.redirect" + + i + + ".d1 where time >= 1 and time < 10 and root.redirect" + + i + + "d1.s1 > 1"); + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); // default is 10000 + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + + dataSet.closeOperationHandle(); + } + } + private static void queryWithTimeout() throws IoTDBConnectionException, StatementExecutionException { SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index bcd5b05..a188db5 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -42,6 +42,8 @@ public abstract class QueryPlan extends PhysicalPlan { private Map<String, Integer> pathToIndex = new HashMap<>(); + private boolean enableRedirect = false; + public QueryPlan() { super(true); setOperatorType(Operator.OperatorType.QUERY); @@ -126,4 +128,12 @@ public abstract class QueryPlan extends PhysicalPlan { throws IllegalPathException { return columnForReader; } + + public boolean isEnableRedirect() { + return enableRedirect; + } + + public void setEnableRedirect(boolean enableRedirect) { + this.enableRedirect = enableRedirect; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java index f8ce3ea..b37a16e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.IQueryRouter; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.rpc.RedirectException; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Field; @@ -108,6 +109,8 @@ public class AlignByDeviceDataSet extends QueryDataSet { this.dataSetType = DataSetType.QUERY; this.rawDataQueryPlan = new RawDataQueryPlan(); this.rawDataQueryPlan.setAscending(alignByDevicePlan.isAscending()); + // only redirect query for raw data query + this.rawDataQueryPlan.setEnableRedirect(alignByDevicePlan.isEnableRedirect()); } this.curDataSetInitialized = false; @@ -198,6 +201,14 @@ public class AlignByDeviceDataSet extends QueryDataSet { throw new IOException(e); } + if (currentDataSet.getEndPoint() != null) { + org.apache.iotdb.service.rpc.thrift.EndPoint endPoint = + new org.apache.iotdb.service.rpc.thrift.EndPoint(); + endPoint.setIp(currentDataSet.getEndPoint().getIp()); + endPoint.setPort(currentDataSet.getEndPoint().getPort()); + throw new RedirectException(endPoint); + } + if (currentDataSet.hasNext()) { curDataSetInitialized = true; return true; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index 63f08c1..a707063 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -180,6 +180,17 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet init(); } + /** + * dummy dataSet for redirect query + * + * @param queryId queryId for the query + */ + public RawQueryDataSetWithoutValueFilter(long queryId) { + this.queryId = queryId; + blockingQueueArray = new BlockingQueue[0]; + timeHeap = new TimeSelector(0, ascending); + } + private void init() throws IOException, InterruptedException { timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending); for (int i = 0; i < seriesReaderList.size(); i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 042934a..cb76e20 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -62,6 +62,10 @@ public class RawDataQueryExecutor { public QueryDataSet executeWithoutValueFilter(QueryContext context) throws StorageEngineException, QueryProcessException { List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context); + QueryDataSet dataSet = needRedirect(context.getQueryId(), null); + if (dataSet != null) { + return dataSet; + } try { return new RawQueryDataSetWithoutValueFilter( context.getQueryId(), @@ -80,6 +84,10 @@ public class RawDataQueryExecutor { public final QueryDataSet executeNonAlign(QueryContext context) throws StorageEngineException, QueryProcessException { List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context); + QueryDataSet dataSet = needRedirect(context.getQueryId(), null); + if (dataSet != null) { + return dataSet; + } return new NonAlignEngineDataSet( context.getQueryId(), queryPlan.getDeduplicatedPaths(), @@ -142,6 +150,10 @@ public class RawDataQueryExecutor { timestampGenerator.hasOrNode()); List<IReaderByTimestamp> readersOfSelectedSeries = initSeriesReaderByTimestamp(context, queryPlan, cached); + QueryDataSet dataSet = needRedirect(context.getQueryId(), timestampGenerator); + if (dataSet != null) { + return dataSet; + } return new RawQueryDataSetWithValueFilter( queryPlan.getDeduplicatedPaths(), queryPlan.getDeduplicatedDataTypes(), @@ -196,4 +208,14 @@ public class RawDataQueryExecutor { throws StorageEngineException { return new ServerTimeGenerator(expression, context, queryPlan); } + + /** + * check whether need to redirect query to other node + * + * @param queryId queryId to cancel query + * @return dummyDataSet to avoid more cost, if null, no need + */ + protected QueryDataSet needRedirect(long queryId, TimeGenerator timeGenerator) { + return null; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 031b2bc..95119c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -83,8 +83,10 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.db.utils.SchemaUtils; +import org.apache.iotdb.rpc.RedirectException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.ServerProperties; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; @@ -531,7 +533,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { physicalPlan, req.fetchSize, req.timeout, - sessionIdUsernameMap.get(req.getSessionId())) + sessionIdUsernameMap.get(req.getSessionId()), + req.isEnableRedirectQuery()) : executeUpdateStatement(physicalPlan, req.getSessionId()); } catch (Exception e) { return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement")); @@ -557,7 +560,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { physicalPlan, req.fetchSize, req.timeout, - sessionIdUsernameMap.get(req.getSessionId())) + sessionIdUsernameMap.get(req.getSessionId()), + req.isEnableRedirectQuery()) : RpcUtils.getTSExecuteStatementResp( TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); } catch (Exception e) { @@ -582,7 +586,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { physicalPlan, req.fetchSize, config.getQueryTimeoutThreshold(), - sessionIdUsernameMap.get(req.getSessionId())) + sessionIdUsernameMap.get(req.getSessionId()), + req.isEnableRedirectQuery()) : RpcUtils.getTSExecuteStatementResp( TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); } catch (Exception e) { @@ -602,7 +607,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PhysicalPlan plan, int fetchSize, long timeout, - String username) + String username, + boolean enableRedirect) throws QueryProcessException, SQLException, StorageEngineException, QueryFilterOptimizationException, MetadataException, IOException, InterruptedException, TException, AuthException { @@ -644,8 +650,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) { resp = getQueryColumnHeaders(plan, username); } + if (plan instanceof QueryPlan) { + ((QueryPlan) plan).setEnableRedirect(enableRedirect); + } // create and cache dataset QueryDataSet newDataSet = createQueryDataSet(queryId, plan); + + if (newDataSet.getEndPoint() != null && enableRedirect) { + // redirect query + LOGGER.debug( + "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint()); + TSStatus status = new TSStatus(); + status.setRedirectNode( + new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort())); + status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); + resp.setStatus(status); + resp.setQueryId(queryId); + return resp; + } + if (plan instanceof ShowPlan || plan instanceof AuthorPlan) { resp = getListDataSetHeaders(newDataSet); } else if (plan instanceof UDFPlan) { @@ -662,7 +685,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (newDataSet instanceof DirectNonAlignDataSet) { resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username)); } else { - resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username)); + try { + TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username); + resp.setQueryDataSet(tsQueryDataSet); + } catch (RedirectException e) { + LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint()); + if (enableRedirect) { + // redirect query + TSStatus status = new TSStatus(); + status.setRedirectNode(e.getEndPoint()); + status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode()); + resp.setStatus(status); + resp.setQueryId(queryId); + return resp; + } else { + LOGGER.error( + "execute {} error, if session not support redirect, should not throw redirection exception", + statement, + e); + } + } } resp.setQueryId(queryId); diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java index e84a96a..15f3157 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java @@ -21,9 +21,10 @@ package org.apache.iotdb.rpc; import org.apache.iotdb.service.rpc.thrift.EndPoint; +import java.io.IOException; import java.util.Map; -public class RedirectException extends Exception { +public class RedirectException extends IOException { private final EndPoint endPoint; diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index b873ade..c3db5e4 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -94,6 +94,8 @@ public class Session { protected Map<EndPoint, SessionConnection> endPointToSessionConnection; private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>(); + protected boolean enableQueryRedirection = false; + public Session(String host, int rpcPort) { this( host, @@ -254,6 +256,7 @@ public class Session { this.enableRPCCompression = enableRPCCompression; this.connectionTimeoutInMs = connectionTimeoutInMs; defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId); + defaultSessionConnection.setEnableRedirect(enableQueryRedirection); metaSessionConnection = defaultSessionConnection; isClosed = false; if (enableCacheLeader) { @@ -452,7 +455,29 @@ public class Session { */ public SessionDataSet executeQueryStatement(String sql) throws StatementExecutionException, IoTDBConnectionException { - return defaultSessionConnection.executeQueryStatement(sql, timeout); + try { + logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql); + return defaultSessionConnection.executeQueryStatement(sql, timeout); + } catch (RedirectException e) { + handleQueryRedirection(e.getEndPoint()); + if (enableQueryRedirection) { + logger.debug( + "{} redirect query {} to {}", + defaultSessionConnection.getEndPoint(), + sql, + e.getEndPoint()); + // retry + try { + return defaultSessionConnection.executeQueryStatement(sql, timeout); + } catch (RedirectException redirectException) { + logger.error("{} redirect twice", sql, redirectException); + throw new StatementExecutionException(sql + " redirect twice, please try again."); + } + } else { + throw new StatementExecutionException( + "raw data query do not support redirect, please confirm the session and server conf."); + } + } } /** @@ -467,7 +492,24 @@ public class Session { if (timeoutInMs < 0) { throw new StatementExecutionException("Timeout must be >= 0, please check and try again."); } - return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs); + try { + return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs); + } catch (RedirectException e) { + handleQueryRedirection(e.getEndPoint()); + if (enableQueryRedirection) { + logger.debug("redirect query {} to {}", sql, e.getEndPoint()); + // retry + try { + return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs); + } catch (RedirectException redirectException) { + logger.error("{} redirect twice", sql, redirectException); + throw new StatementExecutionException(sql + " redirect twice, please try again."); + } + } else { + throw new StatementExecutionException( + "raw data query do not support redirect, please confirm the session and server conf."); + } + } } /** @@ -493,7 +535,24 @@ public class Session { */ public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) throws StatementExecutionException, IoTDBConnectionException { - return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime); + try { + return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime); + } catch (RedirectException e) { + handleQueryRedirection(e.getEndPoint()); + if (enableQueryRedirection) { + logger.debug("redirect query {} to {}", paths, e.getEndPoint()); + // retry + try { + return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime); + } catch (RedirectException redirectException) { + logger.error("Redirect twice", redirectException); + throw new StatementExecutionException("Redirect twice, please try again."); + } + } else { + throw new StatementExecutionException( + "raw data query do not support redirect, please confirm the session and server conf."); + } + } } /** @@ -585,6 +644,14 @@ public class Session { } } + private void handleQueryRedirection(EndPoint endPoint) throws IoTDBConnectionException { + if (enableQueryRedirection) { + defaultSessionConnection.close(); + defaultSessionConnection = constructSessionConnection(this, endPoint, zoneId); + defaultSessionConnection.setEnableRedirect(enableQueryRedirection); + } + } + /** * insert data in one row, if you want improve your performance, please use insertInBatch method * or insertBatch method @@ -1454,4 +1521,12 @@ public class Session { throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType); } } + + public boolean isEnableQueryRedirection() { + return enableQueryRedirection; + } + + public void setEnableQueryRedirection(boolean enableQueryRedirection) { + this.enableQueryRedirection = enableQueryRedirection; + } } diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 7a43483..f2b6665 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -69,6 +69,7 @@ public class SessionConnection { private long statementId; private ZoneId zoneId; private EndPoint endPoint; + private boolean enableRedirect = false; // TestOnly public SessionConnection() {} @@ -254,7 +255,11 @@ public class SessionConnection { throws IoTDBConnectionException, StatementExecutionException { SessionDataSet dataSet = null; try { - dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout); + try { + dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout); + } catch (RedirectException e) { + throw new StatementExecutionException("need to redirect query, should not see this.", e); + } return dataSet.hasNext(); } finally { if (dataSet != null) { @@ -264,13 +269,15 @@ public class SessionConnection { } protected SessionDataSet executeQueryStatement(String sql, long timeout) - throws StatementExecutionException, IoTDBConnectionException { + throws StatementExecutionException, IoTDBConnectionException, RedirectException { TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId); execReq.setFetchSize(session.fetchSize); execReq.setTimeout(timeout); TSExecuteStatementResp execResp; try { + execReq.setEnableRedirectQuery(enableRedirect); execResp = client.executeQueryStatement(execReq); + RpcUtils.verifySuccessWithRedirection(execResp.getStatus()); } catch (TException e) { if (reconnect()) { try { @@ -304,6 +311,7 @@ public class SessionConnection { throws IoTDBConnectionException, StatementExecutionException { TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId); try { + execReq.setEnableRedirectQuery(enableRedirect); TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq); RpcUtils.verifySuccess(execResp.getStatus()); } catch (TException e) { @@ -322,13 +330,15 @@ public class SessionConnection { } protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) - throws StatementExecutionException, IoTDBConnectionException { + throws StatementExecutionException, IoTDBConnectionException, RedirectException { TSRawDataQueryReq execReq = new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId); execReq.setFetchSize(session.fetchSize); TSExecuteStatementResp execResp; try { + execReq.setEnableRedirectQuery(enableRedirect); execResp = client.executeRawDataQuery(execReq); + RpcUtils.verifySuccessWithRedirection(execResp.getStatus()); } catch (TException e) { if (reconnect()) { try { @@ -660,4 +670,20 @@ public class SessionConnection { } return flag; } + + public boolean isEnableRedirect() { + return enableRedirect; + } + + public void setEnableRedirect(boolean enableRedirect) { + this.enableRedirect = enableRedirect; + } + + public EndPoint getEndPoint() { + return endPoint; + } + + public void setEndPoint(EndPoint endPoint) { + this.endPoint = endPoint; + } } diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift index 83cb70c..94892fa 100644 --- a/thrift/src/main/thrift/rpc.thrift +++ b/thrift/src/main/thrift/rpc.thrift @@ -117,6 +117,8 @@ struct TSExecuteStatementReq { 4: optional i32 fetchSize 5: optional i64 timeout + + 6: optional bool enableRedirectQuery; } struct TSExecuteBatchStatementReq{ @@ -276,6 +278,7 @@ struct TSRawDataQueryReq { 4: required i64 startTime 5: required i64 endTime 6: required i64 statementId + 7: optional bool enableRedirectQuery; } struct TSCreateMultiTimeseriesReq { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java index eb7a206..4f25739 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java @@ -34,6 +34,38 @@ public abstract class QueryDataSet { protected int rowOffset = 0; protected int alreadyReturnedRowNum = 0; protected boolean ascending; + /* + * whether current data group has data for query. + * If not null(must be in cluster mode), we need to redirect the query to any data group which has some data to speed up query. + */ + protected EndPoint endPoint = null; + + /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */ + public static class EndPoint { + private String ip = null; + private int port = 0; + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + @Override + public String toString() { + return "ip:port=" + ip + ":" + port; + } + } public QueryDataSet() {} @@ -114,4 +146,12 @@ public abstract class QueryDataSet { public boolean hasLimit() { return rowLimit > 0; } + + public EndPoint getEndPoint() { + return endPoint; + } + + public void setEndPoint(EndPoint endPoint) { + this.endPoint = endPoint; + } }
