This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch index-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/index-13 by this push:
     new 3b03f881362 [To dev/1.3] Pipe: Fixed the bug that a realtime only pipe will not transfer historical data when it is altered to a historical pipe (#17223)
3b03f881362 is described below

commit 3b03f881362971c0975c2813f2612987b2bb7335
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 27 15:58:54 2026 +0800

    [To dev/1.3] Pipe: Fixed the bug that a realtime only pipe will not transfer historical data when it is altered to a historical pipe (#17223)
---
 .../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 45 ++++++++++++++++++++++
 .../impl/pipe/task/AlterPipeProcedureV2.java       |  8 +++-
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 3e4200daf32..b9644159386 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -534,4 +534,49 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualAutoIT {
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv, "count timeSeries", "count(timeseries),", Collections.singleton("3,"));
   }
+
+  @Test
+  public void testAlterPipeRealtime() {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Insert data on sender
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), (2000, 3), (2500, 4), (3000, 5)",
+            "flush"),
+        null);
+
+    // Create pipe
+    final String sql =
+        String.format(
+            "create pipe a2b with source ('history.enable'='false') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    TestUtils.assertDataAlwaysOnEnv(
+        receiverEnv,
+        "count timeSeries root.db.**",
+        "count(timeseries),",
+        Collections.singleton("0,"));
+
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify source ('history.enable'='true')");
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "count timeSeries root.db.**",
+        "count(timeseries),",
+        Collections.singleton("1,"));
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 5a7b9b8a5b2..dcebb0b3d33 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -169,7 +170,12 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
                 updatedConsensusGroupIdToTaskMetaMap.put(
                     regionGroupId.getId(),
                     new PipeTaskMeta(
-                        currentPipeTaskMeta.getProgressIndex(),
+                        PipeTaskAgent.isRealtimeOnlyPipe(
+                                    currentPipeStaticMeta.getExtractorParameters())
+                                && !PipeTaskAgent.isRealtimeOnlyPipe(
+                                    updatedPipeStaticMeta.getExtractorParameters())
+                            ? MinimumProgressIndex.INSTANCE
+                            : currentPipeTaskMeta.getProgressIndex(),
                         
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
                                 && !(!PipeTaskAgent.isHistoryOnlyPipe(
                                         
currentPipeStaticMeta.getExtractorParameters())

Reply via email to