This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 6cc25dd375b Pipe: Convert data types when data sync sink metadata does
not match integration test (#13326)
6cc25dd375b is described below
commit 6cc25dd375b9c270eae672d6ed9251a057357261
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 29 21:14:06 2024 +0800
Pipe: Convert data types when data sync sink metadata does not match
integration test (#13326)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../manual/IoTDBPipeTypeConversionISessionIT.java | 804 +++++++++++++++++++++
.../pipe/it/manual/IoTDBPipeTypeConversionIT.java | 608 ++++++++++++++++
.../async/IoTDBDataRegionAsyncConnector.java | 4 +-
.../transform/converter/ValueConverter.java | 78 +-
.../statement/PipeConvertedInsertRowStatement.java | 1 +
.../PipeConvertedInsertTabletStatement.java | 1 +
.../plan/statement/crud/InsertBaseStatement.java | 17 +
.../plan/statement/crud/InsertRowStatement.java | 14 +
.../plan/statement/crud/InsertTabletStatement.java | 14 +
9 files changed, 1524 insertions(+), 17 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
new file mode 100644
index 00000000000..9af757acbde
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java
@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.commons.utils.function.CheckedTriConsumer;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeDualManualIT {
+ private static final int generateDataSize = 100;
+
+ /** Writes the generated tablet with {@code insertTablet}; runs the scenario with isTsFile = false. */
+ @Test
+ public void insertTablet() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> senderSession.insertTablet(tablet), false);
+ }
+
+ /** Writes the generated tablet with {@code insertTablet}; runs the scenario with isTsFile = true. */
+ @Test
+ public void insertTabletReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> senderSession.insertTablet(tablet), true);
+ }
+
+ /** Writes the tablet with {@code insertAlignedTablet}; runs the scenario with isTsFile = false. */
+ @Test
+ public void insertAlignedTablet() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> senderSession.insertAlignedTablet(tablet),
+       false);
+ }
+
+ /** Writes the tablet with {@code insertAlignedTablet}; runs the scenario with isTsFile = true. */
+ @Test
+ public void insertAlignedTabletReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> senderSession.insertAlignedTablet(tablet),
+       true);
+ }
+
+ /** Writes all rows with one {@code insertRecords} call; runs the scenario with isTsFile = true. */
+ @Test
+ public void insertRecordsReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         senderSession.insertRecords(
+             getDeviceID(tablet),
+             getTimestampList(tablet),
+             namesAndTypes.left,
+             namesAndTypes.right,
+             generateTabletInsertRecordForTable(tablet));
+       },
+       true);
+ }
+
+ /** Writes the rows one by one with {@code insertRecord}; runs the scenario with isTsFile = false. */
+ @Test
+ public void insertRecord() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final List<Long> times = getTimestampList(tablet);
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         int row = 0;
+         for (final List<Object> rowValues : generateTabletInsertRecordForTable(tablet)) {
+           senderSession.insertRecord(
+               tablet.deviceId,
+               times.get(row),
+               namesAndTypes.left.get(row),
+               namesAndTypes.right.get(row),
+               rowValues.toArray());
+           row++;
+         }
+       },
+       false);
+ }
+
+ /** Writes the rows one by one with {@code insertRecord}; runs the scenario with isTsFile = true. */
+ @Test
+ public void insertRecordReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final List<Long> times = getTimestampList(tablet);
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         int row = 0;
+         for (final List<Object> rowValues : generateTabletInsertRecordForTable(tablet)) {
+           senderSession.insertRecord(
+               tablet.deviceId,
+               times.get(row),
+               namesAndTypes.left.get(row),
+               namesAndTypes.right.get(row),
+               rowValues.toArray());
+           row++;
+         }
+       },
+       true);
+ }
+
+ /** Writes rows one by one with {@code insertAlignedRecord}; runs the scenario with isTsFile = false. */
+ @Test
+ public void insertAlignedRecord() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final List<Long> times = getTimestampList(tablet);
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         int row = 0;
+         for (final List<Object> rowValues : generateTabletInsertRecordForTable(tablet)) {
+           senderSession.insertAlignedRecord(
+               tablet.deviceId,
+               times.get(row),
+               namesAndTypes.left.get(row),
+               namesAndTypes.right.get(row),
+               rowValues);
+           row++;
+         }
+       },
+       false);
+ }
+
+ /** Writes rows one by one with {@code insertAlignedRecord}; runs the scenario with isTsFile = true. */
+ @Test
+ public void insertAlignedRecordReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final List<Long> times = getTimestampList(tablet);
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         int row = 0;
+         for (final List<Object> rowValues : generateTabletInsertRecordForTable(tablet)) {
+           senderSession.insertAlignedRecord(
+               tablet.deviceId,
+               times.get(row),
+               namesAndTypes.left.get(row),
+               namesAndTypes.right.get(row),
+               rowValues);
+           row++;
+         }
+       },
+       true);
+ }
+
+ /** Writes all rows with one {@code insertRecords} call; runs the scenario with isTsFile = false. */
+ @Test
+ public void insertRecords() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         senderSession.insertRecords(
+             getDeviceID(tablet),
+             getTimestampList(tablet),
+             namesAndTypes.left,
+             namesAndTypes.right,
+             generateTabletInsertRecordForTable(tablet));
+       },
+       false);
+ }
+
+ /** Writes all rows with one {@code insertAlignedRecords} call; runs the scenario with isTsFile = false. */
+ @Test
+ public void insertAlignedRecords() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         senderSession.insertAlignedRecords(
+             getDeviceID(tablet),
+             getTimestampList(tablet),
+             namesAndTypes.left,
+             namesAndTypes.right,
+             generateTabletInsertRecordForTable(tablet));
+       },
+       false);
+ }
+
+ /** Writes all rows with one {@code insertAlignedRecords} call; runs the scenario with isTsFile = true. */
+ @Test
+ public void insertAlignedRecordsReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) -> {
+         final Pair<List<List<String>>, List<List<TSDataType>>> namesAndTypes =
+             getMeasurementSchemasAndType(tablet);
+         senderSession.insertAlignedRecords(
+             getDeviceID(tablet),
+             getTimestampList(tablet),
+             namesAndTypes.left,
+             namesAndTypes.right,
+             generateTabletInsertRecordForTable(tablet));
+       },
+       true);
+ }
+
+ /**
+  * Writes the rows as string values with {@code insertStringRecordsOfOneDevice}; runs the scenario
+  * with isTsFile = false.
+  */
+ @Test
+ public void insertStringRecordsOfOneDevice() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) ->
+           senderSession.insertStringRecordsOfOneDevice(
+               tablet.deviceId,
+               getTimestampList(tablet),
+               getMeasurementSchemasAndType(tablet).left,
+               generateTabletInsertStrRecordForTable(tablet)),
+       false);
+ }
+
+ /**
+  * Writes the rows as string values with {@code insertStringRecordsOfOneDevice}; runs the scenario
+  * with isTsFile = true.
+  */
+ @Test
+ public void insertStringRecordsOfOneDeviceReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) ->
+           senderSession.insertStringRecordsOfOneDevice(
+               tablet.deviceId,
+               getTimestampList(tablet),
+               getMeasurementSchemasAndType(tablet).left,
+               generateTabletInsertStrRecordForTable(tablet)),
+       true);
+ }
+
+ /**
+  * Writes the rows as string values with {@code insertAlignedStringRecordsOfOneDevice}; runs the
+  * scenario with isTsFile = false.
+  */
+ @Test
+ public void insertAlignedStringRecordsOfOneDevice() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) ->
+           senderSession.insertAlignedStringRecordsOfOneDevice(
+               tablet.deviceId,
+               getTimestampList(tablet),
+               getMeasurementSchemasAndType(tablet).left,
+               generateTabletInsertStrRecordForTable(tablet)),
+       false);
+ }
+
+ /**
+  * Writes the rows as string values with {@code insertAlignedStringRecordsOfOneDevice}; runs the
+  * scenario with isTsFile = true.
+  */
+ @Test
+ public void insertAlignedStringRecordsOfOneDeviceReceiveByTsFile() {
+   prepareTypeConversionTest(
+       (senderSession, receiverSession, tablet) ->
+           senderSession.insertAlignedStringRecordsOfOneDevice(
+               tablet.deviceId,
+               getTimestampList(tablet),
+               getMeasurementSchemasAndType(tablet).left,
+               generateTabletInsertStrRecordForTable(tablet)),
+       true);
+ }
+
+ /**
+  * Selects the given measurements of one device, ordered by ascending time.
+  *
+  * <p>The caller owns the returned data set.
+  *
+  * @param session session the query is executed on
+  * @param measurementSchemas measurements whose ids form the select list
+  * @param deviceId device path used in the {@code from} clause
+  * @return the result of {@code select <ids> from <deviceId> ORDER BY time ASC}
+  */
+ private SessionDataSet query(
+     ISession session, List<MeasurementSchema> measurementSchemas, String deviceId)
+     throws IoTDBConnectionException, StatementExecutionException {
+   // StringBuilder plus a leading separator avoids both the synchronized StringBuffer and the
+   // "append then chop the last comma" step, which threw on an empty measurement list.
+   final StringBuilder sql = new StringBuilder("select ");
+   String separator = "";
+   for (IMeasurementSchema schema : measurementSchemas) {
+     sql.append(separator).append(schema.getMeasurementId());
+     separator = ",";
+   }
+   sql.append(" from ").append(deviceId).append(" ORDER BY time ASC");
+   return session.executeQueryStatement(sql.toString());
+ }
+
+ /**
+  * Runs one end-to-end type-conversion scenario.
+  *
+  * <p>Steps: create every source/target time series pair on the sender and the receiver, write a
+  * generated tablet through {@code consumer}, create the pipe, then poll the receiver until its
+  * data equals the values predicted by {@link ValueConverter}.
+  *
+  * @param consumer performs the actual insertion; receives the sender session, the receiver
+  *     session and the generated tablet
+  * @param isTsFile if true the data is written before the pipe is created and the pipe uses the
+  *     tsfile sink format; otherwise the pipe is created first and uses the tablet sink format
+  */
+ private void prepareTypeConversionTest(
+     CheckedTriConsumer<ISession, ISession, Tablet, Exception> consumer, boolean isTsFile) {
+   List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
+       generateMeasurementSchemas();
+
+   // Generate createTimeSeries in sender and receiver
+   String uuid = RandomStringUtils.random(8, true, false);
+   for (Pair<MeasurementSchema, MeasurementSchema> pair : measurementSchemas) {
+     createTimeSeries(uuid, pair.left.getMeasurementId(), pair.left.getType().name(), senderEnv);
+     createTimeSeries(
+         uuid, pair.right.getMeasurementId(), pair.right.getType().name(), receiverEnv);
+   }
+
+   // Both sessions are closed by try-with-resources; no explicit close() is needed.
+   try (ISession senderSession = senderEnv.getSessionConnection();
+       ISession receiverSession = receiverEnv.getSessionConnection()) {
+     Tablet tablet = generateTabletAndMeasurementSchema(measurementSchemas, "root.test." + uuid);
+     if (isTsFile) {
+       // Send TsFile data to receiver
+       consumer.accept(senderSession, receiverSession, tablet);
+       Thread.sleep(2000);
+       createDataPipe(uuid, true);
+       senderSession.executeNonQueryStatement("flush");
+     } else {
+       // Send Tablet data to receiver
+       createDataPipe(uuid, false);
+       Thread.sleep(2000);
+       // The actual implementation logic of inserting data
+       consumer.accept(senderSession, receiverSession, tablet);
+       senderSession.executeNonQueryStatement("flush");
+     }
+
+     // Verify receiver data
+     long timeoutSeconds = 600;
+     List<List<Object>> expectedValues =
+         generateTabletResultSetForTable(tablet, measurementSchemas);
+     await()
+         .pollInSameThread()
+         .pollDelay(1L, TimeUnit.SECONDS)
+         .pollInterval(1L, TimeUnit.SECONDS)
+         .atMost(timeoutSeconds, TimeUnit.SECONDS)
+         .untilAsserted(
+             () -> {
+               // Close the data set on every poll; otherwise each retry leaks a query handle.
+               try (SessionDataSet dataSet =
+                   query(receiverSession, tablet.getSchemas(), tablet.deviceId)) {
+                 validateResultSet(dataSet, expectedValues, tablet.timestamps);
+               } catch (Exception e) {
+                 // Convert to AssertionError so the poll is retried, but keep the cause.
+                 throw new AssertionError(
+                     "Failed to validate receiver data: " + e.getMessage(), e);
+               }
+             });
+     tablet.reset();
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag before failing the test.
+     Thread.currentThread().interrupt();
+     e.printStackTrace();
+     fail(e.getMessage());
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * Creates the PLAIN-encoded time series {@code root.test.<diff>.<measurementID>} with the given
+  * data type on {@code env}.
+  */
+ private void createTimeSeries(String diff, String measurementID, String dataType, BaseEnv env) {
+   final List<String> statements =
+       Collections.singletonList(
+           String.format(
+               "create timeseries root.test.%s.%s with datatype=%s,encoding=PLAIN",
+               diff, measurementID, dataType));
+   TestUtils.tryExecuteNonQueriesWithRetry(env, statements);
+ }
+
+ /**
+  * Creates the pipe {@code test<diff>} on the sender, pointing at the receiver.
+  *
+  * <p>With {@code isTSFile} the source uses realtime mode {@code file} with only history enabled
+  * and the sink format is {@code tsfile}; otherwise the source uses {@code forced-log} with only
+  * realtime enabled and the sink format is {@code tablet}.
+  */
+ private void createDataPipe(String diff, boolean isTSFile) {
+   final String realtimeMode = isTSFile ? "file" : "forced-log";
+   final String sinkFormat = isTSFile ? "tsfile" : "tablet";
+   final String createPipeSql =
+       String.format(
+           "create pipe test%s"
+               + " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+               + " with processor ('processor'='do-nothing-processor')"
+               + " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
+           diff,
+           realtimeMode,
+           !isTSFile,
+           isTSFile,
+           receiverEnv.getIP(),
+           receiverEnv.getPort(),
+           sinkFormat);
+   TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList(createPipeSql));
+ }
+
+ /**
+  * Asserts that {@code dataSet} contains exactly the expected rows, in order.
+  *
+  * <p>Each row's timestamp is compared with {@code timestamps} and each field with the value at
+  * the same position in {@code values}, using the getter that matches the field's data type.
+  *
+  * @param dataSet query result to check; not closed by this method
+  * @param values expected values, one inner list per row
+  * @param timestamps expected row timestamps
+  */
+ private void validateResultSet(
+     SessionDataSet dataSet, List<List<Object>> values, long[] timestamps)
+     throws IoTDBConnectionException, StatementExecutionException {
+   int index = 0;
+   while (dataSet.hasNext()) {
+     RowRecord record = dataSet.next();
+     List<Field> fields = record.getFields();
+
+     // Fail with a readable message instead of an IndexOutOfBoundsException on surplus rows.
+     if (index >= values.size()) {
+       fail("Receiver returned more rows than the " + values.size() + " expected");
+     }
+     assertEquals(timestamps[index], record.getTimestamp());
+     List<Object> rowValues = values.get(index++);
+     for (int i = 0; i < fields.size(); i++) {
+       Field field = fields.get(i);
+       switch (field.getDataType()) {
+         case BOOLEAN:
+           // Previously missing: BOOLEAN target columns were silently left unchecked.
+           assertEquals(rowValues.get(i), field.getBoolV());
+           break;
+         case INT64:
+         case TIMESTAMP:
+           assertEquals((long) rowValues.get(i), field.getLongV());
+           break;
+         case DATE:
+           assertEquals(rowValues.get(i), field.getDateV());
+           break;
+         case BLOB:
+           assertEquals(rowValues.get(i), field.getBinaryV());
+           break;
+         case TEXT:
+         case STRING:
+           assertEquals(rowValues.get(i), field.getStringValue());
+           break;
+         case INT32:
+           assertEquals((int) rowValues.get(i), field.getIntV());
+           break;
+         case DOUBLE:
+           assertEquals(0, Double.compare(field.getDoubleV(), (double) rowValues.get(i)));
+           break;
+         case FLOAT:
+           assertEquals(0, Float.compare(field.getFloatV(), (float) rowValues.get(i)));
+           break;
+       }
+     }
+   }
+   assertEquals(values.size(), index);
+ }
+
+ /** Returns {@code generateDataSize} random booleans. */
+ private boolean[] createTestDataForBoolean() {
+   final Random generator = new Random();
+   final boolean[] flags = new boolean[generateDataSize];
+   int next = 0;
+   while (next < flags.length) {
+     flags[next++] = generator.nextBoolean();
+   }
+   return flags;
+ }
+
+ /** Returns {@code generateDataSize} random ints. */
+ private int[] createTestDataForInt32() {
+   final Random generator = new Random();
+   final int[] numbers = new int[generateDataSize];
+   int next = 0;
+   while (next < numbers.length) {
+     numbers[next++] = generator.nextInt();
+   }
+   return numbers;
+ }
+
+ /** Returns {@code generateDataSize} random longs. */
+ private long[] createTestDataForInt64() {
+   final Random generator = new Random();
+   final long[] numbers = new long[generateDataSize];
+   int next = 0;
+   while (next < numbers.length) {
+     numbers[next++] = generator.nextLong();
+   }
+   return numbers;
+ }
+
+ /** Returns {@code generateDataSize} random floats as produced by {@link Random#nextFloat()}. */
+ private float[] createTestDataForFloat() {
+   final Random generator = new Random();
+   final float[] numbers = new float[generateDataSize];
+   int next = 0;
+   while (next < numbers.length) {
+     numbers[next++] = generator.nextFloat();
+   }
+   return numbers;
+ }
+
+ /** Returns {@code generateDataSize} random doubles as produced by {@link Random#nextDouble()}. */
+ private double[] createTestDataForDouble() {
+   final Random generator = new Random();
+   final double[] numbers = new double[generateDataSize];
+   int next = 0;
+   while (next < numbers.length) {
+     numbers[next++] = generator.nextDouble();
+   }
+   return numbers;
+ }
+
+ /**
+  * Returns {@code generateDataSize} consecutive millisecond timestamps, starting at the current
+  * wall-clock time.
+  */
+ private long[] createTestDataForTimestamp() {
+   final long start = new Date().getTime();
+   final long[] stamps = new long[generateDataSize];
+   for (int offset = 0; offset < stamps.length; offset++) {
+     stamps[offset] = start + offset;
+   }
+   return stamps;
+ }
+
+ /**
+  * Returns {@code generateDataSize} consecutive dates starting at 2023-01-01, where every month is
+  * treated as having 28 days so that each generated date is valid.
+  */
+ private LocalDate[] createTestDataForDate() {
+   final int daysPerMonth = 28;
+   final int monthsPerYear = 12;
+   final LocalDate[] dates = new LocalDate[generateDataSize];
+   for (int i = 0; i < dates.length; i++) {
+     // Closed form of "increment the day, roll over into month and year".
+     final int day = i % daysPerMonth + 1;
+     final int month = (i / daysPerMonth) % monthsPerYear + 1;
+     final int year = 2023 + i / (daysPerMonth * monthsPerYear);
+     dates[i] = DateUtils.parseIntToLocalDate(year * 10000 + (month * 100) + day);
+   }
+   return dates;
+ }
+
+ /**
+  * Returns {@code generateDataSize} binaries that cycle through a fixed set of sample strings
+  * (plain text, integers, decimals and boolean literals).
+  */
+ private Binary[] createTestDataForString() {
+   final String[] samples = {
+     "Hello",
+     "Hello World!",
+     "This is a test.",
+     "IoTDB Hello World!!!!",
+     "IoTDB is an excellent time series database!!!!!!!!!",
+     "12345678910!!!!!!!!",
+     "123456",
+     "1234567.123213",
+     "21232131.21",
+     "enable = true",
+     "true",
+     "false",
+     "12345678910",
+     "123231232132131233213123123123123123131312",
+     "123231232132131233213123123123123123131312.212312321312312",
+   };
+   final Binary[] binaries = new Binary[generateDataSize];
+   for (int i = 0; i < binaries.length; i++) {
+     final String sample = samples[i % samples.length];
+     binaries[i] = new Binary(sample.getBytes(TSFileConfig.STRING_CHARSET));
+   }
+   return binaries;
+ }
+
+ /** Copies the tablet's whole timestamp array into a boxed list. */
+ private List<Long> getTimestampList(Tablet tablet) {
+   final long[] source = tablet.timestamps;
+   final List<Long> boxed = new ArrayList<>(source.length);
+   for (int i = 0; i < source.length; i++) {
+     boxed.add(source[i]);
+   }
+   return boxed;
+ }
+
+ /**
+  * Builds the per-row measurement-id and data-type lists required by the record-based insert
+  * APIs.
+  *
+  * @param tablet tablet whose schemas and row count are used
+  * @return a pair whose left side holds, for each of {@code tablet.rowSize} rows, the list of
+  *     measurement ids, and whose right side holds the matching list of data types; every row
+  *     shares the same inner list instance
+  */
+ private Pair<List<List<String>>, List<List<TSDataType>>> getMeasurementSchemasAndType(
+     Tablet tablet) {
+   // Size both per-column lists by the number of columns (one was sized by the row count).
+   final int columnCount = tablet.getSchemas().size();
+   final List<String> measurementIds = new ArrayList<>(columnCount);
+   final List<TSDataType> types = new ArrayList<>(columnCount);
+   for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+     measurementIds.add(measurementSchema.getMeasurementId());
+     types.add(measurementSchema.getType());
+   }
+
+   final List<List<String>> schemaData =
+       new ArrayList<>(Collections.nCopies(tablet.rowSize, measurementIds));
+   final List<List<TSDataType>> typeData =
+       new ArrayList<>(Collections.nCopies(tablet.rowSize, types));
+   return new Pair<>(schemaData, typeData);
+ }
+
+ /** Returns a list that repeats the tablet's device id once per row. */
+ private List<String> getDeviceID(Tablet tablet) {
+   return new ArrayList<>(Collections.nCopies(tablet.rowSize, tablet.deviceId));
+ }
+
+ /**
+  * Computes the values the receiver is expected to hold for every cell of {@code tablet}.
+  *
+  * <p>Each cell is converted from the source type (left schema of the pair) to the target type
+  * (right schema) with {@link ValueConverter}, then passed through {@code convert(Object,
+  * TSDataType)} so it is comparable with what a query returns.
+  *
+  * @param tablet the data written on the sender
+  * @param pairs source/target schema pairs, in the same order as the tablet's columns
+  * @return one list of expected values per tablet row
+  */
+ private List<List<Object>> generateTabletResultSetForTable(
+     final Tablet tablet, List<Pair<MeasurementSchema, MeasurementSchema>> pairs) {
+   final int columnCount = tablet.getSchemas().size();
+   final Object[] columns = tablet.values;
+   final List<List<Object>> expectedRows = new ArrayList<>(tablet.rowSize);
+   for (int row = 0; row < tablet.rowSize; row++) {
+     final List<Object> expectedRow = new ArrayList<>();
+     for (int col = 0; col < columnCount; col++) {
+       final TSDataType sourceType = pairs.get(col).left.getType();
+       final TSDataType targetType = pairs.get(col).right.getType();
+       final Object converted;
+       switch (sourceType) {
+         case INT64:
+         case TIMESTAMP:
+           converted = ValueConverter.convert(sourceType, targetType, ((long[]) columns[col])[row]);
+           break;
+         case INT32:
+           converted = ValueConverter.convert(sourceType, targetType, ((int[]) columns[col])[row]);
+           break;
+         case DOUBLE:
+           converted =
+               ValueConverter.convert(sourceType, targetType, ((double[]) columns[col])[row]);
+           break;
+         case FLOAT:
+           converted =
+               ValueConverter.convert(sourceType, targetType, ((float[]) columns[col])[row]);
+           break;
+         case DATE:
+           // The tablet stores dates as LocalDate; the converter is given the int form.
+           converted =
+               ValueConverter.convert(
+                   sourceType,
+                   targetType,
+                   DateUtils.parseDateExpressionToInt(((LocalDate[]) columns[col])[row]));
+           break;
+         case TEXT:
+         case STRING:
+         case BLOB:
+           converted =
+               ValueConverter.convert(sourceType, targetType, ((Binary[]) columns[col])[row]);
+           break;
+         case BOOLEAN:
+           converted =
+               ValueConverter.convert(sourceType, targetType, ((boolean[]) columns[col])[row]);
+           break;
+         default:
+           // Any other source type contributes no value to the row.
+           continue;
+       }
+       expectedRow.add(convert(converted, targetType));
+     }
+     expectedRows.add(expectedRow);
+   }
+
+   return expectedRows;
+ }
+
+ /**
+  * Maps a converted value to the form used in the expected result: DATE values (an {@code
+  * Integer}) become a {@code LocalDate}, TEXT/STRING values (a {@code Binary}) become a {@code
+  * String}, and every other value is returned unchanged.
+  */
+ private Object convert(Object value, TSDataType targetType) {
+   switch (targetType) {
+     case DATE:
+       final Integer dateAsInt = (Integer) value;
+       return DateUtils.parseIntToLocalDate(dateAsInt);
+     case TEXT:
+     case STRING:
+       final byte[] bytes = ((Binary) value).getValues();
+       return new String(bytes, TSFileConfig.STRING_CHARSET);
+     default:
+       return value;
+   }
+ }
+
+ /**
+  * Turns the column-oriented tablet into row-oriented value lists for the record-based insert
+  * APIs. TEXT/STRING cells are emitted as {@code String}; all other cells keep the element type
+  * stored in the tablet.
+  */
+ private List<List<Object>> generateTabletInsertRecordForTable(final Tablet tablet) {
+   final List<MeasurementSchema> columnSchemas = tablet.getSchemas();
+   final Object[] columns = tablet.values;
+   final List<List<Object>> rows = new ArrayList<>(tablet.rowSize);
+   for (int row = 0; row < tablet.rowSize; row++) {
+     final List<Object> cells = new ArrayList<>();
+     for (int col = 0; col < columnSchemas.size(); col++) {
+       final Object column = columns[col];
+       switch (columnSchemas.get(col).getType()) {
+         case INT64:
+         case TIMESTAMP:
+           cells.add(((long[]) column)[row]);
+           break;
+         case INT32:
+           cells.add(((int[]) column)[row]);
+           break;
+         case DOUBLE:
+           cells.add(((double[]) column)[row]);
+           break;
+         case FLOAT:
+           cells.add(((float[]) column)[row]);
+           break;
+         case DATE:
+           cells.add(((LocalDate[]) column)[row]);
+           break;
+         case TEXT:
+         case STRING:
+           final byte[] bytes = ((Binary[]) column)[row].getValues();
+           cells.add(new String(bytes, TSFileConfig.STRING_CHARSET));
+           break;
+         case BLOB:
+           cells.add(((Binary[]) column)[row]);
+           break;
+         case BOOLEAN:
+           cells.add(((boolean[]) column)[row]);
+           break;
+       }
+     }
+     rows.add(cells);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Turns the column-oriented tablet into row-oriented lists of string literals for the
+  * string-record insert APIs. TIMESTAMP cells are formatted as UTC datetimes, BLOB cells as {@code
+  * X'..'} literals, and the remaining types via their plain string form.
+  */
+ private List<List<String>> generateTabletInsertStrRecordForTable(Tablet tablet) {
+   final List<MeasurementSchema> columnSchemas = tablet.getSchemas();
+   final Object[] columns = tablet.values;
+   final List<List<String>> rows = new ArrayList<>(tablet.rowSize);
+   for (int row = 0; row < tablet.rowSize; row++) {
+     final List<String> cells = new ArrayList<>();
+     for (int col = 0; col < columnSchemas.size(); col++) {
+       final Object column = columns[col];
+       switch (columnSchemas.get(col).getType()) {
+         case INT64:
+           cells.add(String.valueOf(((long[]) column)[row]));
+           break;
+         case TIMESTAMP:
+           cells.add(
+               RpcUtils.formatDatetime("default", "ms", ((long[]) column)[row], ZoneOffset.UTC));
+           break;
+         case INT32:
+           cells.add(String.valueOf(((int[]) column)[row]));
+           break;
+         case DOUBLE:
+           cells.add(String.valueOf(((double[]) column)[row]));
+           break;
+         case FLOAT:
+           cells.add(String.valueOf(((float[]) column)[row]));
+           break;
+         case DATE:
+           cells.add(((LocalDate[]) column)[row].toString());
+           break;
+         case TEXT:
+         case STRING:
+           cells.add(
+               new String(((Binary[]) column)[row].getValues(), TSFileConfig.STRING_CHARSET));
+           break;
+         case BLOB:
+           // Drop the first two characters of the helper's output and wrap the rest as X'..'.
+           cells.add(
+               String.format(
+                   "X'%s'",
+                   BytesUtils.parseBlobByteArrayToString(((Binary[]) column)[row].getValues())
+                       .substring(2)));
+           break;
+         case BOOLEAN:
+           cells.add(String.valueOf(((boolean[]) column)[row]));
+           break;
+       }
+     }
+     rows.add(cells);
+   }
+
+   return rows;
+ }
+
+ /**
+  * Builds a tablet for {@code deviceId} with one column per pair, typed by the pair's left
+  * (source) schema and filled with {@code generateDataSize} generated values. STRING, TEXT and
+  * BLOB columns all use the same string sample data.
+  */
+ private Tablet generateTabletAndMeasurementSchema(
+     List<Pair<MeasurementSchema, MeasurementSchema>> pairs, String deviceId) {
+   final long[] timestamps = createTestDataForTimestamp();
+   final int columnCount = pairs.size();
+   final Object[] columns = new Object[columnCount];
+   final BitMap[] bitMaps = new BitMap[columnCount];
+   final List<MeasurementSchema> sourceSchemas = new ArrayList<>(columnCount);
+   for (int col = 0; col < columnCount; col++) {
+     bitMaps[col] = new BitMap(generateDataSize);
+     final MeasurementSchema sourceSchema = pairs.get(col).left;
+     sourceSchemas.add(sourceSchema);
+     switch (sourceSchema.getType()) {
+       case INT64:
+         columns[col] = createTestDataForInt64();
+         break;
+       case INT32:
+         columns[col] = createTestDataForInt32();
+         break;
+       case TIMESTAMP:
+         columns[col] = createTestDataForTimestamp();
+         break;
+       case DOUBLE:
+         columns[col] = createTestDataForDouble();
+         break;
+       case FLOAT:
+         columns[col] = createTestDataForFloat();
+         break;
+       case DATE:
+         columns[col] = createTestDataForDate();
+         break;
+       case STRING:
+       case BLOB:
+       case TEXT:
+         columns[col] = createTestDataForString();
+         break;
+       case BOOLEAN:
+         columns[col] = createTestDataForBoolean();
+         break;
+     }
+   }
+   return new Tablet(deviceId, sourceSchemas, timestamps, columns, bitMaps, generateDataSize);
+ }
+
+ /**
+  * Returns one schema pair for every ordered (source, target) combination of the ten listed data
+  * types. Both schemas of a pair share the measurement id {@code <SOURCE>2<TARGET>}; the left one
+  * has the source type and the right one the target type.
+  */
+ private List<Pair<MeasurementSchema, MeasurementSchema>> generateMeasurementSchemas() {
+   final TSDataType[] supportedTypes = {
+     TSDataType.STRING,
+     TSDataType.TEXT,
+     TSDataType.BLOB,
+     TSDataType.TIMESTAMP,
+     TSDataType.BOOLEAN,
+     TSDataType.DATE,
+     TSDataType.DOUBLE,
+     TSDataType.FLOAT,
+     TSDataType.INT32,
+     TSDataType.INT64
+   };
+   final List<Pair<MeasurementSchema, MeasurementSchema>> combinations = new ArrayList<>();
+
+   for (final TSDataType sourceType : supportedTypes) {
+     for (final TSDataType targetType : supportedTypes) {
+       final String measurementId = String.format("%s2%s", sourceType.name(), targetType.name());
+       final MeasurementSchema sourceSchema = new MeasurementSchema(measurementId, sourceType);
+       final MeasurementSchema targetSchema = new MeasurementSchema(measurementId, targetType);
+       combinations.add(new Pair<>(sourceSchema, targetSchema));
+     }
+   }
+   return combinations;
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java
new file mode 100644
index 00000000000..8dfde314dfa
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeTypeConversionIT extends AbstractPipeDualManualIT {
+
+ private static final int generateDataSize = 100;
+
+ // Runs the BOOLEAN -> target conversion check once per listed target type, in order.
+ @Test
+ public void testBooleanToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TEXT,
+     TSDataType.TIMESTAMP,
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.BOOLEAN, target);
+   }
+ }
+
+ // Runs the INT32 -> target conversion check once per listed target type, in order.
+ @Test
+ public void testInt32ToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BOOLEAN,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TEXT,
+     TSDataType.TIMESTAMP,
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.INT32, target);
+   }
+ }
+
+ // Runs the INT64 -> target conversion check once per listed target type, in order.
+ @Test
+ public void testInt64ToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TEXT,
+     TSDataType.TIMESTAMP,
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.INT64, target);
+   }
+ }
+
+ // Runs the FLOAT -> target conversion check once per listed target type, in order.
+ @Test
+ public void testFloatToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.DOUBLE,
+     TSDataType.TEXT,
+     TSDataType.TIMESTAMP,
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.FLOAT, target);
+   }
+ }
+
+ // Runs the DOUBLE -> target conversion check once per listed target type, in order.
+ @Test
+ public void testDoubleToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.TEXT,
+     TSDataType.TIMESTAMP,
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.DOUBLE, target);
+   }
+ }
+
+ // Runs the TEXT -> target conversion check once per listed target type, in order.
+ @Test
+ public void testTextToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TIMESTAMP,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.TEXT, target);
+   }
+ }
+
+ // Runs the TIMESTAMP -> target conversion check once per listed target type, in order.
+ @Test
+ public void testTimestampToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TEXT,
+     TSDataType.BLOB,
+     TSDataType.STRING,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, target);
+   }
+ }
+
+ // Runs the DATE -> target conversion check once per listed target type, in order.
+ // BLOB is not among the targets exercised here.
+ @Test
+ public void testDateToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TEXT,
+     TSDataType.STRING,
+     TSDataType.TIMESTAMP
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.DATE, target);
+   }
+ }
+
+ // Runs the BLOB -> target conversion check once per listed target type, in order.
+ @Test
+ public void testBlobToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.TEXT,
+     TSDataType.STRING,
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TIMESTAMP,
+     TSDataType.DATE
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.BLOB, target);
+   }
+ }
+
+ // Runs the STRING -> target conversion check once per listed target type, in order.
+ // DATE is not among the targets exercised here.
+ @Test
+ public void testStringToOtherTypeConversion() {
+   final TSDataType[] targets = {
+     TSDataType.TEXT,
+     TSDataType.BLOB,
+     TSDataType.BOOLEAN,
+     TSDataType.INT32,
+     TSDataType.INT64,
+     TSDataType.FLOAT,
+     TSDataType.DOUBLE,
+     TSDataType.TIMESTAMP
+   };
+   for (final TSDataType target : targets) {
+     executeAndVerifyTypeConversion(TSDataType.STRING, target);
+   }
+ }
+
+ /**
+  * Runs one conversion scenario end to end: prepares the scenario and inserts the source data,
+  * then asserts that querying {@code root.<SOURCE>2<TARGET>.**} on the receiver eventually
+  * yields the expected converted rows.
+  *
+  * @param source data type of the series on the sender
+  * @param target data type of the series on the receiver
+  */
+ private void executeAndVerifyTypeConversion(TSDataType source, TSDataType target) {
+   final List<Pair> insertedRows = prepareTypeConversionTest(source, target);
+
+   final String query = String.format("select * from root.%s2%s.**", source.name(), target.name());
+   final String expectedHeader =
+       String.format("Time,root.%s2%s.test.status,", source.name(), target.name());
+   final Set<String> expectedRows = createExpectedResultSet(insertedRows, source, target);
+
+   // NOTE(review): 30 is presumably a timeout in seconds; confirm against TestUtils.
+   TestUtils.assertDataEventuallyOnEnv(receiverEnv, query, expectedHeader, expectedRows, 30);
+ }
+
+ /**
+  * Sets up one conversion scenario and writes the source data on the sender.
+  *
+  * <p>Steps, in order: create the series with the source type on the sender and with the target
+  * type on the receiver, create the pipe, generate test data for the source type, sleep two
+  * seconds to give the pipe time to start, then insert the data on the sender.
+  *
+  * @param sourceType data type of the series on the sender
+  * @param targetType data type of the series on the receiver
+  * @return the generated (timestamp, value) pairs that were inserted
+  * @throws RuntimeException if the thread is interrupted while waiting for the pipe to start
+  */
+ private List<Pair> prepareTypeConversionTest(TSDataType sourceType, TSDataType targetType) {
+   String sourceTypeName = sourceType.name();
+   String targetTypeName = targetType.name();
+
+   createTimeSeries(sourceTypeName, targetTypeName, sourceTypeName, senderEnv);
+   createTimeSeries(sourceTypeName, targetTypeName, targetTypeName, receiverEnv);
+
+   createDataPipe(sourceTypeName, targetTypeName);
+
+   List<Pair> pairs = createTestDataForType(sourceTypeName);
+
+   // wait pipe start
+   try {
+     Thread.sleep(2000);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag before rethrowing so code further up the stack (e.g. the
+     // test runner) can still observe that this thread was interrupted.
+     Thread.currentThread().interrupt();
+     throw new RuntimeException(e);
+   }
+
+   executeDataInsertions(pairs, sourceType, targetType);
+   return pairs;
+ }
+
+ /**
+  * Creates the series {@code root.<sourceTypeName>2<targetTypeName>.test.status} on the given
+  * environment with PLAIN encoding.
+  *
+  * @param sourceTypeName first half of the {@code <source>2<target>} path segment
+  * @param targetTypeName second half of the {@code <source>2<target>} path segment
+  * @param dataType data type name placed in the {@code datatype=} clause
+  * @param env environment on which the statement is executed (with retry)
+  */
+ private void createTimeSeries(
+ String sourceTypeName, String targetTypeName, String dataType, BaseEnv
env) {
+ String timeSeriesCreationQuery =
+ String.format(
+ "create timeseries root.%s2%s.test.status with
datatype=%s,encoding=PLAIN",
+ sourceTypeName, targetTypeName, dataType);
+ TestUtils.tryExecuteNonQueriesWithRetry(
+ env, Collections.singletonList(timeSeriesCreationQuery));
+ }
+
+ /**
+  * Creates, on the sender, a pipe named {@code <sourceTypeName>2<targetTypeName>} that forwards
+  * data under {@code root.<sourceTypeName>2<targetTypeName>.**} to the receiver.
+  *
+  * <p>The statement configures the source with realtime enabled in forced-log mode and history
+  * disabled, a do-nothing processor, and a sink that targets the receiver's IP and port with
+  * batching disabled and tablet format.
+  *
+  * @param sourceTypeName first half of the pipe name and of the source path segment
+  * @param targetTypeName second half of the pipe name and of the source path segment
+  */
+ private void createDataPipe(String sourceTypeName, String targetTypeName) {
+ String sql =
+ String.format(
+ "create pipe %s2%s"
+ + " with source
('source'='iotdb-source','source.path'='root.%s2%s.**','realtime.mode'='forced-log','realtime.enable'='true','history.enable'='false')"
+ + " with processor ('processor'='do-nothing-processor')"
+ + " with sink
('node-urls'='%s:%s','batch.enable'='false','sink.format'='tablet')",
+ sourceTypeName,
+ targetTypeName,
+ sourceTypeName,
+ targetTypeName,
+ receiverEnv.getIP(),
+ receiverEnv.getPort());
+ TestUtils.tryExecuteNonQueriesWithRetry(senderEnv,
Collections.singletonList(sql));
+ }
+
+ /**
+  * Dispatches to the test-data generator that matches the given data type name.
+  *
+  * @param sourceType data type name, e.g. {@code "INT32"}
+  * @return the generated (timestamp, value) pairs
+  * @throws UnsupportedOperationException if no generator exists for the name
+  */
+ private List<Pair> createTestDataForType(String sourceType) {
+   final List<Pair> generated;
+   switch (sourceType) {
+     case "BOOLEAN":
+       generated = createTestDataForBoolean();
+       break;
+     case "INT32":
+       generated = createTestDataForInt32();
+       break;
+     case "INT64":
+       generated = createTestDataForInt64();
+       break;
+     case "FLOAT":
+       generated = createTestDataForFloat();
+       break;
+     case "DOUBLE":
+       generated = createTestDataForDouble();
+       break;
+     case "TEXT":
+       generated = createTestDataForText();
+       break;
+     case "TIMESTAMP":
+       generated = createTestDataForTimestamp();
+       break;
+     case "DATE":
+       generated = createTestDataForDate();
+       break;
+     case "BLOB":
+       generated = createTestDataForBlob();
+       break;
+     case "STRING":
+       generated = createTestDataForString();
+       break;
+     default:
+       throw new UnsupportedOperationException("Unsupported data type: " + sourceType);
+   }
+   return generated;
+ }
+
+ /**
+  * Builds the insert statements appropriate for the source type and executes them, with retry,
+  * on the sender.
+  *
+  * <p>STRING/TEXT, TIMESTAMP, DATE and BLOB each have a dedicated statement builder; every other
+  * source type uses the numeric builder.
+  *
+  * @param testData generated (timestamp, value) pairs to insert
+  * @param sourceType data type of the series on the sender; selects the statement builder
+  * @param targetType data type of the series on the receiver; only its name is used, for the path
+  */
+ private void executeDataInsertions(
+     List<Pair> testData, TSDataType sourceType, TSDataType targetType) {
+   final String sourceName = sourceType.name();
+   final String targetName = targetType.name();
+
+   final List<String> statements;
+   switch (sourceType) {
+     case STRING:
+     case TEXT:
+       statements = createInsertStatementsForString(testData, sourceName, targetName);
+       break;
+     case TIMESTAMP:
+       statements = createInsertStatementsForTimestamp(testData, sourceName, targetName);
+       break;
+     case DATE:
+       statements = createInsertStatementsForLocalDate(testData, sourceName, targetName);
+       break;
+     case BLOB:
+       statements = createInsertStatementsForBlob(testData, sourceName, targetName);
+       break;
+     default:
+       statements = createInsertStatementsForNumeric(testData, sourceName, targetName);
+       break;
+   }
+   TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, statements);
+ }
+
+ /**
+  * Builds one insert statement per pair for text-like source values, followed by a final
+  * {@code flush} statement.
+  *
+  * <p>{@code pair.left} is used as the timestamp. {@code pair.right} is cast to {@link Binary},
+  * decoded as UTF-8 and placed between single quotes without any escaping.
+  *
+  * @param testData (timestamp, Binary value) pairs
+  * @param sourceType first half of the {@code <source>2<target>} path segment
+  * @param targetType second half of the {@code <source>2<target>} path segment
+  * @return the insert statements plus a trailing {@code flush}
+  */
+ private List<String> createInsertStatementsForString(
+ List<Pair> testData, String sourceType, String targetType) {
+ List<String> executes = new ArrayList<>();
+ for (Pair pair : testData) {
+ executes.add(
+ String.format(
+ "insert into root.%s2%s.test(timestamp,status) values
(%s,'%s');",
+ sourceType,
+ targetType,
+ pair.left,
+ new String(((Binary) (pair.right)).getValues(),
StandardCharsets.UTF_8)));
+ }
+ executes.add("flush");
+ return executes;
+ }
+
+ /**
+  * Builds one insert statement per pair with the value written unquoted, followed by a final
+  * {@code flush} statement.
+  *
+  * @param testData (timestamp, value) pairs; both sides are rendered with {@code %s}
+  * @param sourceType first half of the {@code <source>2<target>} path segment
+  * @param targetType second half of the {@code <source>2<target>} path segment
+  * @return the insert statements plus a trailing {@code flush}
+  */
+ private List<String> createInsertStatementsForNumeric(
+     List<Pair> testData, String sourceType, String targetType) {
+   final List<String> statements = new ArrayList<>();
+   for (final Pair row : testData) {
+     final String insertSql =
+         String.format(
+             "insert into root.%s2%s.test(timestamp,status) values (%s,%s);",
+             sourceType, targetType, row.left, row.right);
+     statements.add(insertSql);
+   }
+   statements.add("flush");
+   return statements;
+ }
+
+ /**
+  * Builds the insert statements for TIMESTAMP source values, followed by a final {@code flush}.
+  *
+  * <p>Timestamp values are written unquoted in exactly the same statement shape as numeric
+  * values, so this delegates to the numeric builder.
+  *
+  * @param testData (timestamp, value) pairs
+  * @param sourceType first half of the {@code <source>2<target>} path segment
+  * @param targetType second half of the {@code <source>2<target>} path segment
+  * @return the insert statements plus a trailing {@code flush}
+  */
+ private List<String> createInsertStatementsForTimestamp(
+     List<Pair> testData, String sourceType, String targetType) {
+   return createInsertStatementsForNumeric(testData, sourceType, targetType);
+ }
+
+ /**
+  * Builds one insert statement per pair for DATE source values, followed by a final
+  * {@code flush} statement.
+  *
+  * <p>{@code pair.right} is cast to {@link Integer}, rendered through
+  * {@code DateUtils.formatDate} and placed between single quotes.
+  *
+  * @param testData (timestamp, Integer date) pairs
+  * @param sourceType first half of the {@code <source>2<target>} path segment
+  * @param targetType second half of the {@code <source>2<target>} path segment
+  * @return the insert statements plus a trailing {@code flush}
+  */
+ private List<String> createInsertStatementsForLocalDate(
+ List<Pair> testData, String sourceType, String targetType) {
+ List<String> executes = new ArrayList<>();
+ for (Pair pair : testData) {
+ executes.add(
+ String.format(
+ "insert into root.%s2%s.test(timestamp,status) values
(%s,'%s');",
+ sourceType, targetType, pair.left,
DateUtils.formatDate((Integer) pair.right)));
+ }
+ executes.add("flush");
+ return executes;
+ }
+
+ /**
+  * Builds one insert statement per pair for BLOB source values, followed by a final
+  * {@code flush} statement.
+  *
+  * <p>The bytes of {@code pair.right} (a {@link Binary}) are rendered with
+  * {@code BytesUtils.parseBlobByteArrayToString}; the first two characters of that rendering
+  * are dropped and the remainder is wrapped as {@code X'...'}.
+  *
+  * @param testData (timestamp, Binary value) pairs
+  * @param sourceType first half of the {@code <source>2<target>} path segment
+  * @param targetType second half of the {@code <source>2<target>} path segment
+  * @return the insert statements plus a trailing {@code flush}
+  */
+ private List<String> createInsertStatementsForBlob(
+ List<Pair> testData, String sourceType, String targetType) {
+ List<String> executes = new ArrayList<>();
+ for (Pair pair : testData) {
+ String value = BytesUtils.parseBlobByteArrayToString(((Binary)
pair.right).getValues());
+ executes.add(
+ String.format(
+ "insert into root.%s2%s.test(timestamp,status) values
(%s,X'%s');",
+ // NOTE(review): substring(2) presumably strips a leading "0x" prefix; confirm
+ // against BytesUtils.parseBlobByteArrayToString.
+ sourceType, targetType, pair.left, value.substring(2)));
+ }
+ executes.add("flush");
+ return executes;
+ }
+
+ /**
+  * Builds the set of result rows expected on the receiver for the given inserted pairs.
+  *
+  * <p>TIMESTAMP, DATE, BLOB and TEXT/STRING targets are rendered by dedicated helpers. Every
+  * other target renders each row as {@code "<timestamp>,<converted value>,"} using the result
+  * of {@code ValueConverter.convert} directly.
+  *
+  * @param pairs inserted (timestamp, source value) pairs
+  * @param sourceType data type of the inserted values
+  * @param targetType data type the values are converted to
+  * @return the expected result rows
+  */
+ private Set<String> createExpectedResultSet(
+     List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+   switch (targetType) {
+     case TIMESTAMP:
+       return generateTimestampResultSet(pairs, sourceType, targetType);
+     case DATE:
+       return generateLocalDateResultSet(pairs, sourceType, targetType);
+     case BLOB:
+       return generateBlobResultSet(pairs, sourceType, targetType);
+     case TEXT:
+     case STRING:
+       return generateStringResultSet(pairs, sourceType, targetType);
+     default:
+       break;
+   }
+
+   final HashSet<String> expectedRows = new HashSet<>();
+   for (final Pair row : pairs) {
+     final Object converted = ValueConverter.convert(sourceType, targetType, row.right);
+     expectedRows.add(String.format("%s,%s,", row.left, converted));
+   }
+   return expectedRows;
+ }
+
+ /**
+  * Builds the expected rows for a TIMESTAMP target: each row is
+  * {@code "<timestamp>,<converted value>,"} with the converted value rendered via {@code %s}.
+  *
+  * @param pairs inserted (timestamp, source value) pairs
+  * @param sourceType data type of the inserted values
+  * @param targetType data type the values are converted to
+  * @return the expected result rows
+  */
+ private Set<String> generateTimestampResultSet(
+     List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+   final HashSet<String> expectedRows = new HashSet<>();
+   for (final Pair row : pairs) {
+     final Object converted = ValueConverter.convert(sourceType, targetType, row.right);
+     expectedRows.add(String.format("%s,%s,", row.left, converted));
+   }
+   return expectedRows;
+ }
+
+ /**
+  * Builds the expected rows for a DATE target: the converted value is cast to {@link Integer}
+  * and rendered through {@code DateUtils.formatDate}.
+  *
+  * @param pairs inserted (timestamp, source value) pairs
+  * @param sourceType data type of the inserted values
+  * @param targetType data type the values are converted to
+  * @return the expected result rows, each shaped {@code "<timestamp>,<formatted date>,"}
+  */
+ private Set<String> generateLocalDateResultSet(
+     List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+   final HashSet<String> expectedRows = new HashSet<>();
+   for (final Pair row : pairs) {
+     final Integer convertedDate =
+         (Integer) ValueConverter.convert(sourceType, targetType, row.right);
+     final String formattedDate = DateUtils.formatDate(convertedDate);
+     expectedRows.add(String.format("%s,%s,", row.left, formattedDate));
+   }
+   return expectedRows;
+ }
+
+ /**
+  * Builds the expected rows for a BLOB target: the converted value is cast to {@link Binary}
+  * and its bytes are rendered through {@code BytesUtils.parseBlobByteArrayToString}.
+  *
+  * @param pairs inserted (timestamp, source value) pairs
+  * @param sourceType data type of the inserted values
+  * @param targetType data type the values are converted to
+  * @return the expected result rows, each shaped {@code "<timestamp>,<blob rendering>,"}
+  */
+ private Set<String> generateBlobResultSet(
+     List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+   final HashSet<String> expectedRows = new HashSet<>();
+   for (final Pair row : pairs) {
+     final Binary convertedBlob =
+         (Binary) ValueConverter.convert(sourceType, targetType, row.right);
+     final String renderedBlob =
+         BytesUtils.parseBlobByteArrayToString(convertedBlob.getValues());
+     expectedRows.add(String.format("%s,%s,", row.left, renderedBlob));
+   }
+   return expectedRows;
+ }
+
+ /**
+  * Builds the expected rows for a TEXT or STRING target: the converted value is cast to
+  * {@link Binary} and its bytes are decoded as UTF-8.
+  *
+  * @param pairs inserted (timestamp, source value) pairs
+  * @param sourceType data type of the inserted values
+  * @param targetType data type the values are converted to
+  * @return the expected result rows, each shaped {@code "<timestamp>,<decoded text>,"}
+  */
+ private Set<String> generateStringResultSet(
+     List<Pair> pairs, TSDataType sourceType, TSDataType targetType) {
+   final HashSet<String> expectedRows = new HashSet<>();
+   for (final Pair row : pairs) {
+     final Binary convertedText =
+         (Binary) ValueConverter.convert(sourceType, targetType, row.right);
+     final String decodedText = new String(convertedText.getValues(), StandardCharsets.UTF_8);
+     expectedRows.add(String.format("%s,%s,", row.left, decodedText));
+   }
+   return expectedRows;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} and
+  * random boolean values from an unseeded {@link Random}.
+  *
+  * @return the generated (Long timestamp, Boolean value) pairs
+  */
+ private List<Pair> createTestDataForBoolean() {
+   final List<Pair> rows = new ArrayList<>();
+   final Random rng = new Random();
+   long timestamp = 0;
+   while (timestamp < generateDataSize) {
+     rows.add(new Pair<>(timestamp, rng.nextBoolean()));
+     timestamp++;
+   }
+   return rows;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} and
+  * random int values, plus three fixed rows carrying -1, -2 and -3 at timestamps
+  * {@code generateDataSize + 1} to {@code generateDataSize + 3}.
+  *
+  * <p>The timestamp equal to {@code generateDataSize} itself is not used. The fixed rows carry
+  * an {@code Integer} timestamp whereas the random rows carry a {@code Long} one.
+  *
+  * @return the generated (timestamp, Integer value) pairs
+  */
+ private List<Pair> createTestDataForInt32() {
+   final List<Pair> rows = new ArrayList<>();
+   final Random rng = new Random();
+   for (long timestamp = 0; timestamp < generateDataSize; timestamp++) {
+     rows.add(new Pair<>(timestamp, rng.nextInt()));
+   }
+   for (int offset = 1; offset <= 3; offset++) {
+     rows.add(new Pair<>(generateDataSize + offset, -offset));
+   }
+   return rows;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} and
+  * random long values, plus three fixed rows carrying -1L, -2L and -3L at timestamps
+  * {@code generateDataSize + 1} to {@code generateDataSize + 3}.
+  *
+  * <p>The timestamp equal to {@code generateDataSize} itself is not used. The fixed rows carry
+  * an {@code Integer} timestamp whereas the random rows carry a {@code Long} one.
+  *
+  * @return the generated (timestamp, Long value) pairs
+  */
+ private List<Pair> createTestDataForInt64() {
+   final List<Pair> rows = new ArrayList<>();
+   final Random rng = new Random();
+   for (long timestamp = 0; timestamp < generateDataSize; timestamp++) {
+     rows.add(new Pair<>(timestamp, rng.nextLong()));
+   }
+   for (int offset = 1; offset <= 3; offset++) {
+     rows.add(new Pair<>(generateDataSize + offset, (long) -offset));
+   }
+   return rows;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} and
+  * values drawn from {@link Random#nextFloat()} on an unseeded generator.
+  *
+  * @return the generated (Long timestamp, Float value) pairs
+  */
+ private List<Pair> createTestDataForFloat() {
+   final List<Pair> rows = new ArrayList<>();
+   final Random rng = new Random();
+   long timestamp = 0;
+   while (timestamp < generateDataSize) {
+     rows.add(new Pair<>(timestamp, rng.nextFloat()));
+     timestamp++;
+   }
+   return rows;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} and
+  * values drawn from {@link Random#nextDouble()} on an unseeded generator.
+  *
+  * @return the generated (Long timestamp, Double value) pairs
+  */
+ private List<Pair> createTestDataForDouble() {
+   final List<Pair> rows = new ArrayList<>();
+   final Random rng = new Random();
+   long timestamp = 0;
+   while (timestamp < generateDataSize) {
+     rows.add(new Pair<>(timestamp, rng.nextDouble()));
+     timestamp++;
+   }
+   return rows;
+ }
+
+ /**
+  * Generates the TEXT test data: {@code generateDataSize} rows whose value is the decimal
+  * rendering of the timestamp ({@code 0..generateDataSize-1}) as a UTF-8 {@link Binary}, plus
+  * five fixed phrases at timestamps {@code generateDataSize + 1} to {@code generateDataSize + 5}.
+  *
+  * <p>The fixed rows are built with the raw {@code Pair} constructor and an {@code int}
+  * timestamp expression.
+  *
+  * @return the generated (timestamp, Binary value) pairs
+  */
+ private List<Pair> createTestDataForText() {
+ List<Pair> pairs = new ArrayList<>();
+ for (long i = 0; i < generateDataSize; i++) {
+ pairs.add(new Pair<>(i, new
Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8))));
+ }
+ // Fixed non-numeric phrases of increasing length.
+ pairs.add(new Pair(generateDataSize + 1, new
Binary("Hello".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 2, new Binary("Hello
World!".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 3, new Binary("This is a
test.".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 4,
+ new Binary("IoTDB Hello
World!!!!".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 5,
+ new Binary(
+ "IoTDB is an excellent time series database!!!!!!!!!"
+ .getBytes(StandardCharsets.UTF_8))));
+ return pairs;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} whose
+  * values are consecutive epoch-millisecond timestamps starting at the current wall-clock time.
+  *
+  * <p>The clock is read once, before the loop, so the values always form one contiguous
+  * sequence ({@code base, base + 1, ...}) regardless of how long generation takes.
+  *
+  * @return the generated (Long timestamp, Long epoch-millis value) pairs
+  */
+ private List<Pair> createTestDataForTimestamp() {
+   List<Pair> pairs = new ArrayList<>();
+   // Loop-invariant base: reading the clock per iteration would let the base drift mid-loop.
+   final long baseMillis = new Date().getTime();
+   for (long i = 0; i < generateDataSize; i++) {
+     pairs.add(new Pair<>(i, baseMillis + i));
+   }
+   return pairs;
+ }
+
+ /**
+  * Generates {@code generateDataSize} rows with timestamps {@code 0..generateDataSize-1} whose
+  * values are dates encoded as {@code yyyyMMdd} integers, starting at 2023-01-01.
+  *
+  * <p>The day advances by one per row and only runs from 1 to 28, so every generated date is
+  * valid in every month; after day 28 the month advances, and after month 12 the year advances.
+  *
+  * @return the generated (Long timestamp, Integer date) pairs
+  */
+ private List<Pair> createTestDataForDate() {
+   final int daysPerMonth = 28;
+   final int monthsPerYear = 12;
+   final List<Pair> rows = new ArrayList<>();
+   for (long timestamp = 0; timestamp < generateDataSize; timestamp++) {
+     final int index = (int) timestamp;
+     final int day = index % daysPerMonth + 1;
+     final int month = (index / daysPerMonth) % monthsPerYear + 1;
+     final int year = 2023 + index / (daysPerMonth * monthsPerYear);
+     rows.add(new Pair<>(timestamp, year * 10000 + (month * 100) + day));
+   }
+   return rows;
+ }
+
+ /**
+  * Generates the BLOB test data: {@code generateDataSize} rows whose value is the decimal
+  * rendering of the timestamp ({@code 0..generateDataSize-1}) as a UTF-8 {@link Binary}, plus
+  * five fixed phrases at timestamps {@code generateDataSize + 1} to {@code generateDataSize + 5}.
+  *
+  * <p>The fixed rows are built with the raw {@code Pair} constructor and an {@code int}
+  * timestamp expression.
+  *
+  * @return the generated (timestamp, Binary value) pairs
+  */
+ private List<Pair> createTestDataForBlob() {
+ List<Pair> pairs = new ArrayList<>();
+ for (long i = 0; i < generateDataSize; i++) {
+ pairs.add(new Pair<>(i, new
Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8))));
+ }
+ // Fixed non-numeric phrases of increasing length.
+ pairs.add(new Pair(generateDataSize + 1, new
Binary("Hello".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 2, new Binary("Hello
World!".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 3, new Binary("This is a
test.".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 4,
+ new Binary("IoTDB Hello
World!!!!".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 5,
+ new Binary(
+ "IoTDB is an excellent time series database!!!!!!!!!"
+ .getBytes(StandardCharsets.UTF_8))));
+ return pairs;
+ }
+
+ /**
+  * Generates the STRING test data: {@code generateDataSize} rows whose value is the decimal
+  * rendering of the timestamp ({@code 0..generateDataSize-1}) as a UTF-8 {@link Binary}, plus
+  * five fixed phrases at timestamps {@code generateDataSize + 1} to {@code generateDataSize + 5}.
+  *
+  * <p>The fixed rows are built with the raw {@code Pair} constructor and an {@code int}
+  * timestamp expression.
+  *
+  * @return the generated (timestamp, Binary value) pairs
+  */
+ private List<Pair> createTestDataForString() {
+ List<Pair> pairs = new ArrayList<>();
+ for (long i = 0; i < generateDataSize; i++) {
+ pairs.add(new Pair<>(i, new
Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8))));
+ }
+ // Fixed non-numeric phrases of increasing length.
+ pairs.add(new Pair(generateDataSize + 1, new
Binary("Hello".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 2, new Binary("Hello
World!".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 3, new Binary("This is a
test.".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 4,
+ new Binary("IoTDB Hello
World!!!!".getBytes(StandardCharsets.UTF_8))));
+ pairs.add(
+ new Pair(
+ generateDataSize + 5,
+ new Binary(
+ "IoTDB is an excellent time series database!!!!!!!!!"
+ .getBytes(StandardCharsets.UTF_8))));
+ return pairs;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 394e2e95d2b..083f12485ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -518,7 +518,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+ if (isTabletBatchModeEnabled) {
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+ }
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
index 098792fd0b3..db38102ca9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java
@@ -26,8 +26,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
-import java.time.ZoneId;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
public class ValueConverter {
@@ -299,6 +303,11 @@ public class ValueConverter {
private static final Binary BINARY_TRUE =
parseString(Boolean.TRUE.toString());
private static final Binary BINARY_FALSE =
parseString(Boolean.FALSE.toString());
+ private static final int TRUE_DATE =
DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 2));
+ private static final int FALSE_DATE =
+ DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1));
+ private static final int DEFAULT_DATE =
+ DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1));
public static int convertBooleanToInt32(final boolean value) {
return value ? 1 : 0;
@@ -325,7 +334,7 @@ public class ValueConverter {
}
public static int convertBooleanToDate(final boolean value) {
- return value ? 1 : 0;
+ return value ? TRUE_DATE : FALSE_DATE;
}
public static Binary convertBooleanToBlob(final boolean value) {
@@ -363,7 +372,12 @@ public class ValueConverter {
}
public static int convertInt32ToDate(final int value) {
- return value;
+ try {
+ DateUtils.parseIntToLocalDate(value);
+ return value;
+ } catch (Exception e) {
+ return DEFAULT_DATE;
+ }
}
public static Binary convertInt32ToBlob(final int value) {
@@ -401,7 +415,13 @@ public class ValueConverter {
}
public static int convertInt64ToDate(final long value) {
- return (int) value;
+ try {
+ int data = (int) value;
+ DateUtils.parseIntToLocalDate(data);
+ return data;
+ } catch (Exception e) {
+ return DEFAULT_DATE;
+ }
}
public static Binary convertInt64ToBlob(final long value) {
@@ -439,7 +459,13 @@ public class ValueConverter {
}
public static int convertFloatToDate(final float value) {
- return (int) value;
+ try {
+ int data = (int) value;
+ DateUtils.parseIntToLocalDate(data);
+ return data;
+ } catch (Exception e) {
+ return DEFAULT_DATE;
+ }
}
public static Binary convertFloatToBlob(final float value) {
@@ -477,7 +503,13 @@ public class ValueConverter {
}
public static int convertDoubleToDate(final double value) {
- return (int) value;
+ try {
+ int data = (int) value;
+ DateUtils.parseIntToLocalDate(data);
+ return data;
+ } catch (Exception e) {
+ return DEFAULT_DATE;
+ }
}
public static Binary convertDoubleToBlob(final double value) {
@@ -553,7 +585,12 @@ public class ValueConverter {
}
public static int convertTimestampToDate(final long value) {
- return (int) value;
+ try {
+ Instant instant = Instant.ofEpochMilli(value);
+ return
DateUtils.parseDateExpressionToInt(instant.atZone(ZoneOffset.UTC).toLocalDate());
+ } catch (Exception e) {
+ return DEFAULT_DATE;
+ }
}
public static Binary convertTimestampToBlob(final long value) {
@@ -567,7 +604,7 @@ public class ValueConverter {
///////////// DATE //////////////
public static boolean convertDateToBoolean(final int value) {
- return value != 0;
+ return value != FALSE_DATE;
}
public static int convertDateToInt32(final int value) {
@@ -591,7 +628,14 @@ public class ValueConverter {
}
public static long convertDateToTimestamp(final int value) {
- return value;
+ try {
+ LocalDate date = DateUtils.parseIntToLocalDate(value);
+ ZonedDateTime dateTime = date.atStartOfDay(ZoneOffset.UTC);
+ Instant instant = dateTime.toInstant();
+ return instant.toEpochMilli();
+ } catch (Exception e) {
+ return 0L;
+ }
}
public static Binary convertDateToBlob(final int value) {
@@ -753,8 +797,7 @@ public class ValueConverter {
try {
return TypeInferenceUtils.isNumber(value)
? Long.parseLong(value)
- : DateTimeUtils.parseDateTimeExpressionToLong(
- StringUtils.trim(value), ZoneId.systemDefault());
+ :
DateTimeUtils.parseDateTimeExpressionToLong(StringUtils.trim(value),
ZoneOffset.UTC);
} catch (final Exception e) {
return 0L;
}
@@ -762,14 +805,17 @@ public class ValueConverter {
private static int parseDate(final String value) {
if (value == null || value.isEmpty()) {
- return 0;
+ return DEFAULT_DATE;
}
try {
- return TypeInferenceUtils.isNumber(value)
- ? Integer.parseInt(value)
- : DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value));
+ if (TypeInferenceUtils.isNumber(value)) {
+ int date = Integer.parseInt(value);
+ DateUtils.parseIntToLocalDate(date);
+ return date;
+ }
+ return DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value));
} catch (final Exception e) {
- return 0;
+ return DEFAULT_DATE;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index 3df76b79ce4..6ae0e2d9fa4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -42,6 +42,7 @@ public class PipeConvertedInsertRowStatement extends
InsertRowStatement {
// Statement
isDebug = insertRowStatement.isDebug();
// InsertBaseStatement
+ insertRowStatement.removeAllFailedMeasurementMarks();
devicePath = insertRowStatement.getDevicePath();
isAligned = insertRowStatement.isAligned();
measurementSchemas = insertRowStatement.getMeasurementSchemas();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index 2481034cbae..a5013725061 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -36,6 +36,7 @@ public class PipeConvertedInsertTabletStatement extends
InsertTabletStatement {
// Statement
isDebug = insertTabletStatement.isDebug();
// InsertBaseStatement
+ insertTabletStatement.removeAllFailedMeasurementMarks();
devicePath = insertTabletStatement.getDevicePath();
isAligned = insertTabletStatement.isAligned();
measurementSchemas = insertTabletStatement.getMeasurementSchemas();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 88e5b7735cf..048e75afebc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -229,6 +229,11 @@ public abstract class InsertBaseStatement extends
Statement {
throw new UnsupportedOperationException();
}
+ /**
+  * Resets the state of all measurements marked as failed, clearing the failure records.
+  *
+  * <p>This base implementation always throws; statement types that support the operation
+  * override it.
+  *
+  * @throws UnsupportedOperationException always, in this base implementation
+  */
+ public void removeAllFailedMeasurementMarks() {
+ throw new UnsupportedOperationException();
+ }
+
public boolean hasValidMeasurements() {
for (Object o : measurements) {
if (o != null) {
@@ -290,6 +295,18 @@ public abstract class InsertBaseStatement extends
Statement {
this.value = value;
this.cause = cause;
}
+
+ /** Returns the measurement name stored in this failure record. */
+ public String getMeasurement() {
+ return measurement;
+ }
+
+ /** Returns the data type stored in this failure record. */
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ /** Returns the value stored in this failure record. */
+ public Object getValue() {
+ return value;
+ }
}
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index bb67086456a..c754290b959 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -261,6 +261,20 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
values[index] = null;
}
+ /**
+  * Restores every measurement previously marked as failed and then clears the failure records.
+  *
+  * <p>For each recorded index, the measurement name, data type and value kept in the failure
+  * info are written back into {@code measurements}, {@code dataTypes} and {@code values}.
+  * Does nothing when no failure map exists.
+  */
+ @Override
+ public void removeAllFailedMeasurementMarks() {
+   if (failedMeasurementIndex2Info != null) {
+     failedMeasurementIndex2Info.forEach(
+         (failedIndex, failureInfo) -> {
+           measurements[failedIndex] = failureInfo.getMeasurement();
+           dataTypes[failedIndex] = failureInfo.getDataType();
+           values[failedIndex] = failureInfo.getValue();
+         });
+     failedMeasurementIndex2Info.clear();
+   }
+ }
+
@Override
public void semanticCheck() {
super.semanticCheck();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 329fe77203b..ed65e417782 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -194,6 +194,20 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
columns[index] = null;
}
+ /**
+  * Restores every measurement previously marked as failed and then clears the failure records.
+  *
+  * <p>For each recorded index, the measurement name, data type and column kept in the failure
+  * info are written back into {@code measurements}, {@code dataTypes} and {@code columns}.
+  * Does nothing when no failure map exists.
+  */
+ @Override
+ public void removeAllFailedMeasurementMarks() {
+   if (failedMeasurementIndex2Info != null) {
+     failedMeasurementIndex2Info.forEach(
+         (failedIndex, failureInfo) -> {
+           measurements[failedIndex] = failureInfo.getMeasurement();
+           dataTypes[failedIndex] = failureInfo.getDataType();
+           columns[failedIndex] = failureInfo.getValue();
+         });
+     failedMeasurementIndex2Info.clear();
+   }
+ }
+
@Override
public void semanticCheck() {
super.semanticCheck();