This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch NonAlignedTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 448ed1ceeaa9d6ef8171c488e2d793afb2f30953
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Mar 1 20:42:15 2022 +0800

    Support NonAlignedTablet
---
 .../apache/iotdb/tsfile/TsFileWriteWithTablet.java |  32 ++-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |  28 +++
 .../chunk/NonAlignedChunkGroupWriterImpl.java      |  72 ++++++
 .../tsfile/write/record/NonAlignedTablet.java      | 280 +++++++++++++++++++++
 4 files changed, 411 insertions(+), 1 deletion(-)

diff --git 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
index c454e0c..2bb19d8 100644
--- 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
+++ 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.NonAlignedTablet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -69,7 +70,7 @@ public class TsFileWriteWithTablet {
         writeMeasurementScheams.add(measurementSchemas.get(0));
         writeMeasurementScheams.add(measurementSchemas.get(1));
         writeMeasurementScheams.add(measurementSchemas.get(2));
-        writeWithTablet(tsFileWriter, DEVICE_1, writeMeasurementScheams, 
10000, 0, 0);
+        writeWithNonAlignedTablet(tsFileWriter, DEVICE_1, 
writeMeasurementScheams, 10000, 0, 0);
       }
     } catch (Exception e) {
       logger.error("meet error in TsFileWrite with tablet", e);
@@ -108,4 +109,33 @@ public class TsFileWriteWithTablet {
       tablet.reset();
     }
   }
+
+  private static void writeWithNonAlignedTablet(
+      TsFileWriter tsFileWriter,
+      String deviceId,
+      List<MeasurementSchema> schemas,
+      long rowNum,
+      long startTime,
+      long startValue)
+      throws IOException, WriteProcessException {
+    NonAlignedTablet tablet = new NonAlignedTablet(deviceId, schemas);
+
+    long sensorNum = schemas.size();
+    for (long r = 0; r < rowNum; r++, startValue++) {
+      for (int i = 0; i < sensorNum; i++) {
+        tablet.addValue(
+            schemas.get(i).getMeasurementId(), startTime++, new 
Binary("testString........."));
+      }
+      // write
+      if (tablet.maxRowSize == tablet.getMaxRowNumber()) {
+        tsFileWriter.write(tablet);
+        tablet.reset();
+      }
+    }
+    // write
+    if (tablet.maxRowSize != 0) {
+      tsFileWriter.write(tablet);
+      tablet.reset();
+    }
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index f2ced03..5e8cdfb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.utils.MeasurementGroup;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter;
 import org.apache.iotdb.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.iotdb.tsfile.write.record.NonAlignedTablet;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -380,6 +381,24 @@ public class TsFileWriter implements AutoCloseable {
     }
   }
 
+  private void checkIsTimeseriesExist(NonAlignedTablet tablet) throws 
WriteProcessException {
+    IChunkGroupWriter groupWriter = tryToInitialGroupWriter(tablet.deviceId, 
false);
+
+    Path devicePath = new Path(tablet.deviceId);
+    List<MeasurementSchema> schemas = tablet.getSchemas();
+    if (schema.containsDevice(devicePath)) {
+      checkIsAllMeasurementsInGroup(schema.getSeriesSchema(devicePath), 
schemas, false);
+      groupWriter.tryToAddSeriesWriter(schemas);
+    } else if (schema.getSchemaTemplates() != null && 
schema.getSchemaTemplates().size() == 1) {
+      MeasurementGroup measurementGroup =
+          schema.getSchemaTemplates().entrySet().iterator().next().getValue();
+      checkIsAllMeasurementsInGroup(measurementGroup, schemas, false);
+      groupWriter.tryToAddSeriesWriter(schemas);
+    } else {
+      throw new NoMeasurementException("input devicePath is invalid: " + 
devicePath);
+    }
+  }
+
   /**
    * If it's aligned, then all measurementSchemas should be contained in the 
measurementGroup, or it
    * will throw exception. If it's nonAligned, then remove the 
measurementSchema that is not
@@ -513,6 +532,15 @@ public class TsFileWriter implements AutoCloseable {
     return checkMemorySizeAndMayFlushChunks();
   }
 
+  public boolean write(NonAlignedTablet tablet) throws IOException, 
WriteProcessException {
+    // make sure the ChunkGroupWriter for this Tablet exist
+    checkIsTimeseriesExist(tablet);
+    // get corresponding ChunkGroupWriter and write this Tablet
+    recordCount +=
+        ((NonAlignedChunkGroupWriterImpl) 
groupWriters.get(tablet.deviceId)).write(tablet);
+    return checkMemorySizeAndMayFlushChunks();
+  }
+
   public boolean writeAligned(Tablet tablet) throws IOException, 
WriteProcessException {
     // make sure the ChunkGroupWriter for this Tablet exist
     checkIsTimeseriesExist(tablet, true);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 8b6038e..9d65318 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.NonAlignedTablet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -129,6 +130,77 @@ public class NonAlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
     return pointCount;
   }
 
+  public int write(NonAlignedTablet tablet) throws WriteProcessException {
+    int pointCount = 0;
+    List<MeasurementSchema> timeseries = tablet.getSchemas();
+    for (int column = 0; column < timeseries.size(); column++) {
+      String measurementId = timeseries.get(column).getMeasurementId();
+      int rowSize = tablet.rowSize[column];
+      pointCount = Math.max(pointCount, rowSize);
+      long[] timestamps = tablet.timestamps[column];
+      switch (timeseries.get(column).getType()) {
+        case INT32:
+          int[] intValues = (int[]) tablet.values[column];
+          for (int row = 0; row < rowSize; row++) {
+            long time = timestamps[row];
+            checkIsHistoryData(measurementId, time);
+            chunkWriters.get(measurementId).write(time, intValues[row]);
+            lastTimeMap.put(measurementId, time);
+          }
+          break;
+        case INT64:
+          long[] longValues = (long[]) tablet.values[column];
+          for (int row = 0; row < rowSize; row++) {
+            long time = timestamps[row];
+            checkIsHistoryData(measurementId, time);
+            chunkWriters.get(measurementId).write(time, longValues[row]);
+            lastTimeMap.put(measurementId, time);
+          }
+          break;
+        case FLOAT:
+          float[] floatValues = (float[]) tablet.values[column];
+          for (int row = 0; row < rowSize; row++) {
+            long time = timestamps[row];
+            checkIsHistoryData(measurementId, time);
+            chunkWriters.get(measurementId).write(time, floatValues[row]);
+            lastTimeMap.put(measurementId, time);
+          }
+          break;
+        case DOUBLE:
+          double[] doubleValues = (double[]) tablet.values[column];
+          for (int row = 0; row < rowSize; row++) {
+            long time = timestamps[row];
+            checkIsHistoryData(measurementId, time);
+            chunkWriters.get(measurementId).write(time, doubleValues[row]);
+            lastTimeMap.put(measurementId, time);
+          }
+          break;
+        case BOOLEAN:
+          boolean[] booleanValues = (boolean[]) tablet.values[column];
+          for (int row = 0; row < rowSize; row++) {
+            long time = timestamps[row];
+            checkIsHistoryData(measurementId, time);
+            chunkWriters.get(measurementId).write(time, booleanValues[row]);
+            lastTimeMap.put(measurementId, time);
+          }
+          break;
+        case TEXT:
+          Binary[] binaryValues = (Binary[]) tablet.values[column];
+          for (int row = 0; row < rowSize; row++) {
+            long time = timestamps[row];
+            checkIsHistoryData(measurementId, time);
+            chunkWriters.get(measurementId).write(time, binaryValues[row]);
+            lastTimeMap.put(measurementId, time);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", 
timeseries.get(column).getType()));
+      }
+    }
+    return pointCount;
+  }
+
   @Override
   public long flushToFileWriter(TsFileIOWriter fileWriter) throws IOException {
     LOG.debug("start flush device id:{}", deviceId);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java
new file mode 100644
index 0000000..f0be58f
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.record;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.*;
+
+public class NonAlignedTablet {
+
+  private static final int DEFAULT_SIZE = 1024;
+  private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not 
supported.";
+
+  /** deviceId of this tablet */
+  public String deviceId;
+
+  /** the list of measurement schemas for creating the tablet */
+  private List<MeasurementSchema> schemas;
+
+  /** measurementId->indexOf(measurementSchema) */
+  private final Map<String, Integer> measurementIndex;
+
+  /** timestamps in this tablet */
+  public long[][] timestamps;
+  /** each object is a primitive type array, which represents values of one 
measurement */
+  public Object[] values;
+
+  /** the number of rows for each sensor to include in this tablet */
+  public int[] rowSize;
+  /** the max number of rows in each sensor */
+  public int maxRowSize;
+  /** the maximum number of rows for this tablet */
+  private final int maxRowNumber;
+
+  /**
+   * Return a tablet with default specified row number. This is the standard 
constructor (all Tablet
+   * should be the same size).
+   *
+   * @param deviceId the name of the device specified to be written in
+   * @param schemas the list of measurement schemas for creating the tablet, 
only measurementId and
+   *     type take effects
+   */
+  public NonAlignedTablet(String deviceId, List<MeasurementSchema> schemas) {
+    this(deviceId, schemas, DEFAULT_SIZE);
+  }
+
+  /**
+   * Return a tablet with the specified number of rows (maxBatchSize). Only 
call this constructor
+   * directly for testing purposes. Tablet should normally always be default 
size.
+   *
+   * @param deviceId the name of the device specified to be written in
+   * @param schemas the list of measurement schemas for creating the row 
batch, only measurementId
+   *     and type take effects
+   * @param maxRowNumber the maximum number of rows for this tablet
+   */
+  public NonAlignedTablet(String deviceId, List<MeasurementSchema> schemas, 
int maxRowNumber) {
+    this.deviceId = deviceId;
+    this.schemas = new ArrayList<>(schemas);
+    this.maxRowNumber = maxRowNumber;
+    measurementIndex = new HashMap<>();
+
+    int indexInSchema = 0;
+    for (MeasurementSchema schema : schemas) {
+      if (schema.getType() == TSDataType.VECTOR) {
+        for (String measurementId : schema.getSubMeasurementsList()) {
+          measurementIndex.put(measurementId, indexInSchema);
+        }
+      } else {
+        measurementIndex.put(schema.getMeasurementId(), indexInSchema);
+      }
+      indexInSchema++;
+    }
+
+    createColumns();
+
+    reset();
+  }
+
+  public void setDeviceId(String deviceId) {
+    this.deviceId = deviceId;
+  }
+
+  public void setSchemas(List<MeasurementSchema> schemas) {
+    this.schemas = schemas;
+  }
+
+  public void addValue(String measurementId, long time, Object value) {
+    if (value == null) {
+      return;
+    }
+    int indexOfSchema = measurementIndex.get(measurementId);
+    MeasurementSchema measurementSchema = schemas.get(indexOfSchema);
+    addValueOfDataType(
+        measurementSchema.getType(), rowSize[indexOfSchema]++, indexOfSchema, 
time, value);
+    maxRowSize = Math.max(maxRowSize, rowSize[indexOfSchema]);
+  }
+
+  private void addValueOfDataType(
+      TSDataType dataType, int rowIndex, int indexOfSchema, long time, Object 
value) {
+    timestamps[indexOfSchema][rowIndex] = time;
+    switch (dataType) {
+      case TEXT:
+        {
+          Binary[] sensor = (Binary[]) values[indexOfSchema];
+          sensor[rowIndex] = value != null ? (Binary) value : 
Binary.EMPTY_VALUE;
+          break;
+        }
+      case FLOAT:
+        {
+          float[] sensor = (float[]) values[indexOfSchema];
+          sensor[rowIndex] = value != null ? (float) value : Float.MIN_VALUE;
+          break;
+        }
+      case INT32:
+        {
+          int[] sensor = (int[]) values[indexOfSchema];
+          sensor[rowIndex] = value != null ? (int) value : Integer.MIN_VALUE;
+          break;
+        }
+      case INT64:
+        {
+          long[] sensor = (long[]) values[indexOfSchema];
+          sensor[rowIndex] = value != null ? (long) value : Long.MIN_VALUE;
+          break;
+        }
+      case DOUBLE:
+        {
+          double[] sensor = (double[]) values[indexOfSchema];
+          sensor[rowIndex] = value != null ? (double) value : Double.MIN_VALUE;
+          break;
+        }
+      case BOOLEAN:
+        {
+          boolean[] sensor = (boolean[]) values[indexOfSchema];
+          sensor[rowIndex] = value != null && (boolean) value;
+          break;
+        }
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+  }
+
+  public List<MeasurementSchema> getSchemas() {
+    return schemas;
+  }
+
+  /** Return the maximum number of rows for this tablet */
+  public int getMaxRowNumber() {
+    return maxRowNumber;
+  }
+
+  /** Reset Tablet to the default state - set the rowSize to 0 and reset 
bitMaps */
+  public void reset() {
+    maxRowSize = 0;
+    if (rowSize == null) {
+      rowSize = new int[schemas.size()];
+    } else {
+      Arrays.fill(rowSize, 0);
+    }
+  }
+
+  private void createColumns() {
+    // create timestamp column
+    timestamps = new long[schemas.size()][maxRowNumber];
+
+    // calculate total value column size
+    int valueColumnsSize = schemas.size();
+
+    // value column
+    values = new Object[valueColumnsSize];
+    int columnIndex = 0;
+    for (MeasurementSchema schema : schemas) {
+      TSDataType dataType = schema.getType();
+      values[columnIndex] = createValueColumnOfDataType(dataType);
+      columnIndex++;
+    }
+  }
+
+  private Object createValueColumnOfDataType(TSDataType dataType) {
+
+    Object valueColumn;
+    switch (dataType) {
+      case INT32:
+        valueColumn = new int[maxRowNumber];
+        break;
+      case INT64:
+        valueColumn = new long[maxRowNumber];
+        break;
+      case FLOAT:
+        valueColumn = new float[maxRowNumber];
+        break;
+      case DOUBLE:
+        valueColumn = new double[maxRowNumber];
+        break;
+      case BOOLEAN:
+        valueColumn = new boolean[maxRowNumber];
+        break;
+      case TEXT:
+        valueColumn = new Binary[maxRowNumber];
+        break;
+      default:
+        throw new 
UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+    return valueColumn;
+  }
+
+  //    public int getTimeBytesSize() {
+  //        return rowSize * 8;
+  //    }
+  //
+  //    /**
+  //     * @return total bytes of values
+  //     */
+  //    public int getTotalValueOccupation() {
+  //        int valueOccupation = 0;
+  //        int columnIndex = 0;
+  //        for (MeasurementSchema schema : schemas) {
+  //            valueOccupation += calOccupationOfOneColumn(schema.getType(), 
columnIndex);
+  //            columnIndex++;
+  //        }
+  //        // add bitmap size if the tablet has bitMaps
+  //        if (bitMaps != null) {
+  //            for (BitMap bitMap : bitMaps) {
+  //                // marker byte
+  //                valueOccupation++;
+  //                if (bitMap != null && !bitMap.isAllUnmarked()) {
+  //                    valueOccupation += rowSize / Byte.SIZE + 1;
+  //                }
+  //            }
+  //        }
+  //        return valueOccupation;
+  //    }
+
+  //    private int calOccupationOfOneColumn(TSDataType dataType, int 
columnIndex) {
+  //        int valueOccupation = 0;
+  //        switch (dataType) {
+  //            case BOOLEAN:
+  //                valueOccupation += rowSize;
+  //                break;
+  //            case INT32:
+  //            case FLOAT:
+  //                valueOccupation += rowSize * 4;
+  //                break;
+  //            case INT64:
+  //            case DOUBLE:
+  //                valueOccupation += rowSize * 8;
+  //                break;
+  //            case TEXT:
+  //                valueOccupation += rowSize * 4;
+  //                Binary[] binaries = (Binary[]) values[columnIndex];
+  //                for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+  //                    valueOccupation += binaries[rowIndex].getLength();
+  //                }
+  //                break;
+  //            default:
+  //                throw new 
UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE,
+  // dataType));
+  //        }
+  //        return valueOccupation;
+  //    }
+}

Reply via email to