This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/tsFile_v4 by this push:
     new a1151813 add tablet check
a1151813 is described below

commit a11518139f1f764561b7c7d862e0bcb9b1052274
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 17:27:34 2024 +0800

    add tablet check
---
 .../exception/write/ConflictDataTypeException.java |  30 +++
 .../tsfile/exception/write/NoTableException.java   |   8 +
 .../java/org/apache/tsfile/write/TsFileWriter.java |  30 +++
 .../org/apache/tsfile/write/record/Tablet.java     | 210 ++++++++++++++-------
 4 files changed, 212 insertions(+), 66 deletions(-)

diff --git 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/ConflictDataTypeException.java
 
b/tsfile/src/main/java/org/apache/tsfile/exception/write/ConflictDataTypeException.java
new file mode 100644
index 00000000..0523c234
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/exception/write/ConflictDataTypeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.exception.write;
+
+import org.apache.tsfile.enums.TSDataType;
+
+public class ConflictDataTypeException extends WriteProcessException {
+
+  public ConflictDataTypeException(TSDataType writing, TSDataType registered) {
+    super(String.format("Conflict data type: %s (writing) and %s 
(registered)", writing,
+        registered));
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/exception/write/NoTableException.java 
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoTableException.java
new file mode 100644
index 00000000..43a80796
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/exception/write/NoTableException.java
@@ -0,0 +1,8 @@
+package org.apache.tsfile.exception.write;
+
+public class NoTableException extends WriteProcessException{
+
+  public NoTableException(String tableName) {
+    super(String.format("Table %s not found", tableName));
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java 
b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index 8749f588..315dd1ae 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -20,10 +20,13 @@ package org.apache.tsfile.write;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.exception.write.ConflictDataTypeException;
 import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.MeasurementGroup;
 import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
@@ -350,6 +353,25 @@ public class TsFileWriter implements AutoCloseable {
     return true;
   }
 
+  private void checkIsTableExist(Tablet tablet) throws WriteProcessException {
+    String tableName = tablet.deviceId;
+    final TableSchema tableSchema = 
getSchema().getTableSchemaMap().get(tableName);
+    if (tableSchema == null) {
+      throw new NoTableException(tableName);
+    }
+
+    for (MeasurementSchema writingColumnSchema : tablet.getSchemas()) {
+      final int columnIndex = 
tableSchema.findColumnIndex(writingColumnSchema.getMeasurementId());
+      if (columnIndex < 0) {
+        throw new 
NoMeasurementException(writingColumnSchema.getMeasurementId());
+      }
+      final MeasurementSchema registeredColumnSchema = 
tableSchema.getColumnSchemas().get(columnIndex);
+      if 
(!writingColumnSchema.getType().equals(registeredColumnSchema.getType())) {
+        throw new ConflictDataTypeException(writingColumnSchema.getType(), 
registeredColumnSchema.getType());
+      }
+    }
+  }
+
   private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
       throws WriteProcessException, IOException {
     IChunkGroupWriter groupWriter =
@@ -652,4 +674,12 @@ public class TsFileWriter implements AutoCloseable {
   public Schema getSchema() {
     return fileWriter.getSchema();
   }
+
+  public boolean writeTable(Tablet tablet) throws IOException, 
WriteProcessException {
+    // make sure the table is registered and every written column exists with a matching data type
+    checkIsTableExist(tablet);
+    // get corresponding ChunkGroupWriter and write this Tablet
+    recordCount += groupWriters.get(new 
PlainDeviceID(tablet.deviceId)).write(tablet);
+    return checkMemorySizeAndMayFlushChunks();
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java 
b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
index 61ab2e41..1ef2ac6c 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
@@ -53,24 +53,50 @@ public class Tablet {
   private static final int DEFAULT_SIZE = 1024;
   private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not 
supported.";
 
-  /** DeviceId of this {@link Tablet} */
+  /**
+   * DeviceId of this {@link Tablet}
+   */
   public String deviceId;
 
-  /** The list of {@link MeasurementSchema}s for creating the {@link Tablet} */
+  /**
+   * The list of {@link MeasurementSchema}s for creating the {@link Tablet}
+   */
   private List<MeasurementSchema> schemas;
+  /**
+   * Marks the type of each column, namely ID or MEASUREMENT.
+   * Note: the ID columns must be the FIRST ones.
+   */
+  private List<ColumnType> columnTypes;
 
-  /** MeasurementId->indexOf({@link MeasurementSchema}) */
+  /**
+   * Columns in [0, idColumnRange) are all ID columns.
+   */
+  private int idColumnRange;
+
+  /**
+   * MeasurementId->indexOf({@link MeasurementSchema})
+   */
   private final Map<String, Integer> measurementIndex;
 
-  /** Timestamps in this {@link Tablet} */
+  /**
+   * Timestamps in this {@link Tablet}
+   */
   public long[] timestamps;
-  /** Each object is a primitive type array, which represents values of one 
measurement */
+  /**
+   * Each object is a primitive type array, which represents values of one 
measurement
+   */
   public Object[] values;
-  /** Each {@link BitMap} represents the existence of each value in the 
current column. */
+  /**
+   * Each {@link BitMap} represents the existence of each value in the current 
column.
+   */
   public BitMap[] bitMaps;
-  /** The number of rows to include in this {@link Tablet} */
+  /**
+   * The number of rows to include in this {@link Tablet}
+   */
   public int rowSize;
-  /** The maximum number of rows for this {@link Tablet} */
+  /**
+   * The maximum number of rows for this {@link Tablet}
+   */
   private final int maxRowNumber;
 
   /**
@@ -79,7 +105,7 @@ public class Tablet {
    *
    * @param deviceId the name of the device specified to be written in
    * @param schemas the list of {@link MeasurementSchema}s for creating the 
tablet, only
-   *     measurementId and type take effects
+   * measurementId and type take effects
    */
   public Tablet(String deviceId, List<MeasurementSchema> schemas) {
     this(deviceId, schemas, DEFAULT_SIZE);
@@ -92,12 +118,13 @@ public class Tablet {
    *
    * @param deviceId the name of the device specified to be written in
    * @param schemas the list of {@link MeasurementSchema}s for creating the 
row batch, only
-   *     measurementId and type take effects
+   * measurementId and type take effects
    * @param maxRowNumber the maximum number of rows for this tablet
    */
   public Tablet(String deviceId, List<MeasurementSchema> schemas, int 
maxRowNumber) {
     this.deviceId = deviceId;
     this.schemas = new ArrayList<>(schemas);
+    setColumnTypes(ColumnType.nCopy(ColumnType.MEASUREMENT, schemas.size()));
     this.maxRowNumber = maxRowNumber;
     measurementIndex = new HashMap<>();
     constructMeasurementIndexMap();
@@ -113,7 +140,7 @@ public class Tablet {
    *
    * @param deviceId the name of the device specified to be written in
    * @param schemas the list of {@link MeasurementSchema}s for creating the 
row batch, only
-   *     measurementId and type take effects
+   * measurementId and type take effects
    * @param timestamps given timestamps
    * @param values given values
    * @param bitMaps given {@link BitMap}s
@@ -126,8 +153,21 @@ public class Tablet {
       Object[] values,
       BitMap[] bitMaps,
       int maxRowNumber) {
+    this(deviceId, schemas, ColumnType.nCopy(ColumnType.MEASUREMENT, 
schemas.size()), timestamps,
+        values, bitMaps, maxRowNumber);
+  }
+
+  public Tablet(
+      String deviceId,
+      List<MeasurementSchema> schemas,
+      List<ColumnType> columnTypes,
+      long[] timestamps,
+      Object[] values,
+      BitMap[] bitMaps,
+      int maxRowNumber) {
     this.deviceId = deviceId;
     this.schemas = schemas;
+    setColumnTypes(columnTypes);
     this.timestamps = timestamps;
     this.values = values;
     this.bitMaps = bitMaps;
@@ -138,6 +178,7 @@ public class Tablet {
     constructMeasurementIndexMap();
   }
 
+
   private void constructMeasurementIndexMap() {
     int indexInSchema = 0;
     for (MeasurementSchema schema : schemas) {
@@ -186,49 +227,43 @@ public class Tablet {
       bitMaps[indexOfSchema].mark(rowIndex);
     }
     switch (dataType) {
-      case TEXT:
-        {
-          Binary[] sensor = (Binary[]) values[indexOfSchema];
-          if (value instanceof Binary) {
-            sensor[rowIndex] = (Binary) value;
-          } else {
-            sensor[rowIndex] =
-                value != null
-                    ? new Binary((String) value, TSFileConfig.STRING_CHARSET)
-                    : Binary.EMPTY_VALUE;
-          }
-          break;
-        }
-      case FLOAT:
-        {
-          float[] sensor = (float[]) values[indexOfSchema];
-          sensor[rowIndex] = value != null ? (float) value : Float.MIN_VALUE;
-          break;
-        }
-      case INT32:
-        {
-          int[] sensor = (int[]) values[indexOfSchema];
-          sensor[rowIndex] = value != null ? (int) value : Integer.MIN_VALUE;
-          break;
-        }
-      case INT64:
-        {
-          long[] sensor = (long[]) values[indexOfSchema];
-          sensor[rowIndex] = value != null ? (long) value : Long.MIN_VALUE;
-          break;
-        }
-      case DOUBLE:
-        {
-          double[] sensor = (double[]) values[indexOfSchema];
-          sensor[rowIndex] = value != null ? (double) value : Double.MIN_VALUE;
-          break;
-        }
-      case BOOLEAN:
-        {
-          boolean[] sensor = (boolean[]) values[indexOfSchema];
-          sensor[rowIndex] = value != null && (boolean) value;
-          break;
+      case TEXT: {
+        Binary[] sensor = (Binary[]) values[indexOfSchema];
+        if (value instanceof Binary) {
+          sensor[rowIndex] = (Binary) value;
+        } else {
+          sensor[rowIndex] =
+              value != null
+                  ? new Binary((String) value, TSFileConfig.STRING_CHARSET)
+                  : Binary.EMPTY_VALUE;
         }
+        break;
+      }
+      case FLOAT: {
+        float[] sensor = (float[]) values[indexOfSchema];
+        sensor[rowIndex] = value != null ? (float) value : Float.MIN_VALUE;
+        break;
+      }
+      case INT32: {
+        int[] sensor = (int[]) values[indexOfSchema];
+        sensor[rowIndex] = value != null ? (int) value : Integer.MIN_VALUE;
+        break;
+      }
+      case INT64: {
+        long[] sensor = (long[]) values[indexOfSchema];
+        sensor[rowIndex] = value != null ? (long) value : Long.MIN_VALUE;
+        break;
+      }
+      case DOUBLE: {
+        double[] sensor = (double[]) values[indexOfSchema];
+        sensor[rowIndex] = value != null ? (double) value : Double.MIN_VALUE;
+        break;
+      }
+      case BOOLEAN: {
+        boolean[] sensor = (boolean[]) values[indexOfSchema];
+        sensor[rowIndex] = value != null && (boolean) value;
+        break;
+      }
       default:
         throw new 
UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
     }
@@ -238,12 +273,16 @@ public class Tablet {
     return schemas;
   }
 
-  /** Return the maximum number of rows for this tablet */
+  /**
+   * Return the maximum number of rows for this tablet
+   */
   public int getMaxRowNumber() {
     return maxRowNumber;
   }
 
-  /** Reset Tablet to the default state - set the rowSize to 0 and reset 
bitMaps */
+  /**
+   * Reset Tablet to the default state - set the rowSize to 0 and reset bitMaps
+   */
   public void reset() {
     rowSize = 0;
     if (bitMaps != null) {
@@ -304,7 +343,9 @@ public class Tablet {
     return rowSize * 8;
   }
 
-  /** @return Total bytes of values */
+  /**
+   * @return Total bytes of values
+   */
   public int getTotalValueOccupation() {
     int valueOccupation = 0;
     int columnIndex = 0;
@@ -352,7 +393,9 @@ public class Tablet {
     return valueOccupation;
   }
 
-  /** Serialize {@link Tablet} */
+  /**
+   * Serialize {@link Tablet}
+   */
   public ByteBuffer serialize() throws IOException {
     try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
         DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
@@ -370,7 +413,9 @@ public class Tablet {
     writeValues(stream);
   }
 
-  /** Serialize {@link MeasurementSchema}s */
+  /**
+   * Serialize {@link MeasurementSchema}s
+   */
   private void writeMeasurementSchemas(DataOutputStream stream) throws 
IOException {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream);
     if (schemas != null) {
@@ -395,7 +440,9 @@ public class Tablet {
     }
   }
 
-  /** Serialize {@link BitMap}s */
+  /**
+   * Serialize {@link BitMap}s
+   */
   private void writeBitMaps(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
     if (bitMaps != null) {
@@ -412,7 +459,9 @@ public class Tablet {
     }
   }
 
-  /** Serialize values */
+  /**
+   * Serialize values
+   */
   private void writeValues(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(values != null), stream);
     if (values != null) {
@@ -475,7 +524,9 @@ public class Tablet {
     }
   }
 
-  /** Deserialize Tablet */
+  /**
+   * Deserialize Tablet
+   */
   public static Tablet deserialize(ByteBuffer byteBuffer) {
     String deviceId = ReadWriteIOUtils.readString(byteBuffer);
     int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
@@ -524,7 +575,9 @@ public class Tablet {
     return tablet;
   }
 
-  /** deserialize bitmaps */
+  /**
+   * deserialize bitmaps
+   */
   public static BitMap[] readBitMapsFromBuffer(ByteBuffer byteBuffer, int 
columns) {
     BitMap[] bitMaps = new BitMap[columns];
     for (int i = 0; i < columns; i++) {
@@ -609,10 +662,11 @@ public class Tablet {
   }
 
   /**
-   * Note that the function will judge 2 {@link Tablet}s to be equal when 
their contents are logically the
-   * same. Namely, a {@link Tablet} with {@link BitMap} "null" may be equal to 
another {@link Tablet} with 3 columns and
-   * {@link BitMap "[null, null, null]", and a {@link Tablet} with rowSize 2 
is judged identical to other {@link Tablet}s
-   * regardless of any timeStamps with indexes larger than or equal to 2.
+   * Note that the function will judge 2 {@link Tablet}s to be equal when 
their contents are
+   * logically the same. Namely, a {@link Tablet} with {@link BitMap} "null" 
may be equal to another
+   * {@link Tablet} with 3 columns and
+   * {@link BitMap} "[null, null, null]", and a {@link Tablet} with rowSize 2 
is judged identical to
+   * other {@link Tablet}s regardless of any timeStamps with indexes larger 
than or equal to 2.
    *
    * @param o the tablet to compare
    * @return {@code true} if the tablets are logically equal
@@ -791,4 +845,28 @@ public class Tablet {
     }
     return true;
   }
+
+  public void setColumnTypes(List<ColumnType> columnTypes) {
+    this.columnTypes = columnTypes;
+    idColumnRange = 0;
+    for (int i = 0; i < columnTypes.size(); i++) {
+      if (columnTypes.get(i).equals(ColumnType.MEASUREMENT)) {
+        break;
+      }
+      idColumnRange ++;
+    }
+  }
+
+  public enum ColumnType {
+    ID,
+    MEASUREMENT;
+
+    public static List<ColumnType> nCopy(ColumnType type, int n) {
+      List<ColumnType> result = new ArrayList<>(n);
+      for (int i = 0; i < n; i++) {
+        result.add(type);
+      }
+      return result;
+    }
+  }
 }

Reply via email to