This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit db3eab0588888c61626efe9faac5f743b66813ec
Author: liuminghui233 <[email protected]>
AuthorDate: Mon Nov 14 20:31:48 2022 +0800

    support shuffle
---
 client-py/SessionExample.py                        |  2 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    | 12 ++++---
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  8 +++--
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 38 +++++++++++++++++-----
 4 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index 4f04855def..e50521ad18 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -45,7 +45,7 @@ fetch_args = {
     "end_time": 32,
     "interval": 4,
     "sliding_step": 1,
-    "indexes": [0, 3, 5, 9]
+    "indexes": [9, 0, 5, 3]
 }
 
 print(session.fetch_window_batch(ts_path_list, None, fetch_args))
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 3d3c02b2e8..14496b69cf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -74,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * This visitor is used to generate a logical plan for the statement and 
returns the {@link
@@ -294,16 +295,17 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitFetchWindowBatch(
       FetchWindowBatchStatement fetchWindowBatchStatement, MPPQueryContext 
context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+    List<Integer> sortedSamplingIndexes =
+        fetchWindowBatchStatement.getSamplingIndexes().stream()
+            .sorted()
+            .collect(Collectors.toList());
     planBuilder
         .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
         .planTransform(
             analysis.getSourceTransformExpressions(), true, 
ZoneId.systemDefault(), Ordering.ASC)
-        .planWindowSplit(
-            fetchWindowBatchStatement.getGroupByTimeParameter(),
-            fetchWindowBatchStatement.getSamplingIndexes())
+        .planWindowSplit(fetchWindowBatchStatement.getGroupByTimeParameter(), 
sortedSamplingIndexes)
         .planWindowConcat(
-            fetchWindowBatchStatement.getGroupByTimeParameter(),
-            fetchWindowBatchStatement.getSamplingIndexes());
+            fetchWindowBatchStatement.getGroupByTimeParameter(), 
sortedSamplingIndexes);
 
     return planBuilder.getRoot();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index ab027fb138..814fd22deb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -459,7 +460,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         TSFetchWindowBatchResp resp =
             createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
         resp.setWindowBatchDataSetList(
-            
QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(queryExecution));
+            QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(
+                queryExecution, ((FetchWindowBatchStatement) 
s).getSamplingIndexes()));
         return resp;
       }
     } catch (Exception e) {
@@ -514,7 +516,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
         TSFetchWindowBatchResp resp =
             createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
-        
resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
+        resp.setWindowBatch(
+            QueryDataSetUtils.convertTsBlocksToWindowBatch(
+                queryExecution, ((FetchWindowBatchStatement) 
s).getSamplingIndexes()));
         return resp;
       }
     } catch (Exception e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 8880ca2810..b0d26a7b0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
@@ -400,9 +401,9 @@ public class QueryDataSetUtils {
     return res;
   }
 
-  public static List<List<ByteBuffer>> 
convertTsBlocksToWindowBatch(IQueryExecution queryExecution)
-      throws IoTDBException {
-    List<List<ByteBuffer>> windowSet = new ArrayList<>();
+  public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(
+      IQueryExecution queryExecution, List<Integer> samplingIndexes) throws 
IoTDBException {
+    List<List<ByteBuffer>> sortedWindowBatch = new ArrayList<>();
 
     while (true) {
       Optional<ByteBuffer> optionalByteBuffer = 
queryExecution.getByteBufferBatchResult();
@@ -423,14 +424,24 @@ public class QueryDataSetUtils {
         res.add(byteBuffer);
       }
 
-      windowSet.add(res);
+      sortedWindowBatch.add(res);
     }
-    return windowSet;
+
+    List<List<ByteBuffer>> windowBatch = new 
ArrayList<>(sortedWindowBatch.size());
+    List<Integer> sortedSamplingIndexes =
+        samplingIndexes.stream().sorted().collect(Collectors.toList());
+
+    for (Integer samplingIndex : samplingIndexes) {
+      int mapIndex = sortedSamplingIndexes.indexOf(samplingIndex);
+      windowBatch.add(sortedWindowBatch.get(mapIndex));
+    }
+    return windowBatch;
   }
 
   public static List<TSQueryDataSet> convertTsBlocksToWindowBatchDataSetList(
-      IQueryExecution queryExecution) throws IoTDBException, IOException {
-    List<TSQueryDataSet> windowSet = new ArrayList<>();
+      IQueryExecution queryExecution, List<Integer> samplingIndexes)
+      throws IoTDBException, IOException {
+    List<TSQueryDataSet> sortedWindowBatch = new ArrayList<>();
 
     int columnNum = queryExecution.getOutputValueColumnCount();
     // one time column and each value column has an actual value buffer and a 
bitmap value to
@@ -626,9 +637,18 @@ public class QueryDataSetUtils {
       tsQueryDataSet.setBitmapList(bitmapList);
       tsQueryDataSet.setValueList(valueList);
 
-      windowSet.add(tsQueryDataSet);
+      sortedWindowBatch.add(tsQueryDataSet);
+    }
+
+    List<TSQueryDataSet> windowBatch = new 
ArrayList<>(sortedWindowBatch.size());
+    List<Integer> sortedSamplingIndexes =
+        samplingIndexes.stream().sorted().collect(Collectors.toList());
+
+    for (Integer samplingIndex : samplingIndexes) {
+      int mapIndex = sortedSamplingIndexes.indexOf(samplingIndex);
+      windowBatch.add(sortedWindowBatch.get(mapIndex));
     }
-    return windowSet;
+    return windowBatch;
   }
 
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {

Reply via email to