This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 626f2a39c9cb2e71c14d0a0fba9ad8c90d0b8d47 Author: lta <[email protected]> AuthorDate: Tue May 21 20:58:57 2019 +0800 add group by it and add ClusterNullableBatach data to handle null timevalue pair --- .../apache/iotdb/cluster/config/ClusterConfig.java | 2 +- .../cluster/qp/executor/NonQueryExecutor.java | 2 - ...atchData.java => ClusterNullableBatchData.java} | 42 ++++++--- .../ClusterGroupByDataSetWithOnlyTimeFilter.java | 12 ++- .../ClusterGroupByDataSetWithTimeGenerator.java | 101 +++++++++++++++++++++ .../cluster/query/executor/ClusterQueryRouter.java | 2 +- .../querynode/ClusterLocalQueryManager.java | 7 +- .../querynode/ClusterLocalSingleQueryManager.java | 10 +- .../ClusterFillSelectSeriesBatchReader.java | 11 ++- ...lusterGroupBySelectSeriesBatchReaderEntity.java | 10 +- .../query/utils/ClusterTimeValuePairUtils.java | 34 ++++++- .../iotdb/cluster/rpc/raft/NodeAsClient.java | 7 +- .../iotdb/cluster/integration/IOTDBGroupByIT.java | 59 ++++++------ .../integration/IoTDBAggregationSmallDataIT.java | 15 --- .../groupby/GroupByWithValueFilterDataSet.java | 10 +- .../apache/iotdb/tsfile/read/common/BatchData.java | 4 +- 16 files changed, 228 insertions(+), 100 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 45df39f..627f561 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -102,7 +102,7 @@ public class ClusterConfig { * then it sends requests to other nodes in the cluster. This parameter represents the maximum * timeout for these requests. The unit is milliseconds. **/ - private int qpTaskTimeout = 500000; + private int qpTaskTimeout = 1000; /** * Number of virtual nodes diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java index 75fc3a8..ac9bddf 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java @@ -352,8 +352,6 @@ public class NonQueryExecutor extends AbstractQPExecutor { return RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response); } - - /** * Async handle task by QPTask and leader id. * diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java similarity index 54% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java index 2d17d0c..8315a04 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java @@ -18,48 +18,62 @@ */ package org.apache.iotdb.cluster.query.common; +import java.util.ArrayList; +import java.util.List; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; /** - * <code>FillBatchData</code> is a self-defined data structure which is used in cluster query - * process of fill type, which only contains one TimeValuePair and value can be null. + * <code>ClusterNullableBatchData</code> is a self-defined data structure which is used in cluster + * query process of fill type and group by type, which may contain <code>null</code> in list of + * TimeValuePair. */ -public class FillBatchData extends BatchData { +public class ClusterNullableBatchData extends BatchData { - private TimeValuePair timeValuePair; - private boolean isUsed; + private List<TimeValuePair> timeValuePairList; + private int index; - public FillBatchData(TimeValuePair timeValuePair, boolean isUsed) { - this.timeValuePair = timeValuePair; - this.isUsed = isUsed; + public ClusterNullableBatchData() { + this.timeValuePairList = new ArrayList<>(); + this.index = 0; } @Override public boolean hasNext() { - return !isUsed; + return index < timeValuePairList.size(); } @Override public void next() { - isUsed = true; + index++; } @Override public long currentTime() { - return timeValuePair.getTimestamp(); + rangeCheckForTime(index); + return timeValuePairList.get(index).getTimestamp(); } @Override public Object currentValue() { - if (!isUsed) { - return timeValuePair.getValue() == null ? null : timeValuePair.getValue().getValue(); + if (index < length()) { + return timeValuePairList.get(index).getValue() == null ? null + : timeValuePairList.get(index).getValue().getValue(); } else { return null; } } + @Override + public int length() { + return timeValuePairList.size(); + } + public TimeValuePair getTimeValuePair() { - return isUsed ? null : timeValuePair; + return index < length() ? timeValuePairList.get(index) : null; + } + + public void addTimeValuePair(TimeValuePair timeValuePair){ + timeValuePairList.add(timeValuePair); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java index 98460e4..ef5386d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java @@ -64,7 +64,7 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime List<Path> paths, long unit, long origin, List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) { super(jobId, paths, unit, origin, mergedIntervals); - this.queryManager =queryManager; + this.queryManager = queryManager; this.readersOfSelectedSeries = new ArrayList<>(); } @@ -132,10 +132,14 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime RowRecord record = new RowRecord(startTime); for (int i = 0; i < functions.size(); i++) { IPointReader reader = readersOfSelectedSeries.get(i); - if(reader != null){ + if (reader != null) { TimeValuePair timeValuePair = reader.next(); - record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i))); - }else { + if (timeValuePair == null) { + record.addField(new Field(null)); + } else { + record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i))); + } + } else { AggreResultData res; try { res = nextSeries(i); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java index 00f2d88..89ed1b9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity; @@ -30,11 +31,13 @@ import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.utils.Pair; @@ -88,4 +91,102 @@ public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilt .createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager, selectSeriesDataTypes); } + + @Override + public RowRecord next() throws IOException { + if (!hasCachedTimeInterval) { + throw new IOException("need to call hasNext() before calling next()" + + " in GroupByWithOnlyTimeFilterDataSet."); + } + hasCachedTimeInterval = false; + for (AggregateFunction function : functions) { + function.init(); + } + + long[] timestampArray = new long[timestampFetchSize]; + int timeArrayLength = 0; + if (hasCachedTimestamp) { + if (timestamp < endTime) { + hasCachedTimestamp = false; + timestampArray[timeArrayLength++] = timestamp; + } else { + return constructRowRecord(); + } + } + + while (timestampGenerator.hasNext()) { + // construct timestamp array + timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength); + + fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray); + + // cal result using timestamp array + for (int i = 0; i < selectedSeries.size(); i++) { + functions.get(i).calcAggregationUsingTimestamps( + timestampArray, timeArrayLength, allDataReaderList.get(i)); + } + + timeArrayLength = 0; + // judge if it's end + if (timestamp >= endTime) { + hasCachedTimestamp = true; + break; + } + } + + // fetch select series data from remote node + fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray); + + if (timeArrayLength > 0) { + // cal result using timestamp array + for (int i = 0; i < selectedSeries.size(); i++) { + functions.get(i).calcAggregationUsingTimestamps( + timestampArray, timeArrayLength, allDataReaderList.get(i)); + } + } + return constructRowRecord(); + } + + /** + * Get select series batch data by batch timestamp + * @param timeArrayLength length of batch timestamp + * @param timestampArray timestamp array + */ + private void fetchSelectDataFromRemoteNode(int timeArrayLength, long[] timestampArray) + throws IOException { + if(timeArrayLength != 0){ + List<Long> batchTimestamp = new ArrayList<>(); + for(int i = 0 ; i < timeArrayLength; i++){ + batchTimestamp.add(timestampArray[i]); + } + + try { + queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp); + } catch ( + RaftConnectionException e) { + throw new IOException(e); + } + } + } + + /** + * construct an array of timestamps for one batch of a group by partition calculating. + * + * @param timestampArray timestamp array + * @param timeArrayLength the current length of timestamp array + * @return time array length + */ + private int constructTimeArrayForOneCal(long[] timestampArray, int timeArrayLength) + throws IOException { + for (int cnt = 1; cnt < timestampFetchSize && timestampGenerator.hasNext(); cnt++) { + timestamp = timestampGenerator.next(); + if (timestamp < endTime) { + timestampArray[timeArrayLength++] = timestamp; + } else { + hasCachedTimestamp = true; + break; + } + } + return timeArrayLength; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java index 54e0df5..9f9dc41 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java @@ -168,7 +168,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter { .optimize(expression, selectedSeries); try { if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { -// queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel()); + queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel()); ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter( jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager); groupByEngine.initGroupBy(context, aggres, optimizedExpression); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java index c83e2a2..4e09af8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java @@ -59,12 +59,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager { TASK_ID_MAP_JOB_ID.put(taskId, jobId); ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId); SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager); - try { - return localQueryManager.createSeriesReader(request); - }catch (Exception e){ - e.printStackTrace(); - return null; - } + return localQueryManager.createSeriesReader(request); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java index e9c0dcb..25adbf5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java @@ -492,11 +492,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM @Override public void run() { -// try { -//// close(); -// } catch (FileNodeManagerException e) { -// LOGGER.error(e.getMessage()); -// } + try { + close(); + } catch (FileNodeManagerException e) { + LOGGER.error(e.getMessage()); + } } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java index fadd92f..a16a220 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java @@ -19,8 +19,9 @@ package org.apache.iotdb.cluster.query.reader.querynode; import java.io.IOException; -import org.apache.iotdb.cluster.query.common.FillBatchData; +import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData; import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -34,6 +35,12 @@ public class ClusterFillSelectSeriesBatchReader extends ClusterSelectSeriesBatch @Override public BatchData nextBatch() throws IOException { - return hasNext() ? new FillBatchData(reader.next(), false) : new FillBatchData(null, true); + if(hasNext()){ + ClusterNullableBatchData batchData = new ClusterNullableBatchData(); + batchData.addTimeValuePair(reader.next()); + return batchData; + }else{ + return new ClusterNullableBatchData(); + } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java index 3b7fabe..5c1d9d1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java @@ -23,7 +23,10 @@ import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSerie import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData; +import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils; import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet; +import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Field; @@ -56,7 +59,7 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements public List<BatchData> nextBatchList() throws IOException { List<BatchData> batchDataList = new ArrayList<>(paths.size()); for (int i = 0; i < paths.size(); i++) { - batchDataList.add(new BatchData(dataTypes.get(i), true)); + batchDataList.add(new ClusterNullableBatchData()); } int dataPointCount = 0; while (true) { @@ -68,10 +71,9 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements long time = rowRecord.getTimestamp(); List<Field> fieldList = rowRecord.getFields(); for (int j = 0; j < paths.size(); j++) { - BatchData batchData = batchDataList.get(j); + ClusterNullableBatchData batchData = (ClusterNullableBatchData) batchDataList.get(j); Object value = fieldList.get(j).getObjectValue(dataTypes.get(j)); - batchData.putTime(time); - batchData.putAnObject(value); + batchData.addTimeValuePair(fieldList.get(j).toString().equals("null") ? null : ClusterTimeValuePairUtils.getTimeValuePair(time, value,dataTypes.get(j))); } } return batchDataList; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java index 7525368..d9c8d75 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java @@ -18,10 +18,14 @@ */ package org.apache.iotdb.cluster.query.utils; -import org.apache.iotdb.cluster.query.common.FillBatchData; +import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.db.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.utils.Binary; public class ClusterTimeValuePairUtils { @@ -35,10 +39,32 @@ public class ClusterTimeValuePairUtils { * @return -given data's (time,value) pair */ public static TimeValuePair getCurrentTimeValuePair(BatchData data) { - if (data instanceof FillBatchData){ - return ((FillBatchData)data).getTimeValuePair(); - }else{ + if (data instanceof ClusterNullableBatchData) { + return ((ClusterNullableBatchData) data).getTimeValuePair(); + } else { return TimeValuePairUtils.getCurrentTimeValuePair(data); } } + + /** + * Get (time,value) pair according to data type + */ + public static TimeValuePair getTimeValuePair(long time, Object v, TSDataType dataType) { + switch (dataType) { + case INT32: + return new TimeValuePair(time, new TsPrimitiveType.TsInt((int) v)); + case INT64: + return new TimeValuePair(time, new TsPrimitiveType.TsLong((long) v)); + case FLOAT: + return new TimeValuePair(time, new TsPrimitiveType.TsFloat((float) v)); + case DOUBLE: + return new TimeValuePair(time, new TsPrimitiveType.TsDouble((double) v)); + case TEXT: + return new TimeValuePair(time, new TsPrimitiveType.TsBinary((Binary) v)); + case BOOLEAN: + return new TimeValuePair(time, new TsPrimitiveType.TsBoolean((boolean) v)); + default: + throw new UnSupportedDataTypeException(String.valueOf(v)); + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java index 197c7eb..b4a2f25 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java @@ -20,9 +20,9 @@ package org.apache.iotdb.cluster.rpc.raft; import com.alipay.sofa.jraft.entity.PeerId; import org.apache.iotdb.cluster.exception.RaftConnectionException; +import org.apache.iotdb.cluster.qp.task.QueryTask; import org.apache.iotdb.cluster.qp.task.SingleQPTask; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; -import org.apache.iotdb.cluster.qp.task.QueryTask; /** * Handle the request and process the result as a client with the current node @@ -31,7 +31,8 @@ public interface NodeAsClient { /** * Asynchronous processing requests - * @param leader leader node of the target group + * + * @param leader leader node of the target group * @param qpTask single QPTask to be executed */ void asyncHandleRequest(BasicRequest request, PeerId leader, @@ -39,8 +40,8 @@ public interface NodeAsClient { /** * Synchronous processing requests - * @param peerId leader node of the target group * + * @param peerId leader node of the target group */ QueryTask syncHandleRequest(BasicRequest request, PeerId peerId); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java index 265d509..0165bba 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java @@ -120,7 +120,6 @@ public class IOTDBGroupByIT { EnvironmentUtils.closeMemControl(); CLUSTER_CONFIG.createAllPath(); server = Server.getInstance(); - QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0); server.start(); EnvironmentUtils.envSetUp(); Class.forName(Config.JDBC_DRIVER_NAME); @@ -135,7 +134,7 @@ public class IOTDBGroupByIT { } @Test - public void countSumMeanTest() throws SQLException { + public void countSumMeanTest() { String[] retArray1 = new String[]{ "2,1,4.4,4.4", "5,3,35.8,11.933333333333332", @@ -162,30 +161,10 @@ public class IOTDBGroupByIT { }; try (Connection connection = DriverManager. getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) { -// Statement statement = connection.createStatement(); -// boolean hasResultSet = statement.execute( -// "select count(temperature), sum(temperature), mean(temperature) from " -// + "root.ln.wf01.wt01 where time > 3 " -// + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])"); -// -// Assert.assertTrue(hasResultSet); -// ResultSet resultSet = statement.getResultSet(); -// int cnt = 0; -// while (resultSet.next()) { -// String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet -// .getString(count("root.ln.wf01.wt01.temperature")) + "," + -// resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet -// .getString(mean("root.ln.wf01.wt01.temperature")); -// Assert.assertEquals(retArray1[cnt], ans); -// cnt++; -// } -// Assert.assertEquals(retArray1.length, cnt); -// statement.close(); - Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute( "select count(temperature), sum(temperature), mean(temperature) from " - + "root.ln.wf01.wt01 where temperature > 3 " + + "root.ln.wf01.wt01 where time > 3 " + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])"); Assert.assertTrue(hasResultSet); @@ -196,6 +175,26 @@ public class IOTDBGroupByIT { .getString(count("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet .getString(mean("root.ln.wf01.wt01.temperature")); + Assert.assertEquals(retArray1[cnt], ans); + cnt++; + } + Assert.assertEquals(retArray1.length, cnt); + statement.close(); + + statement = connection.createStatement(); + hasResultSet = statement.execute( + "select count(temperature), sum(temperature), mean(temperature) from " + + "root.ln.wf01.wt01 where temperature > 3 " + + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])"); + + Assert.assertTrue(hasResultSet); + resultSet = statement.getResultSet(); + cnt = 0; + while (resultSet.next()) { + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet + .getString(count("root.ln.wf01.wt01.temperature")) + "," + + resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet + .getString(mean("root.ln.wf01.wt01.temperature")); Assert.assertEquals(retArray2[cnt], ans); cnt++; } @@ -209,7 +208,7 @@ public class IOTDBGroupByIT { } @Test - public void maxMinValeTimeTest() throws SQLException { + public void maxMinValeTimeTest() { String[] retArray1 = new String[]{ "2,4.4,4.4,4,4", "5,20.2,5.5,20,5", @@ -274,7 +273,6 @@ public class IOTDBGroupByIT { + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature")); Assert.assertEquals(retArray2[cnt], ans); cnt++; - //System.out.println(ans); } Assert.assertEquals(retArray2.length, cnt); statement.close(); @@ -286,7 +284,7 @@ public class IOTDBGroupByIT { } @Test - public void firstLastTest() throws SQLException { + public void firstLastTest() { String[] retArray1 = new String[]{ "2,4.4,4.4", "5,20.2,5.5", @@ -325,7 +323,6 @@ public class IOTDBGroupByIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet .getString(last("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")); - System.out.println(ans); Assert.assertEquals(retArray1[cnt], ans); cnt++; } @@ -345,7 +342,6 @@ public class IOTDBGroupByIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet .getString(last("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")); - System.out.println(ans); Assert.assertEquals(retArray2[cnt], ans); cnt++; } @@ -359,7 +355,7 @@ public class IOTDBGroupByIT { } @Test - public void largeIntervalTest() throws SQLException { + public void largeIntervalTest() { String[] retArray1 = new String[]{ "2,4.4,4,20,4", "30,30.3,16,610,30", @@ -410,7 +406,6 @@ public class IOTDBGroupByIT { + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature")); Assert.assertEquals(retArray2[cnt], ans); cnt++; - //System.out.println(ans); } Assert.assertEquals(retArray2.length, cnt); statement.close(); @@ -422,7 +417,7 @@ public class IOTDBGroupByIT { } @Test - public void smallPartitionTest() throws SQLException { + public void smallPartitionTest() { String[] retArray1 = new String[]{ "50,100.1,50.5,150.6", "615,500.5,500.5,500.5" @@ -449,7 +444,6 @@ public class IOTDBGroupByIT { .getString(last("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(sum("root.ln.wf01.wt01.temperature")); - System.out.println(ans); Assert.assertEquals(retArray1[cnt], ans); cnt++; } @@ -470,7 +464,6 @@ public class IOTDBGroupByIT { .getString(last("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + "," + resultSet.getString(sum("root.ln.wf01.wt01.temperature")); - System.out.println(ans); Assert.assertEquals(retArray2[cnt], ans); cnt++; } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java index 77afbea..056a70f 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java @@ -294,7 +294,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + "," + resultSet.getString(last(d0s1)); - //System.out.println("!!!!!============ " + ans); Assert.assertEquals(retArray[cnt], ans); cnt++; } @@ -323,7 +322,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0)) + "," + resultSet.getString(first(d0s1)); - //System.out.println("!!!!!============ " + ans); Assert.assertEquals(retArray[cnt], ans); cnt++; } @@ -352,7 +350,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0)) + "," + resultSet.getString(sum(d0s1)) + "," + Math .round(resultSet.getDouble(sum(d0s2))); - //System.out.println("!!!!!============ " + ans); Assert.assertEquals(retArray[cnt], ans); cnt++; } @@ -381,7 +378,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(mean(d0s0)) + "," + Math.round(resultSet.getDouble(mean(d0s1))) + "," + Math.round(resultSet.getDouble(mean(d0s2))); - //System.out.println("!!!!!============ " + ans); Assert.assertEquals(retArray[cnt], ans); cnt++; } @@ -404,7 +400,6 @@ public class IoTDBAggregationSmallDataIT { Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3)," + "count(s4) from root.vehicle.d0 where s2 >= 3.33"); - // System.out.println(hasResultSet + "..."); Assert.assertTrue(hasResultSet); ResultSet resultSet = statement.getResultSet(); int cnt = 0; @@ -412,7 +407,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0)) + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2)) + "," + resultSet.getString(count(d0s3)) + "," + resultSet.getString(count(d0s4)); - // System.out.println("============ " + ans); Assert.assertEquals(retArray[cnt], ans); cnt++; } @@ -444,7 +438,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(min_time(d0s0)) + "," + resultSet.getString(min_time(d0s1)) + "," + resultSet.getString(min_time(d0s2)) + "," + resultSet.getString(min_time(d0s3)) + "," + resultSet.getString(min_time(d0s4)); - // System.out.println("============ " + ans); Assert.assertEquals(ans, retArray[cnt]); cnt++; Assert.assertEquals(1, cnt); @@ -476,7 +469,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0)) + "," + resultSet.getString(max_time(d0s1)) + "," + resultSet.getString(max_time(d0s2)) + "," + resultSet.getString(max_time(d0s3)) + "," + resultSet.getString(max_time(d0s4)); - // System.out.println("============ " + ans); Assert.assertEquals(ans, retArray[cnt]); cnt++; } @@ -510,7 +502,6 @@ public class IoTDBAggregationSmallDataIT { "," + resultSet.getString(min_value(d0s2)) + "," + resultSet.getString(min_value(d0s3)) + "," + resultSet.getString(min_value(d0s4)); - // System.out.println("============ " + ans); Assert.assertEquals(ans, retArray[cnt]); cnt++; } @@ -545,8 +536,6 @@ public class IoTDBAggregationSmallDataIT { .getString(max_value(d0s2)) + "," + resultSet.getString(max_value(d0s3)) + "," + resultSet .getString(max_value(d0s4)); - //System.out.println("============ " + ans); - //Assert.assertEquals(ans, retArray[cnt]); cnt++; } Assert.assertEquals(1, cnt); @@ -568,13 +557,11 @@ public class IoTDBAggregationSmallDataIT { Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute( "select count(s0) from root.vehicle.d0 where s2 >= 3.33"); - // System.out.println(hasResultSet + "..."); Assert.assertTrue(hasResultSet); ResultSet resultSet = statement.getResultSet(); int cnt = 0; while (resultSet.next()) { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0)); - //System.out.println("============ " + ans); Assert.assertEquals(ans, retArray[cnt]); cnt++; } @@ -613,7 +600,6 @@ public class IoTDBAggregationSmallDataIT { (Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) { Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute("select * from root"); - // System.out.println(hasResultSet + "..."); if (hasResultSet) { ResultSet resultSet = statement.getResultSet(); int cnt = 0; @@ -621,7 +607,6 @@ public class IoTDBAggregationSmallDataIT { String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s0) + "," + resultSet.getString(d0s1) + "," + resultSet.getString(d0s2) + "," + resultSet.getString(d0s3) + "," + resultSet.getString(d1s0); - // System.out.println(ans); Assert.assertEquals(ans, retArray[cnt]); cnt++; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java index 528b378..002c28f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java @@ -42,19 +42,21 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { protected List<EngineReaderByTimeStamp> allDataReaderList; protected TimeGenerator timestampGenerator; + /** * cached timestamp for next group by partition. */ - private long timestamp; + protected long timestamp; + /** * if this object has cached timestamp for next group by partition. */ - private boolean hasCachedTimestamp; + protected boolean hasCachedTimestamp; /** * group by batch calculation size. */ - private int timestampFetchSize; + protected int timestampFetchSize; /** * constructor. @@ -152,7 +154,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { return timeArrayLength; } - private RowRecord constructRowRecord() { + protected RowRecord constructRowRecord() { RowRecord record = new RowRecord(startTime); functions.forEach(function -> record.addField(getField(function.getResult()))); return record; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index aeb789e..de52849 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -378,11 +378,11 @@ public class BatchData implements Serializable { /** * Checks if the given index is in range. If not, throws an appropriate runtime exception. */ - private void rangeCheckForTime(int idx) { + protected void rangeCheckForTime(int idx) { if (idx < 0) { throw new IndexOutOfBoundsException("BatchData time range check, Index is negative: " + idx); } - if (idx >= timeLength) { + if (idx >= length()) { throw new IndexOutOfBoundsException( "BatchData time range check, Index : " + idx + ". Length : " + timeLength); }
