This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 30e5a18  [To rel/0.12][IOTDB-2032] Fix incorrect result of descending 
query with multiple time partitions (#4456)
30e5a18 is described below

commit 30e5a1832427385ecec7f0496b41c1c046698200
Author: BaiJian <[email protected]>
AuthorDate: Thu Nov 25 09:08:31 2021 +0800

    [To rel/0.12][IOTDB-2032] Fix incorrect result of descending query with 
multiple time partitions (#4456)
---
 .../cluster/query/ClusterDataQueryExecutor.java    |  7 ++--
 ....java => AssignPathAscPriorityMergeReader.java} | 37 +++++-----------
 .../mult/AssignPathDescPriorityMergeReader.java    | 49 ++++++++++++++++++++++
 .../reader/mult/AssignPathManagedMergeReader.java  | 43 +++++++++++++++----
 .../mult/IAssignPathPriorityMergeReader.java       | 47 +++++++++++++++++++++
 .../cluster/query/reader/mult/MultElement.java     | 49 ++++++++++++++++++++++
 .../mult/AssignPathManagedMergeReaderTest.java     |  3 +-
 7 files changed, 196 insertions(+), 39 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index f28f088..92f100a 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -112,9 +112,7 @@ public class ClusterDataQueryExecutor extends 
RawDataQueryExecutor {
       throw new StorageEngineException(e);
     }
     List<ManagedSeriesReader> readersOfSelectedSeries = Lists.newArrayList();
-    List<AbstractMultPointReader> multPointReaders = Lists.newArrayList();
-
-    multPointReaders =
+    List<AbstractMultPointReader> multPointReaders =
         readerFactory.getMultSeriesReader(
             queryPlan.getDeduplicatedPaths(),
             queryPlan.getDeviceToMeasurements(),
@@ -130,7 +128,8 @@ public class ClusterDataQueryExecutor extends 
RawDataQueryExecutor {
       PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
       TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
       AssignPathManagedMergeReader assignPathManagedMergeReader =
-          new AssignPathManagedMergeReader(partialPath.getFullPath(), 
dataType);
+          new AssignPathManagedMergeReader(
+              partialPath.getFullPath(), dataType, queryPlan.isAscending());
       for (AbstractMultPointReader multPointReader : multPointReaders) {
         if (multPointReader.getAllPaths().contains(partialPath.getFullPath())) 
{
           assignPathManagedMergeReader.addReader(multPointReader, 0);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathPriorityMergeReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathAscPriorityMergeReader.java
similarity index 53%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathPriorityMergeReader.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathAscPriorityMergeReader.java
index 30c6f70..7891ac7 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathPriorityMergeReader.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathAscPriorityMergeReader.java
@@ -20,47 +20,30 @@ package org.apache.iotdb.cluster.query.reader.mult;
 
 import org.apache.iotdb.db.query.reader.universal.Element;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
 
-import java.io.IOException;
+import java.util.PriorityQueue;
 
 /**
  * This class extends {@link extends PriorityMergeReader} for data sources 
with different
  * priorities.
  */
-public class AssignPathPriorityMergeReader extends PriorityMergeReader {
+public class AssignPathAscPriorityMergeReader extends PriorityMergeReader
+    implements IAssignPathPriorityMergeReader {
 
   private String fullPath;
 
-  public AssignPathPriorityMergeReader(String fullPath) {
+  public AssignPathAscPriorityMergeReader(String fullPath) {
     super();
     this.fullPath = fullPath;
   }
 
-  public void addReader(AbstractMultPointReader reader, long priority) throws 
IOException {
-    if (reader.hasNextTimeValuePair(fullPath)) {
-      heap.add(
-          new MultElement(
-              reader, reader.nextTimeValuePair(fullPath), new 
MergeReaderPriority(priority, 0)));
-    } else {
-      reader.close();
-    }
+  @Override
+  public PriorityQueue<Element> getHeap() {
+    return heap;
   }
 
-  public class MultElement extends Element {
-    public MultElement(
-        AbstractMultPointReader reader, TimeValuePair timeValuePair, 
MergeReaderPriority priority) {
-      super(reader, timeValuePair, priority);
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-      return ((AbstractMultPointReader) reader).hasNextTimeValuePair(fullPath);
-    }
-
-    @Override
-    public void next() throws IOException {
-      timeValuePair = ((AbstractMultPointReader) 
reader).nextTimeValuePair(fullPath);
-    }
+  @Override
+  public String getFullPath() {
+    return fullPath;
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathDescPriorityMergeReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathDescPriorityMergeReader.java
new file mode 100644
index 0000000..a38f491
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathDescPriorityMergeReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.Element;
+
+import java.util.PriorityQueue;
+
+/**
+ * This class extends {@link extends DescPriorityMergeReader} for data sources 
with different
+ * priorities.
+ */
+public class AssignPathDescPriorityMergeReader extends DescPriorityMergeReader
+    implements IAssignPathPriorityMergeReader {
+
+  private String fullPath;
+
+  public AssignPathDescPriorityMergeReader(String fullPath) {
+    super();
+    this.fullPath = fullPath;
+  }
+
+  @Override
+  public PriorityQueue<Element> getHeap() {
+    return heap;
+  }
+
+  @Override
+  public String getFullPath() {
+    return fullPath;
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
index 34ecc13..6208966 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReader.java
@@ -22,12 +22,12 @@ import 
org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
 
-public class AssignPathManagedMergeReader extends AssignPathPriorityMergeReader
-    implements ManagedSeriesReader {
+public class AssignPathManagedMergeReader implements ManagedSeriesReader, 
IPointReader {
 
   private static final int BATCH_SIZE = 4096;
   private volatile boolean managedByPool;
@@ -36,11 +36,20 @@ public class AssignPathManagedMergeReader extends 
AssignPathPriorityMergeReader
   private BatchData batchData;
   private TSDataType dataType;
 
-  public AssignPathManagedMergeReader(String fullPath, TSDataType dataType) {
-    super(fullPath);
+  private final IAssignPathPriorityMergeReader underlyingReader;
+
+  public AssignPathManagedMergeReader(String fullPath, TSDataType dataType, 
boolean isAscending) {
+    underlyingReader =
+        isAscending
+            ? new AssignPathAscPriorityMergeReader(fullPath)
+            : new AssignPathDescPriorityMergeReader(fullPath);
     this.dataType = dataType;
   }
 
+  public void addReader(AbstractMultPointReader reader, long priority) throws 
IOException {
+    underlyingReader.addReader(reader, priority);
+  }
+
   @Override
   public boolean isManagedByQueryManager() {
     return managedByPool;
@@ -71,10 +80,10 @@ public class AssignPathManagedMergeReader extends 
AssignPathPriorityMergeReader
   }
 
   private void constructBatch() throws IOException {
-    if (hasNextTimeValuePair()) {
+    if (underlyingReader.hasNextTimeValuePair()) {
       batchData = new BatchData(dataType);
-      while (hasNextTimeValuePair() && batchData.length() < BATCH_SIZE) {
-        TimeValuePair next = nextTimeValuePair();
+      while (underlyingReader.hasNextTimeValuePair() && batchData.length() < 
BATCH_SIZE) {
+        TimeValuePair next = underlyingReader.nextTimeValuePair();
         batchData.putAnObject(next.getTimestamp(), next.getValue().getValue());
       }
     }
@@ -89,4 +98,24 @@ public class AssignPathManagedMergeReader extends 
AssignPathPriorityMergeReader
     batchData = null;
     return ret;
   }
+
+  @Override
+  public boolean hasNextTimeValuePair() throws IOException {
+    return underlyingReader.hasNextTimeValuePair();
+  }
+
+  @Override
+  public TimeValuePair nextTimeValuePair() throws IOException {
+    return underlyingReader.nextTimeValuePair();
+  }
+
+  @Override
+  public TimeValuePair currentTimeValuePair() throws IOException {
+    return underlyingReader.currentTimeValuePair();
+  }
+
+  @Override
+  public void close() throws IOException {
+    underlyingReader.close();
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
new file mode 100644
index 0000000..a344288
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/IAssignPathPriorityMergeReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.Element;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+/** Common codes of different priorityMergeReader */
+public interface IAssignPathPriorityMergeReader extends IPointReader {
+  PriorityQueue<Element> getHeap();
+
+  String getFullPath();
+
+  default void addReader(AbstractMultPointReader reader, long priority) throws 
IOException {
+    if (reader.hasNextTimeValuePair(getFullPath())) {
+      getHeap()
+          .add(
+              new MultElement(
+                  getFullPath(),
+                  reader,
+                  reader.nextTimeValuePair(getFullPath()),
+                  new PriorityMergeReader.MergeReaderPriority(priority, 0)));
+    } else {
+      reader.close();
+    }
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultElement.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultElement.java
new file mode 100644
index 0000000..991c249
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultElement.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.reader.mult;
+
+import org.apache.iotdb.db.query.reader.universal.Element;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+
+import java.io.IOException;
+
+/** a special Element implementation when querying data from multi readers */
+public class MultElement extends Element {
+  private final String fullPath;
+
+  public MultElement(
+      String fullPath,
+      AbstractMultPointReader reader,
+      TimeValuePair timeValuePair,
+      PriorityMergeReader.MergeReaderPriority priority) {
+    super(reader, timeValuePair, priority);
+    this.fullPath = fullPath;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return ((AbstractMultPointReader) reader).hasNextTimeValuePair(fullPath);
+  }
+
+  @Override
+  public void next() throws IOException {
+    timeValuePair = ((AbstractMultPointReader) 
reader).nextTimeValuePair(fullPath);
+  }
+}
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
index 5864a99..12b1aa7 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
@@ -85,7 +85,8 @@ public class AssignPathManagedMergeReaderTest {
     batchData.add(TestUtils.genBatchData(TSDataType.INT32, 0, 100));
     batchUsed = false;
     metaGroupMember = new TestMetaGroupMember();
-    assignPathManagedMergeReader = new 
AssignPathManagedMergeReader("root.a.b", TSDataType.DOUBLE);
+    assignPathManagedMergeReader =
+        new AssignPathManagedMergeReader("root.a.b", TSDataType.DOUBLE, true);
   }
 
   @After

Reply via email to