This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit afb1d5f33ce9c401e5c08248a6bdf33e9b925125
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Dec 1 18:04:56 2021 +0800

    move packBuffer and putPBOSToBuffer into QueryDataSetUtils
---
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  40 +-----
 .../db/query/dataset/udf/UDTFJoinDataSet.java      | 140 +++++++++++++++++++--
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  33 +++++
 3 files changed, 168 insertions(+), 45 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index 7572516..7c7f1bb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -40,8 +41,6 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 
 public class UDTFAlignByTimeDataSet extends UDTFDataSet implements 
DirectAlignByTimeDataSet {
@@ -109,8 +108,6 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet 
implements DirectAlignBy
   }
 
   public QueryDataSet executeInFragmentsIfPossible() throws 
QueryProcessException, IOException {
-    // TODO make the behaviour of the return value of 
layerBuilder.generateJoinDataSet() the same as
-    // TODO the original dataset
     return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet(this) 
: this;
   }
 
@@ -237,39 +234,8 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet 
implements DirectAlignBy
       }
     }
 
-    return packBuffer(tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList);
-  }
-
-  protected TSQueryDataSet packBuffer(
-      TSQueryDataSet tsQueryDataSet,
-      PublicBAOS timeBAOS,
-      PublicBAOS[] valueBAOSList,
-      PublicBAOS[] bitmapBAOSList) {
-    int columnsNum = transformers.length;
-
-    ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
-    timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
-    timeBuffer.flip();
-    tsQueryDataSet.setTime(timeBuffer);
-
-    List<ByteBuffer> valueBufferList = new ArrayList<>();
-    List<ByteBuffer> bitmapBufferList = new ArrayList<>();
-    for (int i = 0; i < columnsNum; ++i) {
-      putPBOSToBuffer(valueBAOSList, valueBufferList, i);
-      putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i);
-    }
-    tsQueryDataSet.setValueList(valueBufferList);
-    tsQueryDataSet.setBitmapList(bitmapBufferList);
-
-    return tsQueryDataSet;
-  }
-
-  protected void putPBOSToBuffer(
-      PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int 
tsIndex) {
-    ByteBuffer bitmapBuffer = 
ByteBuffer.allocate(bitmapBAOSList[tsIndex].size());
-    bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, 
bitmapBAOSList[tsIndex].size());
-    bitmapBuffer.flip();
-    bitmapBufferList.add(bitmapBuffer);
+    return QueryDataSetUtils.packBuffer(
+        tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, 
transformers.length);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index f92f600..1e7bd02 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,15 +19,19 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.IOException;
 
-public class UDTFJoinDataSet extends QueryDataSet
-//    implements DirectAlignByTimeDataSet
-{
+public class UDTFJoinDataSet extends QueryDataSet implements 
DirectAlignByTimeDataSet {
 
   private final UDTFFragmentDataSet[] fragmentDataSets;
 
@@ -77,6 +81,131 @@ public class UDTFJoinDataSet extends QueryDataSet
   }
 
   @Override
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
+      throws IOException, QueryProcessException {
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+    PublicBAOS timeBAOS = new PublicBAOS();
+    PublicBAOS[] valueBAOSList = new PublicBAOS[resultColumnsLength];
+    PublicBAOS[] bitmapBAOSList = new PublicBAOS[resultColumnsLength];
+    for (int i = 0; i < resultColumnsLength; ++i) {
+      valueBAOSList[i] = new PublicBAOS();
+      bitmapBAOSList[i] = new PublicBAOS();
+    }
+    int[] currentBitmapList = new int[resultColumnsLength];
+
+    //    int rowCount = 0;
+    //    while (rowCount < fetchSize
+    //        && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
+    //        && !timeHeap.isEmpty()) {
+    //
+    //      long minTime = timeHeap.pollFirst();
+    //      if (rowOffset == 0) {
+    //        timeBAOS.write(BytesUtils.longToBytes(minTime));
+    //      }
+    //
+    //      for (int i = 0; i < resultColumnsLength; ++i) {
+    //        LayerPointReader reader = transformers[i];
+    //
+    //        if (!reader.next() || reader.currentTime() != minTime) {
+    //          if (rowOffset == 0) {
+    //            currentBitmapList[i] = (currentBitmapList[i] << 1);
+    //          }
+    //          continue;
+    //        }
+    //
+    //        if (rowOffset == 0) {
+    //          currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
+    //          TSDataType type = reader.getDataType();
+    //          switch (type) {
+    //            case INT32:
+    //              int intValue = reader.currentInt();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeInt(intValue, minTime)
+    //                      : intValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case INT64:
+    //              long longValue = reader.currentLong();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeLong(longValue, minTime)
+    //                      : longValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case FLOAT:
+    //              float floatValue = reader.currentFloat();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeFloat(floatValue, minTime)
+    //                      : floatValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case DOUBLE:
+    //              double doubleValue = reader.currentDouble();
+    //              ReadWriteIOUtils.write(
+    //                  encoder != null && encoder.needEncode(minTime)
+    //                      ? encoder.encodeDouble(doubleValue, minTime)
+    //                      : doubleValue,
+    //                  valueBAOSList[i]);
+    //              break;
+    //            case BOOLEAN:
+    //              ReadWriteIOUtils.write(reader.currentBoolean(), 
valueBAOSList[i]);
+    //              break;
+    //            case TEXT:
+    //              ReadWriteIOUtils.write(reader.currentBinary(), 
valueBAOSList[i]);
+    //              break;
+    //            default:
+    //              throw new UnSupportedDataTypeException(
+    //                  String.format("Data type %s is not supported.", type));
+    //          }
+    //        }
+    //
+    //        reader.readyForNext();
+    //
+    //        if (reader.next()) {
+    //          timeHeap.add(reader.currentTime());
+    //        }
+    //      }
+    //
+    //      if (rowOffset == 0) {
+    //        ++rowCount;
+    //        if (rowCount % 8 == 0) {
+    //          for (int i = 0; i < resultColumnsLength; ++i) {
+    //            ReadWriteIOUtils.write((byte) currentBitmapList[i], 
bitmapBAOSList[i]);
+    //            currentBitmapList[i] = 0;
+    //          }
+    //        }
+    //        if (rowLimit > 0) {
+    //          ++alreadyReturnedRowNum;
+    //        }
+    //      } else {
+    //        --rowOffset;
+    //      }
+    //
+    //      rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
+    //    }
+    //
+    //    /*
+    //     * feed the bitmap with remaining 0 in the right
+    //     * if current bitmap is 00011111 and remaining is 3, after feeding 
the bitmap is 11111000
+    //     */
+    //    if (rowCount > 0) {
+    //      int remaining = rowCount % 8;
+    //      if (remaining != 0) {
+    //        for (int i = 0; i < resultColumnsLength; ++i) {
+    //          ReadWriteIOUtils.write(
+    //              (byte) (currentBitmapList[i] << (8 - remaining)), 
bitmapBAOSList[i]);
+    //        }
+    //      }
+    //    }
+
+    return QueryDataSetUtils.packBuffer(
+        tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, 
resultColumnsLength);
+  }
+
+  @Override
   public boolean hasNextWithoutConstraint() throws IOException {
     return !timeHeap.isEmpty();
   }
@@ -114,9 +243,4 @@ public class UDTFJoinDataSet extends QueryDataSet
 
     return rowRecord;
   }
-
-  //  @Override
-  //  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder 
encoder) {
-  //    throw new NotImplementedException();
-  //  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 34c1483..7816169 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -28,11 +28,13 @@ import 
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -269,4 +271,35 @@ public class QueryDataSetUtils {
     }
     return values;
   }
+
+  public static TSQueryDataSet packBuffer(
+      TSQueryDataSet tsQueryDataSet,
+      PublicBAOS timeBAOS,
+      PublicBAOS[] valueBAOSList,
+      PublicBAOS[] bitmapBAOSList,
+      int columnsLength) {
+    ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
+    timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
+    timeBuffer.flip();
+    tsQueryDataSet.setTime(timeBuffer);
+
+    List<ByteBuffer> valueBufferList = new ArrayList<>();
+    List<ByteBuffer> bitmapBufferList = new ArrayList<>();
+    for (int i = 0; i < columnsLength; ++i) {
+      putPBOSToBuffer(valueBAOSList, valueBufferList, i);
+      putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i);
+    }
+    tsQueryDataSet.setValueList(valueBufferList);
+    tsQueryDataSet.setBitmapList(bitmapBufferList);
+
+    return tsQueryDataSet;
+  }
+
+  public static void putPBOSToBuffer(
+      PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int 
tsIndex) {
+    ByteBuffer bitmapBuffer = 
ByteBuffer.allocate(bitmapBAOSList[tsIndex].size());
+    bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, 
bitmapBAOSList[tsIndex].size());
+    bitmapBuffer.flip();
+    bitmapBufferList.add(bitmapBuffer);
+  }
 }

Reply via email to