This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 30e5a18 [To rel/0.12][IOTDB-2032] Fix incorrect result of descending
query with multiple time partitions (#4456)
30e5a18 is described below
commit 30e5a1832427385ecec7f0496b41c1c046698200
Author: BaiJian <[email protected]>
AuthorDate: Thu Nov 25 09:08:31 2021 +0800
[To rel/0.12][IOTDB-2032] Fix incorrect result of descending query with
multiple time partitions (#4456)
---
.../cluster/query/ClusterDataQueryExecutor.java | 7 ++--
....java => AssignPathAscPriorityMergeReader.java} | 37 +++++-----------
.../mult/AssignPathDescPriorityMergeReader.java | 49 ++++++++++++++++++++++
.../reader/mult/AssignPathManagedMergeReader.java | 43 +++++++++++++++----
.../mult/IAssignPathPriorityMergeReader.java | 47 +++++++++++++++++++++
.../cluster/query/reader/mult/MultElement.java | 49 ++++++++++++++++++++++
.../mult/AssignPathManagedMergeReaderTest.java | 3 +-
7 files changed, 196 insertions(+), 39 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index f28f088..92f100a 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -112,9 +112,7 @@ public class ClusterDataQueryExecutor extends
RawDataQueryExecutor {
throw new StorageEngineException(e);
}
List<ManagedSeriesReader> readersOfSelectedSeries = Lists.newArrayList();
- List<AbstractMultPointReader> multPointReaders = Lists.newArrayList();
-
- multPointReaders =
+ List<AbstractMultPointReader> multPointReaders =
readerFactory.getMultSeriesReader(
queryPlan.getDeduplicatedPaths(),
queryPlan.getDeviceToMeasurements(),
@@ -130,7 +128,8 @@ public class ClusterDataQueryExecutor extends
RawDataQueryExecutor {
PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
AssignPathManagedMergeReader assignPathManagedMergeReader =
- new AssignPathManagedMergeReader(partialPath.getFullPath(),
dataType);
+ new AssignPathManagedMergeReader(
+ partialPath.getFullPath(), dataType, queryPlan.isAscending());
for (AbstractMultPointReader multPointReader : multPointReaders) {
if (multPointReader.getAllPaths().contains(partialPath.getFullPath()))
{
assignPathManagedMergeReader.addReader(multPointReader, 0);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathPriorityMergeReader.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathAscPriorityMergeReader.java
similarity index 53%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathPriorityMergeReader.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathAscPriorityMergeReader.java
index 30c6f70..7891ac7 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathPriorityMergeReader.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathAscPriorityMergeReader.java
@@ -20,47 +20,30 @@ package org.apache.iotdb.cluster.query.reader.mult;
import org.apache.iotdb.db.query.reader.universal.Element;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import java.io.IOException;
+import java.util.PriorityQueue;
/**
* This class extends {@link extends PriorityMergeReader} for data sources
with different
* priorities.
*/
-public class AssignPathPriorityMergeReader extends PriorityMergeReader {
+public class AssignPathAscPriorityMergeReader extends PriorityMergeReader
+ implements IAssignPathPriorityMergeReader {
private String fullPath;
- public AssignPathPriorityMergeReader(String fullPath) {
+ public AssignPathAscPriorityMergeReader(String fullPath) {
super();
this.fullPath = fullPath;
}
- public void addReader(AbstractMultPointReader reader, long priority) throws
IOException {
- if (reader.hasNextTimeValuePair(fullPath)) {
- heap.add(
- new MultElement(
- reader, reader.nextTimeValuePair(fullPath), new
MergeReaderPriority(priority, 0)));
- } else {
- reader.close();
- }
+ @Override
+ public PriorityQueue<Element> getHeap() {
+ return heap;
}
- public class MultElement extends Element {
- public MultElement(
- AbstractMultPointReader reader, TimeValuePair timeValuePair,
MergeReaderPriority priority) {
- super(reader, timeValuePair, priority);
- }
-
- @Override
- public boolean hasNext() throws IOException {
- return ((AbstractMultPointReader) reader).hasNextTimeValuePair(fullPath);
- }
-
- @Override
- public void next() throws IOException {
- timeValuePair = ((AbstractMultPointReader)
reader).nextTimeValuePair(fullPath);
- }
+ @Override
+ public String getFullPath() {
+ return fullPath;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathDescPriorityMergeReader.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathDescPriorityMergeReader.java
new file mode 100644
index 0000000..a38f491
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathDescPriorityMergeReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.Element;
+
+import java.util.PriorityQueue;
+
+/**
+ * This class extends {@link extends DescPriorityMergeReader} for data sources
with different
+ * priorities.
+ */
+public class AssignPathDescPriorityMergeReader extends DescPriorityMergeReader
+ implements IAssignPathPriorityMergeReader {
+
+ private String fullPath;
+
+ public AssignPathDescPriorityMergeReader(String fullPath) {
+ super();
+ this.fullPath = fullPath;
+ }
+
+ @Override
+ public PriorityQueue<Element> getHeap() {
+ return heap;
+ }
+
+ @Override
+ public String getFullPath() {
+ return fullPath;
+ }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
index 34ecc13..6208966 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
@@ -22,12 +22,12 @@ import
org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
import java.util.NoSuchElementException;
-public class AssignPathManagedMergeReader extends AssignPathPriorityMergeReader
- implements ManagedSeriesReader {
+public class AssignPathManagedMergeReader implements ManagedSeriesReader,
IPointReader {
private static final int BATCH_SIZE = 4096;
private volatile boolean managedByPool;
@@ -36,11 +36,20 @@ public class AssignPathManagedMergeReader extends
AssignPathPriorityMergeReader
private BatchData batchData;
private TSDataType dataType;
- public AssignPathManagedMergeReader(String fullPath, TSDataType dataType) {
- super(fullPath);
+ private final IAssignPathPriorityMergeReader underlyingReader;
+
+ public AssignPathManagedMergeReader(String fullPath, TSDataType dataType,
boolean isAscending) {
+ underlyingReader =
+ isAscending
+ ? new AssignPathAscPriorityMergeReader(fullPath)
+ : new AssignPathDescPriorityMergeReader(fullPath);
this.dataType = dataType;
}
+ public void addReader(AbstractMultPointReader reader, long priority) throws
IOException {
+ underlyingReader.addReader(reader, priority);
+ }
+
@Override
public boolean isManagedByQueryManager() {
return managedByPool;
@@ -71,10 +80,10 @@ public class AssignPathManagedMergeReader extends
AssignPathPriorityMergeReader
}
private void constructBatch() throws IOException {
- if (hasNextTimeValuePair()) {
+ if (underlyingReader.hasNextTimeValuePair()) {
batchData = new BatchData(dataType);
- while (hasNextTimeValuePair() && batchData.length() < BATCH_SIZE) {
- TimeValuePair next = nextTimeValuePair();
+ while (underlyingReader.hasNextTimeValuePair() && batchData.length() <
BATCH_SIZE) {
+ TimeValuePair next = underlyingReader.nextTimeValuePair();
batchData.putAnObject(next.getTimestamp(), next.getValue().getValue());
}
}
@@ -89,4 +98,24 @@ public class AssignPathManagedMergeReader extends
AssignPathPriorityMergeReader
batchData = null;
return ret;
}
+
+ @Override
+ public boolean hasNextTimeValuePair() throws IOException {
+ return underlyingReader.hasNextTimeValuePair();
+ }
+
+ @Override
+ public TimeValuePair nextTimeValuePair() throws IOException {
+ return underlyingReader.nextTimeValuePair();
+ }
+
+ @Override
+ public TimeValuePair currentTimeValuePair() throws IOException {
+ return underlyingReader.currentTimeValuePair();
+ }
+
+ @Override
+ public void close() throws IOException {
+ underlyingReader.close();
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
new file mode 100644
index 0000000..a344288
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.Element;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+/** Common codes of different priorityMergeReader */
+public interface IAssignPathPriorityMergeReader extends IPointReader {
+ PriorityQueue<Element> getHeap();
+
+ String getFullPath();
+
+ default void addReader(AbstractMultPointReader reader, long priority) throws
IOException {
+ if (reader.hasNextTimeValuePair(getFullPath())) {
+ getHeap()
+ .add(
+ new MultElement(
+ getFullPath(),
+ reader,
+ reader.nextTimeValuePair(getFullPath()),
+ new PriorityMergeReader.MergeReaderPriority(priority, 0)));
+ } else {
+ reader.close();
+ }
+ }
+}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultElement.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultElement.java
new file mode 100644
index 0000000..991c249
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultElement.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.Element;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+
+import java.io.IOException;
+
+/** a special Element implementation when querying data from multi readers */
+public class MultElement extends Element {
+ private final String fullPath;
+
+ public MultElement(
+ String fullPath,
+ AbstractMultPointReader reader,
+ TimeValuePair timeValuePair,
+ PriorityMergeReader.MergeReaderPriority priority) {
+ super(reader, timeValuePair, priority);
+ this.fullPath = fullPath;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return ((AbstractMultPointReader) reader).hasNextTimeValuePair(fullPath);
+ }
+
+ @Override
+ public void next() throws IOException {
+ timeValuePair = ((AbstractMultPointReader)
reader).nextTimeValuePair(fullPath);
+ }
+}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
index 5864a99..12b1aa7 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
@@ -85,7 +85,8 @@ public class AssignPathManagedMergeReaderTest {
batchData.add(TestUtils.genBatchData(TSDataType.INT32, 0, 100));
batchUsed = false;
metaGroupMember = new TestMetaGroupMember();
- assignPathManagedMergeReader = new
AssignPathManagedMergeReader("root.a.b", TSDataType.DOUBLE);
+ assignPathManagedMergeReader =
+ new AssignPathManagedMergeReader("root.a.b", TSDataType.DOUBLE, true);
}
@After