This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ceaae51b35a Pipe: Fix tablet event alignment and row count handling 
(#18034)
ceaae51b35a is described below

commit ceaae51b35a734a42d5a7e749db9c44de23d68f1
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:11:46 2026 +0800

    Pipe: Fix tablet event alignment and row count handling (#18034)
    
    * Pipe: Fix tablet event alignment and row count handling (#17999)
    
    * Pipe: Fix tablet event alignment and row count handling
    
    * Pipe: Add IT for tablet time range filtering
    
    * Update IoTDBPipeSourceIT.java
    
    (cherry picked from commit f1b4508802980f1f09aca3dd1ac204b1692a9873)
    
    * Fix pipe source IT tablet schema type
    
    * Fix pipe source tablet row count test
---
 .../pipe/it/autocreate/IoTDBPipeSourceIT.java      | 53 ++++++++++++----
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  8 ++-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  5 +-
 .../event/common/tablet/PipeTabletCollector.java   |  6 ++
 .../tablet/TabletInsertionDataContainer.java       | 15 ++++-
 .../pipe/event/PipeTabletInsertionEventTest.java   | 73 ++++++++++++++++++++++
 6 files changed, 141 insertions(+), 19 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
index bb160a6f295..2bd290bfc81 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -33,6 +34,9 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
 import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -791,11 +795,10 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualAutoIT {
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
 
+      final Map<String, String> expectedCountResult = new HashMap<>();
+      expectedCountResult.put("count(root.db.d1.at1)", "3");
       TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "select count(*) from root.**",
-          "count(root.db.d1.at1),",
-          Collections.singleton("3,"));
+          receiverEnv, "select count(*) from root.db.**", expectedCountResult);
 
       // Insert realtime data that overlapped with time range
       TestUtils.executeNonQueries(
@@ -806,11 +809,29 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualAutoIT {
               "flush"),
           null);
 
+      expectedCountResult.put("count(root.db.d3.at1)", "3");
       TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "select count(*) from root.**",
-          "count(root.db.d1.at1),count(root.db.d3.at1),",
-          Collections.singleton("3,3,"));
+          receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+
+      // Session Tablet can have unused timestamp slots when rowSize is 
smaller than maxRowNumber.
+      // The pipe source time range filter should ignore the unused zero tail.
+      final List<MeasurementSchema> schemas =
+          Collections.singletonList(new MeasurementSchema("at1", 
TSDataType.INT32));
+      final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5);
+      for (int time = 2000; time <= 4000; time += 1000) {
+        final int rowIndex = tabletWithUnusedTail.rowSize++;
+        tabletWithUnusedTail.addTimestamp(rowIndex, time);
+        tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000);
+      }
+      Assert.assertEquals(3, tabletWithUnusedTail.rowSize);
+      Assert.assertEquals(5, tabletWithUnusedTail.timestamps.length);
+      try (final ISession session = senderEnv.getSessionConnection()) {
+        session.insertTablet(tabletWithUnusedTail);
+      }
+
+      expectedCountResult.put("count(root.db.d5.at1)", "3");
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, "select count(*) from root.db.**", expectedCountResult);
 
       // Insert realtime data that does not overlap with time range
       TestUtils.executeNonQueries(
@@ -823,9 +844,19 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualAutoIT {
 
       TestUtils.assertDataAlwaysOnEnv(
           receiverEnv,
-          "select count(*) from root.**",
-          "count(root.db.d1.at1),count(root.db.d3.at1),",
-          Collections.singleton("3,3,"));
+          "select count(at1) from root.db.d1, root.db.d3, root.db.d5",
+          "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
+          Collections.singleton("3,3,3,"));
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv,
+          "show timeseries root.db.d2.**",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          Collections.emptySet());
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv,
+          "show timeseries root.db.d4.**",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          Collections.emptySet());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index bc7040a0598..84a0f533e23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -214,12 +214,14 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       }
 
       if (insertNode instanceof InsertTabletNode) {
-        final long[] timestamps = ((InsertTabletNode) insertNode).getTimes();
-        if (Objects.isNull(timestamps) || timestamps.length == 0) {
+        final InsertTabletNode insertTabletNode = (InsertTabletNode) 
insertNode;
+        final long[] timestamps = insertTabletNode.getTimes();
+        final int rowCount = insertTabletNode.getRowCount();
+        if (Objects.isNull(timestamps) || rowCount <= 0) {
           return false;
         }
         // We assume that `timestamps` is ordered.
-        return startTime <= timestamps[timestamps.length - 1] && timestamps[0] 
<= endTime;
+        return startTime <= timestamps[rowCount - 1] && timestamps[0] <= 
endTime;
       }
 
       if (insertNode instanceof InsertRowsNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 6d6995f0231..f47544ab64f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -318,11 +318,12 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean mayEventTimeOverlappedWithTimeRange() {
     final long[] timestamps = tablet.timestamps;
-    if (Objects.isNull(timestamps) || timestamps.length == 0) {
+    final int rowSize = tablet.rowSize;
+    if (Objects.isNull(timestamps) || rowSize <= 0) {
       return false;
     }
     // We assume that `timestamps` is ordered.
-    return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <= 
endTime;
+    return startTime <= timestamps[rowSize - 1] && timestamps[0] <= endTime;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index b9da59f3111..a6a69144028 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -31,6 +31,12 @@ public class PipeTabletCollector extends 
PipeRawTabletEventConverter implements
     super(pipeTaskMeta, sourceEvent);
   }
 
+  public PipeTabletCollector(
+      PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, boolean isAligned) 
{
+    this(pipeTaskMeta, sourceEvent);
+    this.isAligned = isAligned;
+  }
+
   public PipeTabletCollector(
       PipeTaskMeta pipeTaskMeta,
       EnrichedEvent sourceEvent,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index d8c2bccaa97..a2a8c27ef26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -218,8 +218,13 @@ public class TabletInsertionDataContainer {
     this.isAligned = insertTabletNode.isAligned();
 
     final long[] originTimestampColumn = insertTabletNode.getTimes();
-    final List<Integer> rowIndexList = 
generateRowIndexList(originTimestampColumn);
-    this.timestampColumn = rowIndexList.stream().mapToLong(i -> 
originTimestampColumn[i]).toArray();
+    final int originRowCount = insertTabletNode.getRowCount();
+    final long[] actualTimestampColumn =
+        originTimestampColumn.length == originRowCount
+            ? originTimestampColumn
+            : Arrays.copyOf(originTimestampColumn, originRowCount);
+    final List<Integer> rowIndexList = 
generateRowIndexList(actualTimestampColumn);
+    this.timestampColumn = rowIndexList.stream().mapToLong(i -> 
actualTimestampColumn[i]).toArray();
 
     generateColumnIndexMapper(
         insertTabletNode.getMeasurements(),
@@ -407,6 +412,9 @@ public class TabletInsertionDataContainer {
 
   private List<Integer> generateRowIndexList(final long[] 
originTimestampColumn) {
     final int rowCount = originTimestampColumn.length;
+    if (rowCount == 0) {
+      return generateFullRowIndexList(rowCount);
+    }
     if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) {
       return generateFullRowIndexList(rowCount);
     }
@@ -680,7 +688,8 @@ public class TabletInsertionDataContainer {
 
   public List<TabletInsertionEvent> processTabletWithCollect(
       BiConsumer<Tablet, TabletCollector> consumer) {
-    final PipeTabletCollector tabletCollector = new 
PipeTabletCollector(pipeTaskMeta, sourceEvent);
+    final PipeTabletCollector tabletCollector =
+        new PipeTabletCollector(pipeTaskMeta, sourceEvent, isAligned);
     consumer.accept(convertToTablet(), tabletCollector);
     return tabletCollector.convertToTabletInsertionEvents(shouldReport);
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 8a290bd1803..b46a86e9944 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
 import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
@@ -42,6 +44,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.time.LocalDate;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -323,6 +326,30 @@ public class PipeTabletInsertionEventTest {
     Assert.assertTrue(isAligned4);
   }
 
+  @Test
+  public void processAlignedTabletWithCollectPreservesAlignmentForTest() {
+    final PipeRawTabletInsertionEvent event =
+        new PipeRawTabletInsertionEvent(
+            tabletForInsertTabletNode, true, new PrefixPipePattern(pattern));
+
+    final List<TabletInsertionEvent> events = new ArrayList<>();
+    event
+        .processTabletWithCollect(
+            (tablet, collector) -> {
+              try {
+                collector.collectTablet(tablet);
+              } catch (final Exception e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .forEach(events::add);
+
+    Assert.assertEquals(1, events.size());
+    final PipeRawTabletInsertionEvent collectedEvent = 
(PipeRawTabletInsertionEvent) events.get(0);
+    Assert.assertEquals(tabletForInsertTabletNode, 
collectedEvent.convertToTablet());
+    Assert.assertTrue(collectedEvent.isAligned());
+  }
+
   @Test
   public void collectRowWithOverriddenTreeDatabaseForTest() {
     final PipeRowCollector rowCollector = new PipeRowCollector(null, null, 
"root.test.sg_0", false);
@@ -449,4 +476,50 @@ public class PipeTabletInsertionEventTest {
     event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L, 
Long.MAX_VALUE);
     Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
   }
+
+  @Test
+  public void isEventTimeOverlappedWithTimeRangeUsesActualRowSizeForTest() 
throws Exception {
+    final long[] timestamps = new long[] {110L, 111L, 112L, 0L, 0L};
+
+    final Tablet partialTablet = new Tablet(deviceId, Arrays.asList(schemas), 
times.length);
+    partialTablet.timestamps = timestamps;
+    partialTablet.rowSize = 3;
+
+    PipeRawTabletInsertionEvent rawEvent =
+        new PipeRawTabletInsertionEvent(partialTablet, 111L, 112L);
+    Assert.assertTrue(rawEvent.mayEventTimeOverlappedWithTimeRange());
+    rawEvent = new PipeRawTabletInsertionEvent(partialTablet, 113L, 
Long.MAX_VALUE);
+    Assert.assertFalse(rawEvent.mayEventTimeOverlappedWithTimeRange());
+
+    final InsertTabletNode partialInsertTabletNode =
+        new InsertTabletNode(
+            new PlanNodeId("partial tablet node"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            timestamps,
+            null,
+            insertTabletNode.getColumns(),
+            3);
+
+    final Tablet convertedTablet =
+        new TabletInsertionDataContainer(partialInsertTabletNode, new 
PrefixPipePattern(pattern))
+            .convertToTablet();
+    Assert.assertEquals(3, convertedTablet.rowSize);
+    Assert.assertArrayEquals(
+        new long[] {110L, 111L, 112L},
+        Arrays.copyOf(convertedTablet.timestamps, convertedTablet.rowSize));
+
+    PipeInsertNodeTabletInsertionEvent insertNodeEvent =
+        new PipeInsertNodeTabletInsertionEvent(partialInsertTabletNode)
+            .shallowCopySelfAndBindPipeTaskMetaForProgressReport(null, 0, 
null, null, 111L, 112L);
+    Assert.assertTrue(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+    insertNodeEvent =
+        new PipeInsertNodeTabletInsertionEvent(partialInsertTabletNode)
+            .shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+                null, 0, null, null, 113L, Long.MAX_VALUE);
+    Assert.assertFalse(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+  }
 }

Reply via email to