This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-1971 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9086826929d0455d34c1aabe82dcb3fe2c513225 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Dec 1 17:35:47 2021 +0800 init QueryDataSet and fix bug --- .../db/query/dataset/udf/UDTFAlignByTimeDataSet.java | 2 +- .../iotdb/db/query/dataset/udf/UDTFJoinDataSet.java | 19 ++++++++++--------- .../iotdb/db/query/udf/core/layer/LayerBuilder.java | 8 ++++++-- .../iotdb/tsfile/read/query/dataset/QueryDataSet.java | 18 ++++++++++++++++++ 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java index b0b6a3e..7572516 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java @@ -111,7 +111,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException { // TODO make the behaviour of the return value of layerBuilder.generateJoinDataSet() the same as // TODO the original dataset - return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this; + return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet(this) : this; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java index 323e1c2..f92f600 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java @@ -19,17 +19,15 @@ package org.apache.iotdb.db.query.dataset.udf; -import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; -import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.datastructure.TimeSelector; -import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; -import org.apache.iotdb.tsfile.exception.NotImplementedException; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import java.io.IOException; -public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet { +public class UDTFJoinDataSet extends QueryDataSet +// implements DirectAlignByTimeDataSet +{ private final UDTFFragmentDataSet[] fragmentDataSets; @@ -51,9 +49,12 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa private TimeSelector timeHeap; public UDTFJoinDataSet( + UDTFAlignByTimeDataSet udtfAlignByTimeDataSet, UDTFFragmentDataSet[] fragmentDataSets, int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex) throws IOException { + super(udtfAlignByTimeDataSet); + this.fragmentDataSets = fragmentDataSets; this.resultColumnOutputIndexToFragmentDataSetOutputIndex = resultColumnOutputIndexToFragmentDataSetOutputIndex; @@ -114,8 +115,8 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa return rowRecord; } - @Override - public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) { - throw new NotImplementedException(); - } + // @Override + // public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) { + // throw new NotImplementedException(); + // } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java index a7b0582..ab60387 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.udf.core.layer; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; +import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet; import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet; import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet; import org.apache.iotdb.db.query.expression.Expression; @@ -141,7 +142,8 @@ public class LayerBuilder { return 4 <= fragmentDataSetIndexToLayerPointReaders.size(); } - public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException { + public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet) + throws QueryProcessException, IOException { int n = fragmentDataSetIndexToLayerPointReaders.size(); UDTFFragmentDataSet[] fragmentDataSets = new UDTFFragmentDataSet[n]; for (int i = 0; i < n; ++i) { @@ -152,6 +154,8 @@ public class LayerBuilder { } return new UDTFJoinDataSet( - fragmentDataSets, resultColumnOutputIndexToFragmentDataSetOutputIndex); + udtfAlignByTimeDataSet, + fragmentDataSets, + resultColumnOutputIndexToFragmentDataSetOutputIndex); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java index e1e2f4f..c418e51 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java @@ -50,6 +50,24 @@ public abstract class QueryDataSet { protected int columnNum; + public QueryDataSet(QueryDataSet that) { + this.paths = that.paths; + this.dataTypes = that.dataTypes; + + this.rowLimit = that.rowLimit; + this.rowOffset = that.rowOffset; + this.alreadyReturnedRowNum = that.alreadyReturnedRowNum; + this.fetchSize = that.fetchSize; + this.ascending = that.ascending; + + this.endPoint = that.endPoint; + + this.withoutAnyNull = that.withoutAnyNull; + this.withoutAllNull = that.withoutAllNull; + + this.columnNum = that.columnNum; + } + /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */ public static class EndPoint { private String ip = null;
