This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 6cc25dd375b Pipe: Convert data types when data sync sink metadata does not match integration test (#13326)
6cc25dd375b is described below

commit 6cc25dd375b9c270eae672d6ed9251a057357261
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 29 21:14:06 2024 +0800

    Pipe: Convert data types when data sync sink metadata does not match integration test (#13326)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../manual/IoTDBPipeTypeConversionISessionIT.java  | 804 +++++++++++++++++++++
 .../pipe/it/manual/IoTDBPipeTypeConversionIT.java  | 608 ++++++++++++++++
 .../async/IoTDBDataRegionAsyncConnector.java       |   4 +-
 .../transform/converter/ValueConverter.java        |  78 +-
 .../statement/PipeConvertedInsertRowStatement.java |   1 +
 .../PipeConvertedInsertTabletStatement.java        |   1 +
 .../plan/statement/crud/InsertBaseStatement.java   |  17 +
 .../plan/statement/crud/InsertRowStatement.java    |  14 +
 .../plan/statement/crud/InsertTabletStatement.java |  14 +
 9 files changed, 1524 insertions(+), 17 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
new file mode 100644
index 00000000000..9af757acbde
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.commons.utils.function.CheckedTriConsumer;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeDualManualIT {
+  private static final int generateDataSize = 100;
+
+  /**
+   * Round trip through {@code insertTablet}: the pipe is created before the write and syncs in
+   * tablet format ({@code isTsFile = false}).
+   */
+  @Test
+  public void insertTablet() {
+    prepareTypeConversionTest((sender, receiver, tablet) -> sender.insertTablet(tablet), false);
+  }
+
+  /**
+   * Round trip through {@code insertTablet}: the data is written before the pipe is created and
+   * syncs in tsfile format ({@code isTsFile = true}).
+   */
+  @Test
+  public void insertTabletReceiveByTsFile() {
+    prepareTypeConversionTest((sender, receiver, tablet) -> sender.insertTablet(tablet), true);
+  }
+
+  /**
+   * Round trip through {@code insertAlignedTablet}: the pipe is created before the write and
+   * syncs in tablet format ({@code isTsFile = false}).
+   */
+  @Test
+  public void insertAlignedTablet() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> sender.insertAlignedTablet(tablet), false);
+  }
+
+  /**
+   * Round trip through {@code insertAlignedTablet}: the data is written before the pipe is
+   * created and syncs in tsfile format ({@code isTsFile = true}).
+   */
+  @Test
+  public void insertAlignedTabletReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> sender.insertAlignedTablet(tablet), true);
+  }
+
+  /**
+   * Writes all tablet rows with a single {@code insertRecords} call; the data is written before
+   * the pipe is created and syncs in tsfile format ({@code isTsFile = true}).
+   */
+  @Test
+  public void insertRecordsReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+              getMeasurementSchemasAndType(tablet);
+          final List<List<Object>> rows = generateTabletInsertRecordForTable(tablet);
+          sender.insertRecords(
+              getDeviceID(tablet),
+              getTimestampList(tablet),
+              namesAndTypes.left,
+              namesAndTypes.right,
+              rows);
+        },
+        true);
+  }
+
+  @Test
+  public void insertRecord() {
+    prepareTypeConversionTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertRecord(
+                tablet.deviceId,
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i).toArray());
+          }
+        },
+        false);
+  }
+
+  @Test
+  public void insertRecordReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertRecord(
+                tablet.deviceId,
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i).toArray());
+          }
+        },
+        true);
+  }
+
+  @Test
+  public void insertAlignedRecord() {
+    prepareTypeConversionTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertAlignedRecord(
+                tablet.deviceId,
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i));
+          }
+        },
+        false);
+  }
+
+  @Test
+  public void insertAlignedRecordReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertAlignedRecord(
+                tablet.deviceId,
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i));
+          }
+        },
+        true);
+  }
+
+  /**
+   * Writes all tablet rows with a single {@code insertRecords} call; the pipe is created before
+   * the write and syncs in tablet format ({@code isTsFile = false}).
+   */
+  @Test
+  public void insertRecords() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+              getMeasurementSchemasAndType(tablet);
+          final List<List<Object>> rows = generateTabletInsertRecordForTable(tablet);
+          sender.insertRecords(
+              getDeviceID(tablet),
+              getTimestampList(tablet),
+              namesAndTypes.left,
+              namesAndTypes.right,
+              rows);
+        },
+        false);
+  }
+
+  /**
+   * Writes all tablet rows with a single {@code insertAlignedRecords} call; the pipe is created
+   * before the write and syncs in tablet format ({@code isTsFile = false}).
+   */
+  @Test
+  public void insertAlignedRecords() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+              getMeasurementSchemasAndType(tablet);
+          final List<List<Object>> rows = generateTabletInsertRecordForTable(tablet);
+          sender.insertAlignedRecords(
+              getDeviceID(tablet),
+              getTimestampList(tablet),
+              namesAndTypes.left,
+              namesAndTypes.right,
+              rows);
+        },
+        false);
+  }
+
+  /**
+   * Writes all tablet rows with a single {@code insertAlignedRecords} call; the data is written
+   * before the pipe is created and syncs in tsfile format ({@code isTsFile = true}).
+   */
+  @Test
+  public void insertAlignedRecordsReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+              getMeasurementSchemasAndType(tablet);
+          final List<List<Object>> rows = generateTabletInsertRecordForTable(tablet);
+          sender.insertAlignedRecords(
+              getDeviceID(tablet),
+              getTimestampList(tablet),
+              namesAndTypes.left,
+              namesAndTypes.right,
+              rows);
+        },
+        true);
+  }
+
+  /**
+   * Writes the tablet as string-typed rows with {@code insertStringRecordsOfOneDevice}; the pipe
+   * is created before the write and syncs in tablet format ({@code isTsFile = false}).
+   */
+  @Test
+  public void insertStringRecordsOfOneDevice() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          // Only the measurement names are needed here; the values travel as strings.
+          final List<List<String>> measurements = getMeasurementSchemasAndType(tablet).left;
+          final List<List<String>> rows = generateTabletInsertStrRecordForTable(tablet);
+          sender.insertStringRecordsOfOneDevice(
+              tablet.deviceId, getTimestampList(tablet), measurements, rows);
+        },
+        false);
+  }
+
+  /**
+   * Writes the tablet as string-typed rows with {@code insertStringRecordsOfOneDevice}; the data
+   * is written before the pipe is created and syncs in tsfile format ({@code isTsFile = true}).
+   */
+  @Test
+  public void insertStringRecordsOfOneDeviceReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          // Only the measurement names are needed here; the values travel as strings.
+          final List<List<String>> measurements = getMeasurementSchemasAndType(tablet).left;
+          final List<List<String>> rows = generateTabletInsertStrRecordForTable(tablet);
+          sender.insertStringRecordsOfOneDevice(
+              tablet.deviceId, getTimestampList(tablet), measurements, rows);
+        },
+        true);
+  }
+
+  /**
+   * Writes the tablet as string-typed rows with {@code insertAlignedStringRecordsOfOneDevice};
+   * the pipe is created before the write and syncs in tablet format ({@code isTsFile = false}).
+   */
+  @Test
+  public void insertAlignedStringRecordsOfOneDevice() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          // Only the measurement names are needed here; the values travel as strings.
+          final List<List<String>> measurements = getMeasurementSchemasAndType(tablet).left;
+          final List<List<String>> rows = generateTabletInsertStrRecordForTable(tablet);
+          sender.insertAlignedStringRecordsOfOneDevice(
+              tablet.deviceId, getTimestampList(tablet), measurements, rows);
+        },
+        false);
+  }
+
+  /**
+   * Writes the tablet as string-typed rows with {@code insertAlignedStringRecordsOfOneDevice};
+   * the data is written before the pipe is created and syncs in tsfile format ({@code isTsFile =
+   * true}).
+   */
+  @Test
+  public void insertAlignedStringRecordsOfOneDeviceReceiveByTsFile() {
+    prepareTypeConversionTest(
+        (sender, receiver, tablet) -> {
+          // Only the measurement names are needed here; the values travel as strings.
+          final List<List<String>> measurements = getMeasurementSchemasAndType(tablet).left;
+          final List<List<String>> rows = generateTabletInsertStrRecordForTable(tablet);
+          sender.insertAlignedStringRecordsOfOneDevice(
+              tablet.deviceId, getTimestampList(tablet), measurements, rows);
+        },
+        true);
+  }
+
+  private SessionDataSet query(
+      ISession session, List<MeasurementSchema> measurementSchemas, String 
deviceId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    String sql = "select ";
+    StringBuffer param = new StringBuffer();
+    for (IMeasurementSchema schema : measurementSchemas) {
+      param.append(schema.getMeasurementId());
+      param.append(',');
+    }
+    sql = sql + param.substring(0, param.length() - 1);
+    sql = sql + " from " + deviceId + " ORDER BY time ASC";
+    return session.executeQueryStatement(sql);
+  }
+
+  /**
+   * Runs one type-conversion round trip and verifies the data that arrives on the receiver.
+   *
+   * <p>Every generated time series is created with its source type on the sender and its target
+   * type on the receiver. A tablet of test data is then written through {@code consumer}, and the
+   * receiver is polled until its content equals the values computed locally by {@code
+   * generateTabletResultSetForTable}.
+   *
+   * @param consumer performs the actual write; receives the sender session, the receiver session
+   *     and the tablet to insert
+   * @param isTsFile {@code true} to write the data first and then create the pipe in tsfile mode;
+   *     {@code false} to create the pipe in tablet mode first and then write the data
+   */
+  private void prepareTypeConversionTest(
+      CheckedTriConsumer<ISession, ISession, Tablet, Exception> consumer, boolean isTsFile) {
+    List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
+        generateMeasurementSchemas();
+
+    // Create each time series on both sides: source type on the sender, target type on the
+    // receiver. The random suffix keeps the device path distinct between test methods.
+    String uuid = RandomStringUtils.random(8, true, false);
+    for (Pair<MeasurementSchema, MeasurementSchema> pair : measurementSchemas) {
+      createTimeSeries(uuid, pair.left.getMeasurementId(), pair.left.getType().name(), senderEnv);
+      createTimeSeries(
+          uuid, pair.right.getMeasurementId(), pair.right.getType().name(), receiverEnv);
+    }
+
+    // try-with-resources closes both sessions; no explicit close() calls are needed.
+    try (ISession senderSession = senderEnv.getSessionConnection();
+        ISession receiverSession = receiverEnv.getSessionConnection()) {
+      Tablet tablet = generateTabletAndMeasurementSchema(measurementSchemas, "root.test." + uuid);
+      if (isTsFile) {
+        // Send TsFile data to receiver
+        consumer.accept(senderSession, receiverSession, tablet);
+        Thread.sleep(2000);
+        createDataPipe(uuid, true);
+        senderSession.executeNonQueryStatement("flush");
+      } else {
+        // Send Tablet data to receiver
+        createDataPipe(uuid, false);
+        Thread.sleep(2000);
+        // The actual implementation logic of inserting data
+        consumer.accept(senderSession, receiverSession, tablet);
+        senderSession.executeNonQueryStatement("flush");
+      }
+
+      // Verify receiver data
+      long timeoutSeconds = 600;
+      List<List<Object>> expectedValues =
+          generateTabletResultSetForTable(tablet, measurementSchemas);
+      await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
+          .atMost(timeoutSeconds, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                // Close the result set of every poll so retries do not pile up open queries.
+                try (SessionDataSet dataSet =
+                    query(receiverSession, tablet.getSchemas(), tablet.deviceId)) {
+                  validateResultSet(dataSet, expectedValues, tablet.timestamps);
+                } catch (Exception e) {
+                  // Surface as AssertionError so polling continues, but keep the cause.
+                  throw new AssertionError(
+                      "Failed to validate receiver data: " + e.getMessage(), e);
+                }
+              });
+      tablet.reset();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before failing the test.
+      Thread.currentThread().interrupt();
+      throw new AssertionError("Interrupted while running type conversion test", e);
+    } catch (Exception e) {
+      // Fail with the original exception attached instead of printing it and dropping it.
+      throw new AssertionError(e.getMessage(), e);
+    }
+  }
+
+  private void createTimeSeries(String diff, String measurementID, String 
dataType, BaseEnv env) {
+    String timeSeriesCreation =
+        String.format(
+            "create timeseries root.test.%s.%s with 
datatype=%s,encoding=PLAIN",
+            diff, measurementID, dataType);
+    TestUtils.tryExecuteNonQueriesWithRetry(env, 
Collections.singletonList(timeSeriesCreation));
+  }
+
+  private void createDataPipe(String diff, boolean isTSFile) {
+    String sql =
+        String.format(
+            "create pipe test%s"
+                + " with source 
('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+                + " with processor ('processor'='do-nothing-processor')"
+                + " with sink 
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
+            diff,
+            isTSFile ? "file" : "forced-log",
+            !isTSFile,
+            isTSFile,
+            receiverEnv.getIP(),
+            receiverEnv.getPort(),
+            isTSFile ? "tsfile" : "tablet");
+    TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, 
Collections.singletonList(sql));
+  }
+
+  /**
+   * Checks every row of {@code dataSet}, in order, against the expected timestamps and values,
+   * and finally checks that the number of rows read equals {@code values.size()}.
+   *
+   * @param dataSet query result to validate
+   * @param values expected values, one inner list per row, indexed like the row's fields
+   * @param timestamps expected row timestamps, indexed like {@code values}
+   * @throws IoTDBConnectionException if reading from the data set fails
+   * @throws StatementExecutionException if reading from the data set fails
+   */
+  private void validateResultSet(
+      SessionDataSet dataSet, List<List<Object>> values, long[] timestamps)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int index = 0;
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      List<Field> fields = record.getFields();
+
+      // JUnit's signature is assertEquals(expected, actual); keeping that order makes failure
+      // messages name the expected and the received value correctly.
+      assertEquals(timestamps[index], record.getTimestamp());
+      List<Object> rowValues = values.get(index++);
+      for (int i = 0; i < fields.size(); i++) {
+        Field field = fields.get(i);
+        switch (field.getDataType()) {
+          case INT64:
+          case TIMESTAMP:
+            assertEquals((long) rowValues.get(i), field.getLongV());
+            break;
+          case DATE:
+            assertEquals(rowValues.get(i), field.getDateV());
+            break;
+          case BLOB:
+            assertEquals(rowValues.get(i), field.getBinaryV());
+            break;
+          case TEXT:
+          case STRING:
+            assertEquals(rowValues.get(i), field.getStringValue());
+            break;
+          case INT32:
+            assertEquals((int) rowValues.get(i), field.getIntV());
+            break;
+          case DOUBLE:
+            // Double.compare gives an exact comparison without a delta argument.
+            assertEquals(0, Double.compare(field.getDoubleV(), (double) rowValues.get(i)));
+            break;
+          case FLOAT:
+            assertEquals(0, Float.compare(field.getFloatV(), (float) rowValues.get(i)));
+            break;
+          default:
+            // NOTE(review): fields of any other type (e.g. BOOLEAN) are not compared here;
+            // confirm whether that is intended.
+            break;
+        }
+      }
+    }
+    assertEquals(values.size(), index);
+  }
+
+  private boolean[] createTestDataForBoolean() {
+    boolean[] data = new boolean[generateDataSize];
+    Random random = new Random();
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextBoolean();
+    }
+    return data;
+  }
+
+  /** Returns {@code generateDataSize} random ints drawn from the full int range. */
+  private int[] createTestDataForInt32() {
+    // Random.ints(n) yields n values, each as if produced by nextInt().
+    return new Random().ints(generateDataSize).toArray();
+  }
+
+  /** Returns {@code generateDataSize} random longs. */
+  private long[] createTestDataForInt64() {
+    // Random.longs(n) yields n values, each as if produced by nextLong().
+    return new Random().longs(generateDataSize).toArray();
+  }
+
+  private float[] createTestDataForFloat() {
+    float[] data = new float[generateDataSize];
+    Random random = new Random();
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextFloat();
+    }
+    return data;
+  }
+
+  /** Returns {@code generateDataSize} random doubles as produced by {@code nextDouble()}. */
+  private double[] createTestDataForDouble() {
+    // Random.doubles(n) yields n values, each as if produced by nextDouble().
+    return new Random().doubles(generateDataSize).toArray();
+  }
+
+  private long[] createTestDataForTimestamp() {
+    long[] data = new long[generateDataSize];
+    long time = new Date().getTime();
+    for (int i = 0; i < data.length; i++) {
+      data[i] = time + i;
+    }
+    return data;
+  }
+
+  private LocalDate[] createTestDataForDate() {
+    LocalDate[] data = new LocalDate[generateDataSize];
+    int year = 2023;
+    int month = 1;
+    int day = 1;
+    for (int i = 0; i < data.length; i++) {
+      data[i] = DateUtils.parseIntToLocalDate(year * 10000 + (month * 100) + 
day);
+      // update
+      day++;
+      if (day > 28) {
+        day = 1;
+        month++;
+        if (month > 12) {
+          month = 1;
+          year++;
+        }
+      }
+    }
+    return data;
+  }
+
+  private Binary[] createTestDataForString() {
+    String[] stringData = {
+      "Hello",
+      "Hello World!",
+      "This is a test.",
+      "IoTDB Hello World!!!!",
+      "IoTDB is an excellent time series database!!!!!!!!!",
+      "12345678910!!!!!!!!",
+      "123456",
+      "1234567.123213",
+      "21232131.21",
+      "enable =  true",
+      "true",
+      "false",
+      "12345678910",
+      "123231232132131233213123123123123123131312",
+      "123231232132131233213123123123123123131312.212312321312312",
+    };
+    Binary[] data = new Binary[generateDataSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] =
+          new Binary(stringData[(i % 
stringData.length)].getBytes(TSFileConfig.STRING_CHARSET));
+    }
+    return data;
+  }
+
+  private List<Long> getTimestampList(Tablet tablet) {
+    long[] timestamps = tablet.timestamps;
+    List<Long> data = new ArrayList<>(timestamps.length);
+    for (long timestamp : timestamps) {
+      data.add(timestamp);
+    }
+    return data;
+  }
+
+  private Pair<List<List<String>>, List<List<TSDataType>>> 
getMeasurementSchemasAndType(
+      Tablet tablet) {
+    List<List<String>> schemaData = new ArrayList<>(tablet.rowSize);
+    List<List<TSDataType>> typeData = new ArrayList<>(tablet.rowSize);
+    List<String> measurementSchemas = new 
ArrayList<>(tablet.getSchemas().size());
+    List<TSDataType> types = new ArrayList<>(tablet.rowSize);
+    for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+      measurementSchemas.add(measurementSchema.getMeasurementId());
+      types.add(measurementSchema.getType());
+    }
+
+    for (int i = 0; i < tablet.rowSize; i++) {
+      schemaData.add(measurementSchemas);
+      typeData.add(types);
+    }
+
+    return new Pair<>(schemaData, typeData);
+  }
+
+  /** Returns a mutable list holding {@code tablet.deviceId} once per tablet row. */
+  private List<String> getDeviceID(Tablet tablet) {
+    return new ArrayList<>(Collections.nCopies(tablet.rowSize, tablet.deviceId));
+  }
+
+  /**
+   * Computes the values the receiver is expected to return for every tablet cell.
+   *
+   * <p>Each cell is converted from its sender type to its receiver type with {@code
+   * ValueConverter.convert} and then passed through {@code convert(Object, TSDataType)} so it can
+   * be compared with the query result.
+   *
+   * @param tablet tablet holding the source values, column by column
+   * @param pairs sender (left) and receiver (right) schema of every column, in column order
+   * @return one list of expected values per tablet row
+   */
+  private List<List<Object>> generateTabletResultSetForTable(
+      final Tablet tablet, List<Pair<MeasurementSchema, MeasurementSchema>> pairs) {
+    final int columnCount = tablet.getSchemas().size();
+    final Object[] columns = tablet.values;
+    final List<List<Object>> expectedRows = new ArrayList<>(tablet.rowSize);
+    for (int row = 0; row < tablet.rowSize; row++) {
+      final List<Object> expectedRow = new ArrayList<>();
+      for (int col = 0; col < columnCount; col++) {
+        final TSDataType from = pairs.get(col).left.getType();
+        final TSDataType to = pairs.get(col).right.getType();
+        final Object converted;
+        switch (from) {
+          case INT64:
+          case TIMESTAMP:
+            converted = ValueConverter.convert(from, to, ((long[]) columns[col])[row]);
+            break;
+          case INT32:
+            converted = ValueConverter.convert(from, to, ((int[]) columns[col])[row]);
+            break;
+          case DOUBLE:
+            converted = ValueConverter.convert(from, to, ((double[]) columns[col])[row]);
+            break;
+          case FLOAT:
+            converted = ValueConverter.convert(from, to, ((float[]) columns[col])[row]);
+            break;
+          case DATE:
+            // The tablet stores dates as LocalDate; they are passed on as an int.
+            converted =
+                ValueConverter.convert(
+                    from,
+                    to,
+                    DateUtils.parseDateExpressionToInt(((LocalDate[]) columns[col])[row]));
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            converted = ValueConverter.convert(from, to, ((Binary[]) columns[col])[row]);
+            break;
+          case BOOLEAN:
+            converted = ValueConverter.convert(from, to, ((boolean[]) columns[col])[row]);
+            break;
+          default:
+            // No expectation is produced for any other source type.
+            continue;
+        }
+        expectedRow.add(convert(converted, to));
+      }
+      expectedRows.add(expectedRow);
+    }
+
+    return expectedRows;
+  }
+
+  private Object convert(Object value, TSDataType targetType) {
+    switch (targetType) {
+      case DATE:
+        return DateUtils.parseIntToLocalDate((Integer) value);
+      case TEXT:
+      case STRING:
+        return new String(((Binary) value).getValues(), 
TSFileConfig.STRING_CHARSET);
+    }
+    return value;
+  }
+
+  /**
+   * Turns the column-oriented tablet into row-oriented value lists for the record-based insert
+   * calls.
+   *
+   * @param tablet tablet whose {@code values} columns are read
+   * @return one list per tablet row; primitives are boxed, TEXT and STRING cells are decoded to
+   *     {@code String}, DATE and BLOB cells are passed through as stored
+   */
+  private List<List<Object>> generateTabletInsertRecordForTable(final Tablet tablet) {
+    final List<MeasurementSchema> columnSchemas = tablet.getSchemas();
+    final Object[] columns = tablet.values;
+    final List<List<Object>> rows = new ArrayList<>(tablet.rowSize);
+    for (int row = 0; row < tablet.rowSize; row++) {
+      final List<Object> cells = new ArrayList<>();
+      for (int col = 0; col < columnSchemas.size(); col++) {
+        final Object cell;
+        switch (columnSchemas.get(col).getType()) {
+          case INT64:
+          case TIMESTAMP:
+            cell = ((long[]) columns[col])[row];
+            break;
+          case INT32:
+            cell = ((int[]) columns[col])[row];
+            break;
+          case DOUBLE:
+            cell = ((double[]) columns[col])[row];
+            break;
+          case FLOAT:
+            cell = ((float[]) columns[col])[row];
+            break;
+          case DATE:
+            cell = ((LocalDate[]) columns[col])[row];
+            break;
+          case TEXT:
+          case STRING:
+            cell =
+                new String(((Binary[]) columns[col])[row].getValues(), TSFileConfig.STRING_CHARSET);
+            break;
+          case BLOB:
+            cell = ((Binary[]) columns[col])[row];
+            break;
+          case BOOLEAN:
+            cell = ((boolean[]) columns[col])[row];
+            break;
+          default:
+            // Columns of any other type contribute no value to the row.
+            continue;
+        }
+        cells.add(cell);
+      }
+      rows.add(cells);
+    }
+
+    return rows;
+  }
+
+  /**
+   * Turns the column-oriented tablet into row-oriented lists of string literals for the
+   * string-record insert calls.
+   *
+   * @param tablet tablet whose {@code values} columns are read
+   * @return one list of strings per tablet row; TIMESTAMP cells are formatted as UTC datetimes
+   *     and BLOB cells are rendered as {@code X'...'} literals
+   */
+  private List<List<String>> generateTabletInsertStrRecordForTable(Tablet tablet) {
+    final List<MeasurementSchema> columnSchemas = tablet.getSchemas();
+    final Object[] columns = tablet.values;
+    final List<List<String>> rows = new ArrayList<>(tablet.rowSize);
+    for (int row = 0; row < tablet.rowSize; row++) {
+      final List<String> cells = new ArrayList<>();
+      for (int col = 0; col < columnSchemas.size(); col++) {
+        final String cell;
+        switch (columnSchemas.get(col).getType()) {
+          case INT64:
+            cell = String.valueOf(((long[]) columns[col])[row]);
+            break;
+          case TIMESTAMP:
+            cell =
+                RpcUtils.formatDatetime(
+                    "default", "ms", ((long[]) columns[col])[row], ZoneOffset.UTC);
+            break;
+          case INT32:
+            cell = String.valueOf(((int[]) columns[col])[row]);
+            break;
+          case DOUBLE:
+            cell = String.valueOf(((double[]) columns[col])[row]);
+            break;
+          case FLOAT:
+            cell = String.valueOf(((float[]) columns[col])[row]);
+            break;
+          case DATE:
+            cell = ((LocalDate[]) columns[col])[row].toString();
+            break;
+          case TEXT:
+          case STRING:
+            cell =
+                new String(((Binary[]) columns[col])[row].getValues(), TSFileConfig.STRING_CHARSET);
+            break;
+          case BLOB:
+            // Drop the first two characters of the rendered blob and wrap the rest as X'...'.
+            final String rendered =
+                BytesUtils.parseBlobByteArrayToString(((Binary[]) columns[col])[row].getValues());
+            cell = String.format("X'%s'", rendered.substring(2));
+            break;
+          case BOOLEAN:
+            cell = String.valueOf(((boolean[]) columns[col])[row]);
+            break;
+          default:
+            // Columns of any other type contribute no value to the row.
+            continue;
+        }
+        cells.add(cell);
+      }
+      rows.add(cells);
+    }
+
+    return rows;
+  }
+
+  private Tablet generateTabletAndMeasurementSchema(
+      List<Pair<MeasurementSchema, MeasurementSchema>> pairs, String deviceId) 
{
+    long[] timestamp = createTestDataForTimestamp();
+    Object[] objects = new Object[pairs.size()];
+    List<MeasurementSchema> measurementSchemas = new ArrayList<>(pairs.size());
+    BitMap[] bitMaps = new BitMap[pairs.size()];
+    for (int i = 0; i < bitMaps.length; i++) {
+      bitMaps[i] = new BitMap(generateDataSize);
+    }
+    for (int i = 0; i < objects.length; i++) {
+      MeasurementSchema schema = pairs.get(i).left;
+      measurementSchemas.add(schema);
+      switch (schema.getType()) {
+        case INT64:
+          objects[i] = createTestDataForInt64();
+          break;
+        case INT32:
+          objects[i] = createTestDataForInt32();
+          break;
+        case TIMESTAMP:
+          objects[i] = createTestDataForTimestamp();
+          break;
+        case DOUBLE:
+          objects[i] = createTestDataForDouble();
+          break;
+        case FLOAT:
+          objects[i] = createTestDataForFloat();
+          break;
+        case DATE:
+          objects[i] = createTestDataForDate();
+          break;
+        case STRING:
+        case BLOB:
+        case TEXT:
+          objects[i] = createTestDataForString();
+          break;
+        case BOOLEAN:
+          objects[i] = createTestDataForBoolean();
+          break;
+      }
+    }
+    return new Tablet(deviceId, measurementSchemas, timestamp, objects, 
bitMaps, generateDataSize);
+  }
+
+  private List<Pair<MeasurementSchema, MeasurementSchema>> 
generateMeasurementSchemas() {
+    TSDataType[] dataTypes = {
+      TSDataType.STRING,
+      TSDataType.TEXT,
+      TSDataType.BLOB,
+      TSDataType.TIMESTAMP,
+      TSDataType.BOOLEAN,
+      TSDataType.DATE,
+      TSDataType.DOUBLE,
+      TSDataType.FLOAT,
+      TSDataType.INT32,
+      TSDataType.INT64
+    };
+    List<Pair<MeasurementSchema, MeasurementSchema>> pairs = new ArrayList<>();
+
+    for (TSDataType type : dataTypes) {
+      for (TSDataType dataType : dataTypes) {
+        String id = String.format("%s2%s", type.name(), dataType.name());
+        pairs.add(new Pair<>(new MeasurementSchema(id, type), new 
MeasurementSchema(id, dataType)));
+      }
+    }
+    return pairs;
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java
new file mode 100644
index 00000000000..8dfde314dfa
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeTypeConversionIT extends AbstractPipeDualManualIT {
+
+  private static final int generateDataSize = 100;
+
+  // Runs one sender-to-receiver conversion scenario per target type, with BOOLEAN as the source.
+  @Test
+  public void testBooleanToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TEXT,
+      TSDataType.TIMESTAMP,
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.BOOLEAN, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with INT32 as the source.
+  @Test
+  public void testInt32ToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BOOLEAN,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TEXT,
+      TSDataType.TIMESTAMP,
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.INT32, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with INT64 as the source.
+  @Test
+  public void testInt64ToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TEXT,
+      TSDataType.TIMESTAMP,
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.INT64, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with FLOAT as the source.
+  @Test
+  public void testFloatToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.DOUBLE,
+      TSDataType.TEXT,
+      TSDataType.TIMESTAMP,
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.FLOAT, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with DOUBLE as the source.
+  @Test
+  public void testDoubleToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.TEXT,
+      TSDataType.TIMESTAMP,
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.DOUBLE, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with TEXT as the source.
+  @Test
+  public void testTextToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TIMESTAMP,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.TEXT, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with TIMESTAMP as the source.
+  @Test
+  public void testTimestampToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TEXT,
+      TSDataType.BLOB,
+      TSDataType.STRING,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with DATE as the source.
+  // NOTE(review): DATE -> BLOB is not exercised here, unlike the other source types; confirm
+  // whether that omission is intentional.
+  @Test
+  public void testDateToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TEXT,
+      TSDataType.STRING,
+      TSDataType.TIMESTAMP
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.DATE, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with BLOB as the source.
+  @Test
+  public void testBlobToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.TEXT,
+      TSDataType.STRING,
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TIMESTAMP,
+      TSDataType.DATE
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.BLOB, target);
+    }
+  }
+
+  // Runs one sender-to-receiver conversion scenario per target type, with STRING as the source.
+  // NOTE(review): STRING -> DATE is not exercised here, unlike TEXT -> DATE; confirm whether that
+  // omission is intentional.
+  @Test
+  public void testStringToOtherTypeConversion() {
+    final TSDataType[] targets = {
+      TSDataType.TEXT,
+      TSDataType.BLOB,
+      TSDataType.BOOLEAN,
+      TSDataType.INT32,
+      TSDataType.INT64,
+      TSDataType.FLOAT,
+      TSDataType.DOUBLE,
+      TSDataType.TIMESTAMP
+    };
+    for (final TSDataType target : targets) {
+      executeAndVerifyTypeConversion(TSDataType.STRING, target);
+    }
+  }
+
+  private void executeAndVerifyTypeConversion(TSDataType source, TSDataType 
target) {
+    List<Pair> pairs = prepareTypeConversionTest(source, target);
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        String.format("select * from root.%s2%s.**", source.name(), 
target.name()),
+        String.format("Time,root.%s2%s.test.status,", source.name(), 
target.name()),
+        createExpectedResultSet(pairs, source, target),
+        30);
+  }
+
+  /**
+   * Sets up one conversion scenario and inserts the source data on the sender.
+   *
+   * <p>Creates {@code root.<SOURCE>2<TARGET>.test.status} with the source type on the sender and
+   * with the target type on the receiver, creates the pipe between them, pauses two seconds to
+   * let the pipe start, and then inserts the generated rows on the sender.
+   *
+   * @param sourceType data type of the series on the sender
+   * @param targetType data type of the series on the receiver
+   * @return the generated (timestamp, value) rows that were inserted on the sender
+   * @throws RuntimeException if the thread is interrupted during the pause; the interrupt flag
+   *     is restored before the exception is thrown
+   */
+  private List<Pair> prepareTypeConversionTest(TSDataType sourceType, TSDataType targetType) {
+    String sourceTypeName = sourceType.name();
+    String targetTypeName = targetType.name();
+
+    createTimeSeries(sourceTypeName, targetTypeName, sourceTypeName, senderEnv);
+    createTimeSeries(sourceTypeName, targetTypeName, targetTypeName, receiverEnv);
+
+    createDataPipe(sourceTypeName, targetTypeName);
+
+    List<Pair> pairs = createTestDataForType(sourceTypeName);
+
+    // wait pipe start
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+    executeDataInsertions(pairs, sourceType, targetType);
+    return pairs;
+  }
+
+  private void createTimeSeries(
+      String sourceTypeName, String targetTypeName, String dataType, BaseEnv 
env) {
+    String timeSeriesCreationQuery =
+        String.format(
+            "create timeseries root.%s2%s.test.status with 
datatype=%s,encoding=PLAIN",
+            sourceTypeName, targetTypeName, dataType);
+    TestUtils.tryExecuteNonQueriesWithRetry(
+        env, Collections.singletonList(timeSeriesCreationQuery));
+  }
+
+  private void createDataPipe(String sourceTypeName, String targetTypeName) {
+    String sql =
+        String.format(
+            "create pipe %s2%s"
+                + " with source 
('source'='iotdb-source','source.path'='root.%s2%s.**','realtime.mode'='forced-log','realtime.enable'='true','history.enable'='false')"
+                + " with processor ('processor'='do-nothing-processor')"
+                + " with sink 
('node-urls'='%s:%s','batch.enable'='false','sink.format'='tablet')",
+            sourceTypeName,
+            targetTypeName,
+            sourceTypeName,
+            targetTypeName,
+            receiverEnv.getIP(),
+            receiverEnv.getPort());
+    TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, 
Collections.singletonList(sql));
+  }
+
+  private List<Pair> createTestDataForType(String sourceType) {
+    switch (sourceType) {
+      case "BOOLEAN":
+        return createTestDataForBoolean();
+      case "INT32":
+        return createTestDataForInt32();
+      case "INT64":
+        return createTestDataForInt64();
+      case "FLOAT":
+        return createTestDataForFloat();
+      case "DOUBLE":
+        return createTestDataForDouble();
+      case "TEXT":
+        return createTestDataForText();
+      case "TIMESTAMP":
+        return createTestDataForTimestamp();
+      case "DATE":
+        return createTestDataForDate();
+      case "BLOB":
+        return createTestDataForBlob();
+      case "STRING":
+        return createTestDataForString();
+      default:
+        throw new UnsupportedOperationException("Unsupported data type: " + 
sourceType);
+    }
+  }
+
+  private void executeDataInsertions(
+      List<Pair> testData, TSDataType sourceType, TSDataType targetType) {
+    switch (sourceType) {
+      case STRING:
+      case TEXT:
+        TestUtils.tryExecuteNonQueriesWithRetry(
+            senderEnv,
+            createInsertStatementsForString(testData, sourceType.name(), 
targetType.name()));
+        return;
+      case TIMESTAMP:
+        TestUtils.tryExecuteNonQueriesWithRetry(
+            senderEnv,
+            createInsertStatementsForTimestamp(testData, sourceType.name(), 
targetType.name()));
+        return;
+      case DATE:
+        TestUtils.tryExecuteNonQueriesWithRetry(
+            senderEnv,
+            createInsertStatementsForLocalDate(testData, sourceType.name(), 
targetType.name()));
+        return;
+      case BLOB:
+        TestUtils.tryExecuteNonQueriesWithRetry(
+            senderEnv,
+            createInsertStatementsForBlob(testData, sourceType.name(), 
targetType.name()));
+        return;
+      default:
+        TestUtils.tryExecuteNonQueriesWithRetry(
+            senderEnv,
+            createInsertStatementsForNumeric(testData, sourceType.name(), 
targetType.name()));
+    }
+  }
+
+  private List<String> createInsertStatementsForString(
+      List<Pair> testData, String sourceType, String targetType) {
+    List<String> executes = new ArrayList<>();
+    for (Pair pair : testData) {
+      executes.add(
+          String.format(
+              "insert into root.%s2%s.test(timestamp,status) values 
(%s,'%s');",
+              sourceType,
+              targetType,
+              pair.left,
+              new String(((Binary) (pair.right)).getValues(), 
StandardCharsets.UTF_8)));
+    }
+    executes.add("flush");
+    return executes;
+  }
+
+  private List<String> createInsertStatementsForNumeric(
+      List<Pair> testData, String sourceType, String targetType) {
+    List<String> executes = new ArrayList<>();
+    for (Pair pair : testData) {
+      executes.add(
+          String.format(
+              "insert into root.%s2%s.test(timestamp,status) values (%s,%s);",
+              sourceType, targetType, pair.left, pair.right));
+    }
+    executes.add("flush");
+    return executes;
+  }
+
+  private List<String> createInsertStatementsForTimestamp(
+      List<Pair> testData, String sourceType, String targetType) {
+    List<String> executes = new ArrayList<>();
+    for (Pair pair : testData) {
+      executes.add(
+          String.format(
+              "insert into root.%s2%s.test(timestamp,status) values (%s,%s);",
+              sourceType, targetType, pair.left, pair.right));
+    }
+    executes.add("flush");
+    return executes;
+  }
+
+  private List<String> createInsertStatementsForLocalDate(
+      List<Pair> testData, String sourceType, String targetType) {
+    List<String> executes = new ArrayList<>();
+    for (Pair pair : testData) {
+      executes.add(
+          String.format(
+              "insert into root.%s2%s.test(timestamp,status) values 
(%s,'%s');",
+              sourceType, targetType, pair.left, 
DateUtils.formatDate((Integer) pair.right)));
+    }
+    executes.add("flush");
+    return executes;
+  }
+
+  private List<String> createInsertStatementsForBlob(
+      List<Pair> testData, String sourceType, String targetType) {
+    List<String> executes = new ArrayList<>();
+    for (Pair pair : testData) {
+      String value = BytesUtils.parseBlobByteArrayToString(((Binary) 
pair.right).getValues());
+      executes.add(
+          String.format(
+              "insert into root.%s2%s.test(timestamp,status) values 
(%s,X'%s');",
+              sourceType, targetType, pair.left, value.substring(2)));
+    }
+    executes.add("flush");
+    return executes;
+  }
+
+  private Set<String> createExpectedResultSet(
+      List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+    switch (targetType) {
+      case TIMESTAMP:
+        return generateTimestampResultSet(pairs, sourceType, targetType);
+      case DATE:
+        return generateLocalDateResultSet(pairs, sourceType, targetType);
+      case BLOB:
+        return generateBlobResultSet(pairs, sourceType, targetType);
+      case TEXT:
+      case STRING:
+        return generateStringResultSet(pairs, sourceType, targetType);
+      default:
+        HashSet<String> resultSet = new HashSet<>();
+        for (Pair pair : pairs) {
+          resultSet.add(
+              String.format(
+                  "%s,%s,", pair.left, ValueConverter.convert(sourceType, 
targetType, pair.right)));
+        }
+        return resultSet;
+    }
+  }
+
+  private Set<String> generateTimestampResultSet(
+      List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+    HashSet<String> resultSet = new HashSet<>();
+    for (Pair pair : pairs) {
+      resultSet.add(
+          String.format(
+              "%s,%s,", pair.left, ValueConverter.convert(sourceType, 
targetType, pair.right)));
+    }
+    return resultSet;
+  }
+
+  private Set<String> generateLocalDateResultSet(
+      List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+    HashSet<String> resultSet = new HashSet<>();
+    for (Pair pair : pairs) {
+      resultSet.add(
+          String.format(
+              "%s,%s,",
+              pair.left,
+              DateUtils.formatDate(
+                  (Integer) ValueConverter.convert(sourceType, targetType, 
pair.right))));
+    }
+    return resultSet;
+  }
+
+  private Set<String> generateBlobResultSet(
+      List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+    HashSet<String> resultSet = new HashSet<>();
+    for (Pair pair : pairs) {
+      resultSet.add(
+          String.format(
+              "%s,%s,",
+              pair.left,
+              BytesUtils.parseBlobByteArrayToString(
+                  ((Binary) ValueConverter.convert(sourceType, targetType, 
pair.right))
+                      .getValues())));
+    }
+    return resultSet;
+  }
+
+  private Set<String> generateStringResultSet(
+      List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+    HashSet<String> resultSet = new HashSet<>();
+    for (Pair pair : pairs) {
+      resultSet.add(
+          String.format(
+              "%s,%s,",
+              pair.left,
+              new String(
+                  ((Binary) ValueConverter.convert(sourceType, targetType, 
pair.right)).getValues(),
+                  StandardCharsets.UTF_8)));
+    }
+    return resultSet;
+  }
+
+  private List<Pair> createTestDataForBoolean() {
+    List<Pair> pairs = new java.util.ArrayList<>();
+    Random random = new Random();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, random.nextBoolean()));
+    }
+    return pairs;
+  }
+
+  /**
+   * Generates {@code generateDataSize} rows with random int values, followed by three fixed
+   * negative samples (-1, -2, -3) at the timestamps {@code generateDataSize + 1..3}.
+   *
+   * @return (Long timestamp, Integer value) rows
+   */
+  private List<Pair> createTestDataForInt32() {
+    List<Pair> pairs = new ArrayList<>();
+    Random random = new Random();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, random.nextInt()));
+    }
+    // Use long arithmetic so these rows carry Long timestamps like the generated rows above,
+    // instead of mixing Integer and Long keys in one list.
+    pairs.add(new Pair<>(generateDataSize + 1L, -1));
+    pairs.add(new Pair<>(generateDataSize + 2L, -2));
+    pairs.add(new Pair<>(generateDataSize + 3L, -3));
+    return pairs;
+  }
+
+  /**
+   * Generates {@code generateDataSize} rows with random long values, followed by three fixed
+   * negative samples (-1, -2, -3) at the timestamps {@code generateDataSize + 1..3}.
+   *
+   * @return (Long timestamp, Long value) rows
+   */
+  private List<Pair> createTestDataForInt64() {
+    List<Pair> pairs = new ArrayList<>();
+    Random random = new Random();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, random.nextLong()));
+    }
+    // Use long arithmetic so these rows carry Long timestamps like the generated rows above,
+    // instead of mixing Integer and Long keys in one list.
+    pairs.add(new Pair<>(generateDataSize + 1L, -1L));
+    pairs.add(new Pair<>(generateDataSize + 2L, -2L));
+    pairs.add(new Pair<>(generateDataSize + 3L, -3L));
+    return pairs;
+  }
+
+  private List<Pair> createTestDataForFloat() {
+    List<Pair> pairs = new ArrayList<>();
+    Random random = new Random();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, random.nextFloat()));
+    }
+    return pairs;
+  }
+
+  private List<Pair> createTestDataForDouble() {
+    List<Pair> pairs = new ArrayList<>();
+    Random random = new Random();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, random.nextDouble()));
+    }
+    return pairs;
+  }
+
+  private List<Pair> createTestDataForText() {
+    List<Pair> pairs = new ArrayList<>();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, new 
Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8))));
+    }
+    pairs.add(new Pair(generateDataSize + 1, new 
Binary("Hello".getBytes(StandardCharsets.UTF_8))));
+    pairs.add(
+        new Pair(
+            generateDataSize + 2, new Binary("Hello 
World!".getBytes(StandardCharsets.UTF_8))));
+    pairs.add(
+        new Pair(
+            generateDataSize + 3, new Binary("This is a 
test.".getBytes(StandardCharsets.UTF_8))));
+    pairs.add(
+        new Pair(
+            generateDataSize + 4,
+            new Binary("IoTDB Hello 
World!!!!".getBytes(StandardCharsets.UTF_8))));
+    pairs.add(
+        new Pair(
+            generateDataSize + 5,
+            new Binary(
+                "IoTDB is an excellent time series database!!!!!!!!!"
+                    .getBytes(StandardCharsets.UTF_8))));
+    return pairs;
+  }
+
+  private List<Pair> createTestDataForTimestamp() {
+    List<Pair> pairs = new ArrayList<>();
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, new Date().getTime() + i));
+    }
+    return pairs;
+  }
+
+  private List<Pair> createTestDataForDate() {
+    List<Pair> pairs = new ArrayList<>();
+    int year = 2023;
+    int month = 1;
+    int day = 1;
+    for (long i = 0; i < generateDataSize; i++) {
+      pairs.add(new Pair<>(i, year * 10000 + (month * 100) + day));
+
+      // update
+      day++;
+      if (day > 28) {
+        day = 1;
+        month++;
+        if (month > 12) {
+          month = 1;
+          year++;
+        }
+      }
+    }
+    return pairs;
+  }
+
+  /**
+   * Generates the rows used for BLOB sources.
+   *
+   * <p>BLOB sources use exactly the same UTF-8 {@code Binary} rows as TEXT sources, so this
+   * delegates to {@link #createTestDataForText()} instead of keeping a verbatim copy of it.
+   *
+   * @return (timestamp, Binary value) rows
+   */
+  private List<Pair> createTestDataForBlob() {
+    return createTestDataForText();
+  }
+
+  /**
+   * Generates the rows used for STRING sources.
+   *
+   * <p>STRING sources use exactly the same UTF-8 {@code Binary} rows as TEXT sources, so this
+   * delegates to {@link #createTestDataForText()} instead of keeping a verbatim copy of it.
+   *
+   * @return (timestamp, Binary value) rows
+   */
+  private List<Pair> createTestDataForString() {
+    return createTestDataForText();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 394e2e95d2b..083f12485ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -518,7 +518,9 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   @Override
   public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
-    tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+    if (isTabletBatchModeEnabled) {
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+    }
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
index 098792fd0b3..db38102ca9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
@@ -26,8 +26,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
 
-import java.time.ZoneId;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 
 public class ValueConverter {
 
@@ -299,6 +303,11 @@ public class ValueConverter {
 
   private static final Binary BINARY_TRUE = 
parseString(Boolean.TRUE.toString());
   private static final Binary BINARY_FALSE = 
parseString(Boolean.FALSE.toString());
+  private static final int TRUE_DATE = 
DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 2)); // date used for boolean true
+  private static final int FALSE_DATE =
+      DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1)); // date used for boolean false
+  private static final int DEFAULT_DATE =
+      DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1)); // fallback date; same day as FALSE_DATE
 
   public static int convertBooleanToInt32(final boolean value) {
     return value ? 1 : 0;
@@ -325,7 +334,7 @@ public class ValueConverter {
   }
 
   public static int convertBooleanToDate(final boolean value) {
-    return value ? 1 : 0;
+    return value ? TRUE_DATE : FALSE_DATE; // true -> TRUE_DATE, false -> FALSE_DATE
   }
 
   public static Binary convertBooleanToBlob(final boolean value) {
@@ -363,7 +372,12 @@ public class ValueConverter {
   }
 
   public static int convertInt32ToDate(final int value) {
-    return value;
+    try {
+      DateUtils.parseIntToLocalDate(value); // validation only: the parsed LocalDate is discarded
+      return value;
+    } catch (Exception e) {
+      return DEFAULT_DATE; // parseIntToLocalDate rejected the value
+    }
   }
 
   public static Binary convertInt32ToBlob(final int value) {
@@ -401,7 +415,13 @@ public class ValueConverter {
   }
 
   public static int convertInt64ToDate(final long value) {
-    return (int) value;
+    // A narrowing cast keeps only the low 32 bits, so an out-of-range long could alias to a
+    // valid-looking date. Reject such values before casting.
+    if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+      return DEFAULT_DATE;
+    }
+    try {
+      int data = (int) value;
+      DateUtils.parseIntToLocalDate(data); // validation only: the parsed LocalDate is discarded
+      return data;
+    } catch (Exception e) {
+      return DEFAULT_DATE; // parseIntToLocalDate rejected the value
+    }
   }
 
   public static Binary convertInt64ToBlob(final long value) {
@@ -439,7 +459,13 @@ public class ValueConverter {
   }
 
   public static int convertFloatToDate(final float value) {
-    return (int) value;
+    try {
+      int data = (int) value; // narrowing cast: the fractional part is dropped
+      DateUtils.parseIntToLocalDate(data); // validation only: the parsed LocalDate is discarded
+      return data;
+    } catch (Exception e) {
+      return DEFAULT_DATE; // parseIntToLocalDate rejected the value
+    }
   }
 
   public static Binary convertFloatToBlob(final float value) {
@@ -477,7 +503,13 @@ public class ValueConverter {
   }
 
   public static int convertDoubleToDate(final double value) {
-    return (int) value;
+    try {
+      int data = (int) value; // narrowing cast: the fractional part is dropped
+      DateUtils.parseIntToLocalDate(data); // validation only: the parsed LocalDate is discarded
+      return data;
+    } catch (Exception e) {
+      return DEFAULT_DATE; // parseIntToLocalDate rejected the value
+    }
   }
 
   public static Binary convertDoubleToBlob(final double value) {
@@ -553,7 +585,12 @@ public class ValueConverter {
   }
 
   public static int convertTimestampToDate(final long value) {
-    return (int) value;
+    try {
+      Instant instant = Instant.ofEpochMilli(value); // value is read as epoch milliseconds
+      return 
DateUtils.parseDateExpressionToInt(instant.atZone(ZoneOffset.UTC).toLocalDate()); // calendar date in UTC
+    } catch (Exception e) {
+      return DEFAULT_DATE; // any failure above falls back to the default date
+    }
   }
 
   public static Binary convertTimestampToBlob(final long value) {
@@ -567,7 +604,7 @@ public class ValueConverter {
   ///////////// DATE //////////////
 
   public static boolean convertDateToBoolean(final int value) {
-    return value != 0;
+    return value != FALSE_DATE; // only FALSE_DATE maps to false; every other date is true
   }
 
   public static int convertDateToInt32(final int value) {
@@ -591,7 +628,14 @@ public class ValueConverter {
   }
 
   public static long convertDateToTimestamp(final int value) {
-    return value;
+    try {
+      LocalDate date = DateUtils.parseIntToLocalDate(value);
+      ZonedDateTime dateTime = date.atStartOfDay(ZoneOffset.UTC); // midnight UTC of that date
+      Instant instant = dateTime.toInstant();
+      return instant.toEpochMilli(); // result is in epoch milliseconds
+    } catch (Exception e) {
+      return 0L; // any failure above falls back to 0
+    }
   }
 
   public static Binary convertDateToBlob(final int value) {
@@ -753,8 +797,7 @@ public class ValueConverter {
     try {
       return TypeInferenceUtils.isNumber(value)
           ? Long.parseLong(value)
-          : DateTimeUtils.parseDateTimeExpressionToLong(
-              StringUtils.trim(value), ZoneId.systemDefault());
+          : 
DateTimeUtils.parseDateTimeExpressionToLong(StringUtils.trim(value), 
ZoneOffset.UTC);
     } catch (final Exception e) {
       return 0L;
     }
@@ -762,14 +805,17 @@ public class ValueConverter {
 
   private static int parseDate(final String value) {
     if (value == null || value.isEmpty()) {
-      return 0;
+      return DEFAULT_DATE; // null or empty input falls back to the default date
     }
     try {
-      return TypeInferenceUtils.isNumber(value)
-          ? Integer.parseInt(value)
-          : DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value));
+      if (TypeInferenceUtils.isNumber(value)) {
+        int date = Integer.parseInt(value);
+        DateUtils.parseIntToLocalDate(date); // validation only: the parsed LocalDate is discarded
+        return date;
+      }
+      return DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value)); // non-numeric text: parse as a date expression
     } catch (final Exception e) {
-      return 0;
+      return DEFAULT_DATE; // parse or validation failure falls back to the default date
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index 3df76b79ce4..6ae0e2d9fa4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -42,6 +42,7 @@ public class PipeConvertedInsertRowStatement extends InsertRowStatement {
     // Statement
     isDebug = insertRowStatement.isDebug();
     // InsertBaseStatement
+    insertRowStatement.removeAllFailedMeasurementMarks();
     devicePath = insertRowStatement.getDevicePath();
     isAligned = insertRowStatement.isAligned();
     measurementSchemas = insertRowStatement.getMeasurementSchemas();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index 2481034cbae..a5013725061 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -36,6 +36,7 @@ public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
     // Statement
     isDebug = insertTabletStatement.isDebug();
     // InsertBaseStatement
+    insertTabletStatement.removeAllFailedMeasurementMarks();
     devicePath = insertTabletStatement.getDevicePath();
     isAligned = insertTabletStatement.isAligned();
     measurementSchemas = insertTabletStatement.getMeasurementSchemas();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 88e5b7735cf..048e75afebc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -229,6 +229,11 @@ public abstract class InsertBaseStatement extends Statement {
     throw new UnsupportedOperationException();
   }
 
+  /** Resets the state of all measurements marked as failed, clearing the failure records. */
+  public void removeAllFailedMeasurementMarks() {
+    throw new UnsupportedOperationException();
+  }
+
   public boolean hasValidMeasurements() {
     for (Object o : measurements) {
       if (o != null) {
@@ -290,6 +295,18 @@ public abstract class InsertBaseStatement extends Statement {
       this.value = value;
       this.cause = cause;
     }
+
+    public String getMeasurement() {
+      return measurement;
+    }
+
+    public TSDataType getDataType() {
+      return dataType;
+    }
+
+    public Object getValue() {
+      return value;
+    }
   }
 
   // endregion
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index bb67086456a..c754290b959 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -261,6 +261,20 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
     values[index] = null;
   }
 
+  @Override
+  public void removeAllFailedMeasurementMarks() {
+    if (failedMeasurementIndex2Info == null) {
+      return;
+    }
+    failedMeasurementIndex2Info.forEach(
+        (index, info) -> {
+          measurements[index] = info.getMeasurement();
+          dataTypes[index] = info.getDataType();
+          values[index] = info.getValue();
+        });
+    failedMeasurementIndex2Info.clear();
+  }
+
   @Override
   public void semanticCheck() {
     super.semanticCheck();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 329fe77203b..ed65e417782 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -194,6 +194,20 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
     columns[index] = null;
   }
 
+  @Override
+  public void removeAllFailedMeasurementMarks() {
+    if (failedMeasurementIndex2Info == null) {
+      return;
+    }
+    failedMeasurementIndex2Info.forEach(
+        (index, info) -> {
+          measurements[index] = info.getMeasurement();
+          dataTypes[index] = info.getDataType();
+          columns[index] = info.getValue();
+        });
+    failedMeasurementIndex2Info.clear();
+  }
+
   @Override
   public void semanticCheck() {
     super.semanticCheck();

Reply via email to