This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9c7d6c7bbddb9a3169017e65978fbc7194cd4773 Author: Minghui Liu <[email protected]> AuthorDate: Mon Nov 14 14:00:29 2022 +0800 add SampleWindowSliceIterator --- .../timerangeiterator/SampleWindowIterator.java | 6 +- .../SampleWindowSliceIterator.java | 129 +++++++++++++++++++++ .../TimeRangeIteratorFactory.java | 7 +- .../db/mpp/aggregation/TimeRangeIteratorTest.java | 71 +++++++++++- 4 files changed, 207 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java index 0ba0dd06cf..f95e3a287f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java @@ -59,7 +59,7 @@ public class SampleWindowIterator implements ITimeRangeIterator { public TimeRange nextTimeRange() { while (allTimeRangeIterator.hasNextTimeRange()) { TimeRange timeRange = allTimeRangeIterator.nextTimeRange(); - if (timeRangeIndex + 1 == samplingIndexes.get(sampleIndex)) { + if (timeRangeIndex == samplingIndexes.get(sampleIndex)) { curTimeRange = timeRange; timeRangeIndex++; sampleIndex++; @@ -72,7 +72,7 @@ public class SampleWindowIterator implements ITimeRangeIterator { @Override public boolean isAscending() { - throw new UnsupportedOperationException(); + return true; } @Override @@ -82,6 +82,6 @@ public class SampleWindowIterator implements ITimeRangeIterator { @Override public long getTotalIntervalNum() { - throw new UnsupportedOperationException(); + return samplingIndexes.size(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java new file mode 100644 index 0000000000..4c01f63fac --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.aggregation.timerangeiterator; + +import org.apache.iotdb.db.utils.datastructure.TimeSelector; +import org.apache.iotdb.tsfile.read.common.TimeRange; + +import java.util.List; + +public class SampleWindowSliceIterator implements ITimeRangeIterator { + + private static final int HEAP_MAX_SIZE = 100; + private final TimeSelector timeBoundaryHeap; + + private final SampleWindowIterator sampleWindowIterator; + + private long curStartTimeForIterator; + private long lastEndTime; + private TimeRange curTimeRange; + private boolean hasCachedTimeRange; + + public SampleWindowSliceIterator( + long startTime, + long endTime, + long interval, + long slidingStep, + List<Integer> samplingIndexes) { + this.timeBoundaryHeap = new TimeSelector(HEAP_MAX_SIZE, true); + this.sampleWindowIterator = + new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes); + initHeap(); + } + + private void initHeap() { + TimeRange firstTimeRange = sampleWindowIterator.nextTimeRange(); + timeBoundaryHeap.add(firstTimeRange.getMin()); + timeBoundaryHeap.add(firstTimeRange.getMax() + 1); + curStartTimeForIterator = firstTimeRange.getMin(); + tryToExpandHeap(); + } + + private void tryToExpandHeap() { + TimeRange timeRangeToExpand; + while (sampleWindowIterator.hasNextTimeRange() && timeBoundaryHeap.size() < HEAP_MAX_SIZE) { + timeRangeToExpand = sampleWindowIterator.nextTimeRange(); + timeBoundaryHeap.add(timeRangeToExpand.getMin()); + timeBoundaryHeap.add(timeRangeToExpand.getMax() + 1); + curStartTimeForIterator = timeRangeToExpand.getMin(); + } + } + + @Override + public TimeRange getFirstTimeRange() { + long retStartTime = timeBoundaryHeap.pollFirst(); + lastEndTime = timeBoundaryHeap.first(); + return new TimeRange(retStartTime, lastEndTime); + } + + @Override + public boolean hasNextTimeRange() { + if (hasCachedTimeRange) { + return true; + } + if (curTimeRange == null) { + curTimeRange = getFirstTimeRange(); + hasCachedTimeRange = true; + return true; + } + + if (lastEndTime >= curStartTimeForIterator) { + tryToExpandHeap(); + } + if (timeBoundaryHeap.isEmpty()) { + return false; + } + long retStartTime = timeBoundaryHeap.pollFirst(); + if (retStartTime >= curStartTimeForIterator) { + tryToExpandHeap(); + } + if (timeBoundaryHeap.isEmpty()) { + return false; + } + lastEndTime = timeBoundaryHeap.first(); + curTimeRange = new TimeRange(retStartTime, lastEndTime); + hasCachedTimeRange = true; + return true; + } + + @Override + public TimeRange nextTimeRange() { + if (hasCachedTimeRange || hasNextTimeRange()) { + hasCachedTimeRange = false; + return getFinalTimeRange(curTimeRange, true); + } + return null; + } + + @Override + public boolean isAscending() { + return true; + } + + @Override + public long currentOutputTime() { + throw new UnsupportedOperationException(); + } + + @Override + public long getTotalIntervalNum() { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java index 552cca4bb2..bfbc69061b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java @@ -82,6 +82,11 @@ public class TimeRangeIteratorFactory { long slidingStep, List<Integer> samplingIndexes, boolean outputPartialTimeWindow) { - return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes); + if (outputPartialTimeWindow && interval > slidingStep) { + return new SampleWindowSliceIterator( + startTime, endTime, interval, slidingStep, samplingIndexes); + } else { + return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes); + } } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java index 1d3f3ccf79..b9bd3b3fe8 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java @@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.read.common.TimeRange; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; + public class TimeRangeIteratorTest { private static final long MS_TO_MONTH = 30 * 86400_000L; @@ -294,9 +296,69 @@ public class TimeRangeIteratorTest { res4); } - private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) { - Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum()); + @Test + public void testSampleTimeRange() { + String[] res1 = {"[ 0 : 3 ]", "[ 3 : 6 ]", "[ 6 : 9 ]", "[ 9 : 12 ]"}; + String[] res2 = {"[ 3 : 6 ]", "[ 12 : 15 ]", "[ 18 : 21 ]", "[ 27 : 30 ]"}; + String[] res3 = {"[ 3 : 6 ]", "[ 6 : 9 ]", "[ 18 : 21 ]", "[ 21 : 24 ]"}; + + long startTime = 0, endTime = 32, interval = 4, slidingStep = 3; + + checkRes( + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + startTime, endTime, interval, slidingStep, Arrays.asList(0, 1, 2, 3), false), + res1); + checkRes( + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + startTime, endTime, interval, slidingStep, Arrays.asList(1, 4, 6, 9), false), + res2); + checkRes( + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + startTime, endTime, interval, slidingStep, Arrays.asList(1, 2, 6, 7), false), + res3); + } + + @Test + public void testSampleTimeRangeSlice() { + String[] res1 = { + "[ 0 : 2 ]", "[ 3 : 3 ]", "[ 4 : 5 ]", "[ 6 : 6 ]", "[ 7 : 8 ]", "[ 9 : 9 ]", "[ 10 : 12 ]" + }; + String[] res2 = { + "[ 3 : 6 ]", + "[ 7 : 11 ]", // pad + "[ 12 : 15 ]", + "[ 16 : 17 ]", // pad + "[ 18 : 21 ]", + "[ 22 : 26 ]", // pad + "[ 27 : 30 ]" + }; + String[] res3 = { + "[ 3 : 5 ]", + "[ 6 : 6 ]", + "[ 7 : 9 ]", + "[ 10 : 17 ]", // pad + "[ 18 : 20 ]", + "[ 21 : 21 ]", + "[ 22 : 24 ]" + }; + + long startTime = 0, endTime = 32, interval = 4, slidingStep = 3; + checkResWithoutNum( + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + startTime, endTime, interval, slidingStep, Arrays.asList(0, 1, 2, 3), true), + res1); + checkResWithoutNum( + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + startTime, endTime, interval, slidingStep, Arrays.asList(1, 4, 6, 9), true), + res2); + checkResWithoutNum( + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + startTime, endTime, interval, slidingStep, Arrays.asList(1, 2, 6, 7), true), + res3); + } + + private void checkResWithoutNum(ITimeRangeIterator timeRangeIterator, String[] res) { boolean isAscending = timeRangeIterator.isAscending(); int cnt = isAscending ? 0 : res.length - 1; @@ -307,4 +369,9 @@ public class TimeRangeIteratorTest { cnt += isAscending ? 1 : -1; } } + + private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) { + Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum()); + checkResWithoutNum(timeRangeIterator, res); + } }
