This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 233f86a13078e1e13703eb08ee35a662c3c8d40b Author: 罗振羽 <[email protected]> AuthorDate: Thu Apr 30 10:44:17 2026 +0000 [TIMECHODB]Fix pipe parsing for null object columns (cherry picked from commit 19a868ffd5d71eba71e181d5a5a714334799578b) --- .../manual/enhanced/IoTDBPipeNullValueIT.java | 93 ++++++++++++++++++++++ .../relational/it/session/IoTDBObjectDeleteIT.java | 11 ++- .../tablet/parser/TabletInsertionEventParser.java | 1 + 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java index 9d635cd8095..b2a6b253063 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced; @@ -30,12 +31,17 @@ import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils; import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; @@ -118,6 +124,83 @@ public class IoTDBPipeNullValueIT extends AbstractPipeTableModelDualManualIT { TableModelUtils.assertCountData("test", "test", 400, receiverEnv, handleFailure); } + private void testSessionInsertTabletWithParsingAllNullObjectColumnTemplate(final String realtime) + throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + final Consumer<String> handleFailure = + o -> { + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + }; + + try (final ITableSession session = senderEnv.getTableSessionConnection()) { + session.executeNonQueryStatement("create database if not exists test"); + session.executeNonQueryStatement("use test"); + session.executeNonQueryStatement( + "create table if not exists object_test(id string tag, reading float field, file object field)"); + } + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map<String, String> extractorAttributes = new HashMap<>(); + final Map<String, String> processorAttributes = new HashMap<>(); + final Map<String, String> connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + extractorAttributes.put("capture.table", "true"); + extractorAttributes.put("realtime-mode", realtime); + extractorAttributes.put("start-time", "2"); + extractorAttributes.put("end-time", "4"); + extractorAttributes.put("database-name", "test"); + extractorAttributes.put("table-name", "object_test"); + extractorAttributes.put("user", "root"); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("test", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + try (final ITableSession session = senderEnv.getTableSessionConnection()) { + session.executeNonQueryStatement("use test"); + + final Tablet tablet = + new Tablet( + "object_test", + Arrays.asList("id", "reading", "file"), + Arrays.asList(TSDataType.STRING, TSDataType.FLOAT, TSDataType.OBJECT), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD), + 8); + + for (long timestamp = 1; timestamp <= 5; ++timestamp) { + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue(rowIndex, 0, "device1"); + tablet.addValue(rowIndex, 1, (float) timestamp); + } + + session.insert(tablet); + session.executeNonQueryStatement("flush"); + } + + TableModelUtils.assertCountData("test", "object_test", 3, receiverEnv, handleFailure); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(*) from object_test where time < 2 or time > 4", + "_col0,", + Collections.singleton("0,"), + "test", + handleFailure); + } + // ---------------------- // // Scenario 1: SQL Insert // // ---------------------- // @@ -183,4 +266,14 @@ public class IoTDBPipeNullValueIT extends AbstractPipeTableModelDualManualIT { public void testSessionInsertTabletWithoutParsingStream() throws Exception { testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, "stream"); } + + @Test + public void testSessionInsertTabletWithParsingForcedLogAndAllNullObjectColumn() throws Exception { + testSessionInsertTabletWithParsingAllNullObjectColumnTemplate("forced-log"); + } + + @Test + public void testSessionInsertTabletWithParsingStreamAndAllNullObjectColumn() throws Exception { + testSessionInsertTabletWithParsingAllNullObjectColumnTemplate("stream"); + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java index e2c7fe73359..c80d329cdf5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java @@ -34,6 +34,7 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.write.record.Tablet; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -51,6 +52,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertNull; @@ -292,8 +294,13 @@ public class IoTDBObjectDeleteIT { session.executeNonQueryStatement("drop database db1"); } - Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "1.bin")); - Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "2.bin")); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "1.bin")); + Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "2.bin")); + }); } @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java index 935131da9dc..9fea9aee690 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java @@ -621,6 +621,7 @@ public abstract class TabletInsertionEventParser { case TEXT: case BLOB: case STRING: + case OBJECT: final Binary[] columns = new Binary[rowSize]; Arrays.fill(columns, Binary.EMPTY_VALUE); valueColumns[columnIndex] = columns;
