This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8cec7b97bc72b56900f9fe527b96c9211e49a02b
Author: JackieTien97 <[email protected]>
AuthorDate: Wed May 29 12:08:07 2024 +0800

    Add StreamSortOperator
---
 .../process/relational/StreamSortOperator.java     | 381 +++++++++++++++++++++
 1 file changed, 381 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
new file mode 100644
index 00000000000..d85ca438479
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/relational/StreamSortOperator.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.relational;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+import org.apache.iotdb.db.utils.sort.DiskSpiller;
+import org.apache.iotdb.db.utils.sort.MemoryReader;
+import org.apache.iotdb.db.utils.sort.SortBufferManager;
+import org.apache.iotdb.db.utils.sort.SortReader;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.utils.sort.SortBufferManager.SORT_BUFFER_SIZE;
+
+/**
+ * Sorts all rows produced by {@code inputOperator} according to {@code comparator}.
+ *
+ * <p>Input rows are buffered in memory as {@link SortKey}s. When adding the next input TsBlock
+ * would reach {@code SORT_BUFFER_SIZE} bytes, the rows buffered so far are sorted and spilled to
+ * disk through the {@link DiskSpiller}. Once the input is exhausted:
+ *
+ * <ul>
+ *   <li>if nothing was spilled, the buffered rows are sorted in memory and emitted in chunks
+ *       bounded by the {@link TsBlockBuilder}'s capacity;
+ *   <li>otherwise the remaining buffered rows (kept in memory if the sort buffer can allocate
+ *       them, spilled otherwise) are k-way merged with the spilled runs via a {@link
+ *       MergeSortHeap}.
+ * </ul>
+ *
+ * <p>Note: {@link #next()} returns {@code null} while it is still consuming input, so no output
+ * is produced until the input operator is exhausted.
+ */
+public class StreamSortOperator implements ProcessOperator {
+
+  // Must reference this class: it is used by ramBytesUsed() to account for this operator's own
+  // shallow size.
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(StreamSortOperator.class);
+  private final OperatorContext operatorContext;
+  private final Operator inputOperator;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  // Index of the next buffered row to emit on the in-memory (no spill) path; -1 means the
+  // buffered rows have not been sorted yet. The sorted rows may not fit in one tsBlockBuilder,
+  // so they can be returned over several calls to next().
+  private int curRow = -1;
+
+  // Rows buffered from the input that have not been spilled yet.
+  private List<SortKey> cachedData;
+  private final Comparator<SortKey> comparator;
+  // Retained size in bytes of the TsBlocks whose rows are currently in cachedData.
+  private long cachedBytes;
+  private final DiskSpiller diskSpiller;
+  private final SortBufferManager sortBufferManager;
+
+  // State for merging spilled runs.
+
+  private MergeSortHeap mergeSortHeap;
+  private List<SortReader> sortReaders;
+  // noMoreData[i] becomes true once sortReaders.get(i) has been drained.
+  private boolean[] noMoreData;
+
+  private static final Logger logger = LoggerFactory.getLogger(StreamSortOperator.class);
+
+  private final int maxReturnSize =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  // Metrics written to the operator context by recordMetrics() during close().
+  private long prepareUntilReadyCost = 0;
+  private long dataSize = 0;
+  private long sortCost = 0;
+
+  /**
+   * Creates the operator.
+   *
+   * @param operatorContext context of this operator
+   * @param inputOperator child operator whose rows are sorted
+   * @param dataTypes types of the value columns of the input and output TsBlocks
+   * @param folderPath folder handed to the {@link DiskSpiller}; the spiller also receives {@code
+   *     folderPath + operatorId}
+   * @param comparator ordering applied to the rows
+   */
+  public StreamSortOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> dataTypes,
+      String folderPath,
+      Comparator<SortKey> comparator) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.cachedData = new ArrayList<>();
+    this.comparator = comparator;
+    this.cachedBytes = 0;
+    this.diskSpiller =
+        new DiskSpiller(folderPath, folderPath + operatorContext.getOperatorId(), dataTypes);
+    this.sortBufferManager = new SortBufferManager();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return inputOperator.isBlocked();
+  }
+
+  /**
+   * While the input still has data, buffers one input TsBlock and returns {@code null}. Once the
+   * input is exhausted, returns the next chunk of sorted output, either from memory (nothing
+   * spilled) or by merging the spilled runs. Sort readers are closed before rethrowing a failure.
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    if (!inputOperator.hasNextWithTimer()) {
+      if (diskSpiller.hasSpilledData()) {
+        try {
+          prepareSortReaders();
+          return mergeSort();
+        } catch (Exception e) {
+          clear();
+          throw e;
+        }
+      } else {
+        if (curRow == -1) {
+          long startTime = System.nanoTime();
+          cachedData.sort(comparator);
+          sortCost += System.nanoTime() - startTime;
+          curRow = 0;
+        }
+        return buildTsBlockInMemory();
+      }
+    }
+    long startTime = System.nanoTime();
+    try {
+      TsBlock tsBlock = inputOperator.nextWithTimer();
+      if (tsBlock == null) {
+        return null;
+      }
+      dataSize += tsBlock.getRetainedSizeInBytes();
+      cacheTsBlock(tsBlock);
+    } catch (IoTDBException e) {
+      clear();
+      throw e;
+    } finally {
+      prepareUntilReadyCost += System.nanoTime() - startTime;
+    }
+
+    return null;
+  }
+
+  /** Writes the collected timing/size metrics, and the merge branch count if data was spilled. */
+  private void recordMetrics() {
+    operatorContext.recordSpecifiedInfo("prepareCost/ns", Long.toString(prepareUntilReadyCost));
+    operatorContext.recordSpecifiedInfo("sortedDataSize", Long.toString(dataSize));
+    operatorContext.recordSpecifiedInfo("sortCost/ns", Long.toString(sortCost));
+    int spilledFileSize = diskSpiller.getFileSize();
+    if (spilledFileSize > 0) {
+      // The extra branch accounts for the in-memory run merged alongside the spilled files.
+      operatorContext.recordSpecifiedInfo(
+          "merge sort branch", Integer.toString(spilledFileSize + 1));
+    }
+  }
+
+  /**
+   * Builds the sort readers once. Remaining buffered rows become a {@link MemoryReader} if the
+   * sort buffer can allocate {@code cachedBytes}; otherwise they are spilled as one more run. The
+   * DiskSpiller's readers are then appended.
+   */
+  private void prepareSortReaders() throws IoTDBException {
+    if (sortReaders != null) {
+      return;
+    }
+    sortReaders = new ArrayList<>();
+    if (cachedBytes != 0) {
+      cachedData.sort(comparator);
+      if (sortBufferManager.allocate(cachedBytes)) {
+        sortReaders.add(
+            new MemoryReader(
+                cachedData.stream().map(MergeSortKey::new).collect(Collectors.toList())));
+      } else {
+        sortBufferManager.allocateOneSortBranch();
+        diskSpiller.spillSortedData(cachedData);
+        cachedData = null;
+      }
+    }
+    sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
+    // noMoreData[i] tracks whether reader i is finished
+    noMoreData = new boolean[sortReaders.size()];
+  }
+
+  /**
+   * Buffers the rows of {@code tsBlock}. If the buffer would reach {@code SORT_BUFFER_SIZE}, the
+   * rows buffered so far are first sorted and spilled, and the buffer restarts with this block.
+   */
+  private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+    long bytesSize = tsBlock.getRetainedSizeInBytes();
+    if (bytesSize + cachedBytes < SORT_BUFFER_SIZE) {
+      cachedBytes += bytesSize;
+    } else {
+      // Only spill when rows are buffered: spilling an empty run would still allocate a sort
+      // branch and hand an empty run to the DiskSpiller without contributing any rows.
+      if (!cachedData.isEmpty()) {
+        cachedData.sort(comparator);
+        spill();
+        cachedData.clear();
+      }
+      cachedBytes = bytesSize;
+    }
+    for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+      cachedData.add(new MergeSortKey(tsBlock, i));
+    }
+  }
+
+  /** Reserves one sort branch and spills the (already sorted) buffered rows to disk. */
+  private void spill() throws IoTDBException {
+    // If the current memory cannot hold this tsBlock, an exception is expected to be thrown,
+    // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
+    // one branch.
+    sortBufferManager.allocateOneSortBranch();
+    diskSpiller.spillSortedData(cachedData);
+  }
+
+  /**
+   * Copies sorted buffered rows starting at {@code curRow} into a new TsBlock until the builder
+   * is full or the rows run out, advancing {@code curRow} past every copied row.
+   */
+  private TsBlock buildTsBlockInMemory() {
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    for (int i = curRow; i < cachedData.size(); i++) {
+      SortKey sortKey = cachedData.get(i);
+      TsBlock tsBlock = sortKey.tsBlock;
+      timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(sortKey.rowIndex));
+      for (int j = 0; j < valueColumnBuilders.length; j++) {
+        if (tsBlock.getColumn(j).isNull(sortKey.rowIndex)) {
+          valueColumnBuilders[j].appendNull();
+          continue;
+        }
+        valueColumnBuilders[j].write(tsBlock.getColumn(j), sortKey.rowIndex);
+      }
+      tsBlockBuilder.declarePosition();
+      curRow++;
+      if (tsBlockBuilder.isFull()) {
+        break;
+      }
+    }
+
+    return tsBlockBuilder.build();
+  }
+
+  /**
+   * Emits the next chunk of merged output. Repeatedly polls the smallest key from the heap,
+   * copies its row and refills the heap from the same reader; a drained reader is marked in
+   * {@code noMoreData} and its sort branch released.
+   */
+  private TsBlock mergeSort() throws IoTDBException {
+
+    // 1. fill the heap with the first key of each reader (only on the first call)
+    initMergeSortHeap();
+
+    long startTime = System.nanoTime();
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+    // 2. merge until the heap is empty, the builder is full, or the time slice is used up
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    while (!mergeSortHeap.isEmpty()) {
+
+      MergeSortKey mergeSortKey = mergeSortHeap.poll();
+      TsBlock targetBlock = mergeSortKey.tsBlock;
+      timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
+      for (int i = 0; i < valueColumnBuilders.length; i++) {
+        if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
+          valueColumnBuilders[i].appendNull();
+          continue;
+        }
+        valueColumnBuilders[i].write(targetBlock.getColumn(i), mergeSortKey.rowIndex);
+      }
+      tsBlockBuilder.declarePosition();
+
+      int readerIndex = mergeSortKey.inputChannelIndex;
+      mergeSortKey = readNextMergeSortKey(readerIndex);
+      if (mergeSortKey != null) {
+        mergeSortHeap.push(mergeSortKey);
+      } else {
+        noMoreData[readerIndex] = true;
+        sortBufferManager.releaseOneSortBranch();
+      }
+
+      // break if time is out or tsBlockBuilder is full
+      if (System.nanoTime() - startTime > maxRuntime || tsBlockBuilder.isFull()) {
+        break;
+      }
+    }
+    sortCost += System.nanoTime() - startTime;
+    return tsBlockBuilder.build();
+  }
+
+  /**
+   * Creates the heap on first use and seeds it with the first key of every reader, tagging each
+   * key with its reader index; empty readers are marked finished and their branch released.
+   */
+  private void initMergeSortHeap() throws IoTDBException {
+    if (mergeSortHeap == null) {
+      mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
+      for (int i = 0; i < sortReaders.size(); i++) {
+        SortReader sortReader = sortReaders.get(i);
+        if (sortReader.hasNext()) {
+          MergeSortKey mergeSortKey = sortReader.next();
+          mergeSortKey.inputChannelIndex = i;
+          mergeSortHeap.push(mergeSortKey);
+        } else {
+          noMoreData[i] = true;
+          sortBufferManager.releaseOneSortBranch();
+        }
+      }
+    }
+  }
+
+  /** Returns the next key of reader {@code readerIndex} tagged with that index, or null. */
+  private MergeSortKey readNextMergeSortKey(int readerIndex) throws IoTDBException {
+    SortReader sortReader = sortReaders.get(readerIndex);
+    if (sortReader.hasNext()) {
+      MergeSortKey mergeSortKey = sortReader.next();
+      mergeSortKey.inputChannelIndex = readerIndex;
+      return mergeSortKey;
+    }
+    return null;
+  }
+
+  /** True if readers are not prepared yet or at least one reader is not drained. */
+  private boolean hasMoreData() {
+    if (noMoreData == null) {
+      return true;
+    }
+    for (boolean noMore : noMoreData) {
+      if (!noMore) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Closes the sort readers when data was spilled; close failures are logged, not rethrown.
+   */
+  public void clear() {
+    if (!diskSpiller.hasSpilledData()) {
+      return;
+    }
+    try {
+      if (sortReaders != null) {
+        for (SortReader sortReader : sortReaders) {
+          sortReader.close();
+        }
+      }
+    } catch (Exception e) {
+      logger.error("Fail to close fileChannel", e);
+    }
+  }
+
+  /**
+   * True while the input has data, the in-memory path still has rows to emit, or (after a spill)
+   * some reader is not drained.
+   *
+   * <p>NOTE(review): with no spill and an empty buffer this still returns true once (curRow is -1
+   * until the first in-memory next()), which then yields an empty TsBlock.
+   */
+  @Override
+  public boolean hasNext() throws Exception {
+    return inputOperator.hasNextWithTimer()
+        || (!diskSpiller.hasSpilledData() && curRow != cachedData.size())
+        || (diskSpiller.hasSpilledData() && hasMoreData());
+  }
+
+  /** Records metrics, drops the buffered rows, closes the readers and the input operator. */
+  @Override
+  public void close() throws Exception {
+    recordMetrics();
+    cachedData = null;
+    clear();
+    inputOperator.close();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !this.hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return inputOperator.calculateMaxPeekMemoryWithCounter()
+        + inputOperator.calculateRetainedSizeAfterCallingNext()
+        + SORT_BUFFER_SIZE;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return inputOperator.calculateRetainedSizeAfterCallingNext() + SORT_BUFFER_SIZE;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + RamUsageEstimator.sizeOf(noMoreData)
+        + tsBlockBuilder.getRetainedSizeInBytes();
+  }
+}

Reply via email to