This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch patch-2094
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 233f86a13078e1e13703eb08ee35a662c3c8d40b
Author: 罗振羽 <[email protected]>
AuthorDate: Thu Apr 30 10:44:17 2026 +0000

    [TIMECHODB]Fix pipe parsing for null object columns
    
    (cherry picked from commit 19a868ffd5d71eba71e181d5a5a714334799578b)
---
 .../manual/enhanced/IoTDBPipeNullValueIT.java      | 93 ++++++++++++++++++++++
 .../relational/it/session/IoTDBObjectDeleteIT.java | 11 ++-
 .../tablet/parser/TabletInsertionEventParser.java  |  1 +
 3 files changed, 103 insertions(+), 2 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java
index 9d635cd8095..b2a6b253063 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeNullValueIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ITableSession;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
@@ -30,12 +31,17 @@ import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -118,6 +124,83 @@ public class IoTDBPipeNullValueIT extends AbstractPipeTableModelDualManualIT {
     TableModelUtils.assertCountData("test", "test", 400, receiverEnv, handleFailure);
   }
 
+  private void testSessionInsertTabletWithParsingAllNullObjectColumnTemplate(final String realtime)
+      throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+    final Consumer<String> handleFailure =
+        o -> {
+          TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+          TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+        };
+
+    try (final ITableSession session = senderEnv.getTableSessionConnection()) {
+      session.executeNonQueryStatement("create database if not exists test");
+      session.executeNonQueryStatement("use test");
+      session.executeNonQueryStatement(
+          "create table if not exists object_test(id string tag, reading float field, file object field)");
+    }
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      extractorAttributes.put("capture.table", "true");
+      extractorAttributes.put("realtime-mode", realtime);
+      extractorAttributes.put("start-time", "2");
+      extractorAttributes.put("end-time", "4");
+      extractorAttributes.put("database-name", "test");
+      extractorAttributes.put("table-name", "object_test");
+      extractorAttributes.put("user", "root");
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("test", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    }
+
+    try (final ITableSession session = senderEnv.getTableSessionConnection()) {
+      session.executeNonQueryStatement("use test");
+
+      final Tablet tablet =
+          new Tablet(
+              "object_test",
+              Arrays.asList("id", "reading", "file"),
+              Arrays.asList(TSDataType.STRING, TSDataType.FLOAT, TSDataType.OBJECT),
+              Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD),
+              8);
+
+      for (long timestamp = 1; timestamp <= 5; ++timestamp) {
+        final int rowIndex = tablet.getRowSize();
+        tablet.addTimestamp(rowIndex, timestamp);
+        tablet.addValue(rowIndex, 0, "device1");
+        tablet.addValue(rowIndex, 1, (float) timestamp);
+      }
+
+      session.insert(tablet);
+      session.executeNonQueryStatement("flush");
+    }
+
+    TableModelUtils.assertCountData("test", "object_test", 3, receiverEnv, handleFailure);
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select count(*) from object_test where time < 2 or time > 4",
+        "_col0,",
+        Collections.singleton("0,"),
+        "test",
+        handleFailure);
+  }
+
   // ---------------------- //
   // Scenario 1: SQL Insert //
   // ---------------------- //
@@ -183,4 +266,14 @@ public class IoTDBPipeNullValueIT extends AbstractPipeTableModelDualManualIT {
   public void testSessionInsertTabletWithoutParsingStream() throws Exception {
     testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, "stream");
   }
+
+  @Test
+  public void testSessionInsertTabletWithParsingForcedLogAndAllNullObjectColumn() throws Exception {
+    testSessionInsertTabletWithParsingAllNullObjectColumnTemplate("forced-log");
+  }
+
+  @Test
+  public void testSessionInsertTabletWithParsingStreamAndAllNullObjectColumn() throws Exception {
+    testSessionInsertTabletWithParsingAllNullObjectColumnTemplate("stream");
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java
index e2c7fe73359..c80d329cdf5 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java
@@ -34,6 +34,7 @@ import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.write.record.Tablet;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -51,6 +52,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertNull;
 
@@ -292,8 +294,13 @@ public class IoTDBObjectDeleteIT {
       session.executeNonQueryStatement("drop database db1");
     }
 
-    Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "1.bin"));
-    Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "2.bin"));
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> {
+              Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "1.bin"));
+              Assert.assertFalse(objectFileExists("object_table", "1", "5", "3", "file", "2.bin"));
+            });
   }
 
   @Test
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index 935131da9dc..9fea9aee690 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -621,6 +621,7 @@ public abstract class TabletInsertionEventParser {
       case TEXT:
       case BLOB:
       case STRING:
+      case OBJECT:
         final Binary[] columns = new Binary[rowSize];
         Arrays.fill(columns, Binary.EMPTY_VALUE);
         valueColumns[columnIndex] = columns;

Reply via email to