This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9c7d6c7bbddb9a3169017e65978fbc7194cd4773
Author: Minghui Liu <[email protected]>
AuthorDate: Mon Nov 14 14:00:29 2022 +0800

    add SampleWindowSliceIterator
---
 .../timerangeiterator/SampleWindowIterator.java    |   6 +-
 .../SampleWindowSliceIterator.java                 | 129 +++++++++++++++++++++
 .../TimeRangeIteratorFactory.java                  |   7 +-
 .../db/mpp/aggregation/TimeRangeIteratorTest.java  |  71 +++++++++++-
 4 files changed, 207 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
index 0ba0dd06cf..f95e3a287f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
@@ -59,7 +59,7 @@ public class SampleWindowIterator implements 
ITimeRangeIterator {
   public TimeRange nextTimeRange() {
     while (allTimeRangeIterator.hasNextTimeRange()) {
       TimeRange timeRange = allTimeRangeIterator.nextTimeRange();
-      if (timeRangeIndex + 1 == samplingIndexes.get(sampleIndex)) {
+      if (timeRangeIndex == samplingIndexes.get(sampleIndex)) {
         curTimeRange = timeRange;
         timeRangeIndex++;
         sampleIndex++;
@@ -72,7 +72,7 @@ public class SampleWindowIterator implements 
ITimeRangeIterator {
 
   @Override
   public boolean isAscending() {
-    throw new UnsupportedOperationException();
+    return true;
   }
 
   @Override
@@ -82,6 +82,6 @@ public class SampleWindowIterator implements 
ITimeRangeIterator {
 
   @Override
   public long getTotalIntervalNum() {
-    throw new UnsupportedOperationException();
+    return samplingIndexes.size();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java
new file mode 100644
index 0000000000..4c01f63fac
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.timerangeiterator;
+
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+import java.util.List;
+
+public class SampleWindowSliceIterator implements ITimeRangeIterator {
+
+  private static final int HEAP_MAX_SIZE = 100;
+  private final TimeSelector timeBoundaryHeap;
+
+  private final SampleWindowIterator sampleWindowIterator;
+
+  private long curStartTimeForIterator;
+  private long lastEndTime;
+  private TimeRange curTimeRange;
+  private boolean hasCachedTimeRange;
+
+  public SampleWindowSliceIterator(
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> samplingIndexes) {
+    this.timeBoundaryHeap = new TimeSelector(HEAP_MAX_SIZE, true);
+    this.sampleWindowIterator =
+        new SampleWindowIterator(startTime, endTime, interval, slidingStep, 
samplingIndexes);
+    initHeap();
+  }
+
+  private void initHeap() {
+    TimeRange firstTimeRange = sampleWindowIterator.nextTimeRange();
+    timeBoundaryHeap.add(firstTimeRange.getMin());
+    timeBoundaryHeap.add(firstTimeRange.getMax() + 1);
+    curStartTimeForIterator = firstTimeRange.getMin();
+    tryToExpandHeap();
+  }
+
+  private void tryToExpandHeap() {
+    TimeRange timeRangeToExpand;
+    while (sampleWindowIterator.hasNextTimeRange() && timeBoundaryHeap.size() 
< HEAP_MAX_SIZE) {
+      timeRangeToExpand = sampleWindowIterator.nextTimeRange();
+      timeBoundaryHeap.add(timeRangeToExpand.getMin());
+      timeBoundaryHeap.add(timeRangeToExpand.getMax() + 1);
+      curStartTimeForIterator = timeRangeToExpand.getMin();
+    }
+  }
+
+  @Override
+  public TimeRange getFirstTimeRange() {
+    long retStartTime = timeBoundaryHeap.pollFirst();
+    lastEndTime = timeBoundaryHeap.first();
+    return new TimeRange(retStartTime, lastEndTime);
+  }
+
+  @Override
+  public boolean hasNextTimeRange() {
+    if (hasCachedTimeRange) {
+      return true;
+    }
+    if (curTimeRange == null) {
+      curTimeRange = getFirstTimeRange();
+      hasCachedTimeRange = true;
+      return true;
+    }
+
+    if (lastEndTime >= curStartTimeForIterator) {
+      tryToExpandHeap();
+    }
+    if (timeBoundaryHeap.isEmpty()) {
+      return false;
+    }
+    long retStartTime = timeBoundaryHeap.pollFirst();
+    if (retStartTime >= curStartTimeForIterator) {
+      tryToExpandHeap();
+    }
+    if (timeBoundaryHeap.isEmpty()) {
+      return false;
+    }
+    lastEndTime = timeBoundaryHeap.first();
+    curTimeRange = new TimeRange(retStartTime, lastEndTime);
+    hasCachedTimeRange = true;
+    return true;
+  }
+
+  @Override
+  public TimeRange nextTimeRange() {
+    if (hasCachedTimeRange || hasNextTimeRange()) {
+      hasCachedTimeRange = false;
+      return getFinalTimeRange(curTimeRange, true);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isAscending() {
+    return true;
+  }
+
+  @Override
+  public long currentOutputTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getTotalIntervalNum() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index 552cca4bb2..bfbc69061b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -82,6 +82,11 @@ public class TimeRangeIteratorFactory {
       long slidingStep,
       List<Integer> samplingIndexes,
       boolean outputPartialTimeWindow) {
-    return new SampleWindowIterator(startTime, endTime, interval, slidingStep, 
samplingIndexes);
+    if (outputPartialTimeWindow && interval > slidingStep) {
+      return new SampleWindowSliceIterator(
+          startTime, endTime, interval, slidingStep, samplingIndexes);
+    } else {
+      return new SampleWindowIterator(startTime, endTime, interval, 
slidingStep, samplingIndexes);
+    }
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
index 1d3f3ccf79..b9bd3b3fe8 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 public class TimeRangeIteratorTest {
 
   private static final long MS_TO_MONTH = 30 * 86400_000L;
@@ -294,9 +296,69 @@ public class TimeRangeIteratorTest {
         res4);
   }
 
-  private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
-    Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+  @Test
+  public void testSampleTimeRange() {
+    String[] res1 = {"[ 0 : 3 ]", "[ 3 : 6 ]", "[ 6 : 9 ]", "[ 9 : 12 ]"};
+    String[] res2 = {"[ 3 : 6 ]", "[ 12 : 15 ]", "[ 18 : 21 ]", "[ 27 : 30 ]"};
+    String[] res3 = {"[ 3 : 6 ]", "[ 6 : 9 ]", "[ 18 : 21 ]", "[ 21 : 24 ]"};
+
+    long startTime = 0, endTime = 32, interval = 4, slidingStep = 3;
+
+    checkRes(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(0, 1, 2, 
3), false),
+        res1);
+    checkRes(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 4, 6, 
9), false),
+        res2);
+    checkRes(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 2, 6, 
7), false),
+        res3);
+  }
+
+  @Test
+  public void testSampleTimeRangeSlice() {
+    String[] res1 = {
+      "[ 0 : 2 ]", "[ 3 : 3 ]", "[ 4 : 5 ]", "[ 6 : 6 ]", "[ 7 : 8 ]", "[ 9 : 
9 ]", "[ 10 : 12 ]"
+    };
+    String[] res2 = {
+      "[ 3 : 6 ]",
+      "[ 7 : 11 ]", // pad
+      "[ 12 : 15 ]",
+      "[ 16 : 17 ]", // pad
+      "[ 18 : 21 ]",
+      "[ 22 : 26 ]", // pad
+      "[ 27 : 30 ]"
+    };
+    String[] res3 = {
+      "[ 3 : 5 ]",
+      "[ 6 : 6 ]",
+      "[ 7 : 9 ]",
+      "[ 10 : 17 ]", // pad
+      "[ 18 : 20 ]",
+      "[ 21 : 21 ]",
+      "[ 22 : 24 ]"
+    };
+
+    long startTime = 0, endTime = 32, interval = 4, slidingStep = 3;
 
+    checkResWithoutNum(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(0, 1, 2, 
3), true),
+        res1);
+    checkResWithoutNum(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 4, 6, 
9), true),
+        res2);
+    checkResWithoutNum(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 2, 6, 
7), true),
+        res3);
+  }
+
+  private void checkResWithoutNum(ITimeRangeIterator timeRangeIterator, 
String[] res) {
     boolean isAscending = timeRangeIterator.isAscending();
     int cnt = isAscending ? 0 : res.length - 1;
 
@@ -307,4 +369,9 @@ public class TimeRangeIteratorTest {
       cnt += isAscending ? 1 : -1;
     }
   }
+
+  private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
+    Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+    checkResWithoutNum(timeRangeIterator, res);
+  }
 }

Reply via email to