This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1d28141b8 Pipe: Enable 'sink.format'='tablet' to force tsfiles to be parsed into tablets (#12809)
3d1d28141b8 is described below

commit 3d1d28141b82a35ae7099be8e47145ddae691060
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 00:08:28 2024 +0800

    Pipe: Enable 'sink.format'='tablet' to force tsfiles to be parsed into tablets (#12809)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    | 11 ++++++++++-
 .../pipe/task/builder/PipeDataNodeTaskBuilder.java | 23 +++++++++++++++++-----
 .../pipe/task/connection/PipeEventCollector.java   |  8 ++++++--
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 20 ++++++++++---------
 4 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index d7149a401b6..7fda1250b1b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -99,8 +99,17 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
     }
   }
 
+  @Test
+  public void testSinkTabletFormat() throws Exception {
+    testSinkFormat("tablet");
+  }
+
   @Test
   public void testSinkTsFileFormat() throws Exception {
+    testSinkFormat("tsfile");
+  }
+
+  private void testSinkFormat(final String format) throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -127,7 +136,7 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
       connectorAttributes.put("connector.batch.enable", "false");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-      connectorAttributes.put("connector.format", "tsfile");
+      connectorAttributes.put("connector.format", format);
       connectorAttributes.put("connector.realtime-first", "false");
 
       Assert.assertEquals(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index 04d5ece6e3b..31f46e73e3a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -34,9 +34,16 @@ import 
org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
 import 
org.apache.iotdb.db.subscription.task.stage.SubscriptionTaskConnectorStage;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import java.util.Arrays;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
+
 public class PipeDataNodeTaskBuilder {
 
   private final PipeStaticMeta pipeStaticMeta;
@@ -48,7 +55,7 @@ public class PipeDataNodeTaskBuilder {
 
   static {
     PROCESSOR_EXECUTOR = 
PipeSubtaskExecutorManager.getInstance().getProcessorExecutor();
-    CONNECTOR_EXECUTOR_MAP = new HashMap<>();
+    CONNECTOR_EXECUTOR_MAP = new EnumMap<>(PipeType.class);
     CONNECTOR_EXECUTOR_MAP.put(
         PipeType.USER, 
PipeSubtaskExecutorManager.getInstance().getConnectorExecutor());
     CONNECTOR_EXECUTOR_MAP.put(
@@ -60,7 +67,7 @@ public class PipeDataNodeTaskBuilder {
   protected final Map<String, String> systemParameters = new HashMap<>();
 
   public PipeDataNodeTaskBuilder(
-      PipeStaticMeta pipeStaticMeta, int regionId, PipeTaskMeta pipeTaskMeta) {
+      final PipeStaticMeta pipeStaticMeta, final int regionId, final 
PipeTaskMeta pipeTaskMeta) {
     this.pipeStaticMeta = pipeStaticMeta;
     this.regionId = regionId;
     this.pipeTaskMeta = pipeTaskMeta;
@@ -80,7 +87,7 @@ public class PipeDataNodeTaskBuilder {
             pipeTaskMeta);
 
     final PipeTaskConnectorStage connectorStage;
-    PipeType pipeType = pipeStaticMeta.getPipeType();
+    final PipeType pipeType = pipeStaticMeta.getPipeType();
     if (PipeType.SUBSCRIPTION.equals(pipeType)) {
       connectorStage =
           new SubscriptionTaskConnectorStage(
@@ -109,7 +116,13 @@ public class PipeDataNodeTaskBuilder {
             extractorStage.getEventSupplier(),
             connectorStage.getPipeConnectorPendingQueue(),
             PROCESSOR_EXECUTOR,
-            pipeTaskMeta);
+            pipeTaskMeta,
+            pipeStaticMeta
+                .getConnectorParameters()
+                .getStringOrDefault(
+                    Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
+                    CONNECTOR_FORMAT_HYBRID_VALUE)
+                .equals(CONNECTOR_FORMAT_TABLET_VALUE));
 
     return new PipeDataNodeTask(
         pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
@@ -121,7 +134,7 @@ public class PipeDataNodeTaskBuilder {
     }
   }
 
-  private PipeParameters blendUserAndSystemParameters(PipeParameters 
userParameters) {
+  private PipeParameters blendUserAndSystemParameters(final PipeParameters 
userParameters) {
     // Deep copy the user parameters to avoid modification of the original 
parameters.
     // If the original parameters are modified, progress index report will be 
affected.
     final Map<String, String> blendedParameters = new 
HashMap<>(userParameters.getAttribute());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c97257cae61..f760d6cb4fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -52,15 +52,19 @@ public class PipeEventCollector implements EventCollector {
 
   private final int regionId;
 
+  private final boolean forceTabletFormat;
+
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
 
   public PipeEventCollector(
       final UnboundedBlockingPendingQueue<Event> pendingQueue,
       final long creationTime,
-      final int regionId) {
+      final int regionId,
+      final boolean forceTabletFormat) {
     this.pendingQueue = pendingQueue;
     this.creationTime = creationTime;
     this.regionId = regionId;
+    this.forceTabletFormat = forceTabletFormat;
   }
 
   @Override
@@ -118,7 +122,7 @@ public class PipeEventCollector implements EventCollector {
       return;
     }
 
-    if (!sourceEvent.shouldParseTimeOrPattern()) {
+    if (!forceTabletFormat && !sourceEvent.shouldParseTimeOrPattern()) {
       collectEvent(sourceEvent);
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 37d3e14ddc5..e7ab0cb1997 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -60,14 +60,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
    *     {@link PipeProcessor#customize(PipeParameters, 
PipeProcessorRuntimeConfiguration)}}
    */
   public PipeTaskProcessorStage(
-      String pipeName,
-      long creationTime,
-      PipeParameters pipeProcessorParameters,
-      int regionId,
-      EventSupplier pipeExtractorInputEventSupplier,
-      UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
-      PipeProcessorSubtaskExecutor executor,
-      PipeTaskMeta pipeTaskMeta) {
+      final String pipeName,
+      final long creationTime,
+      final PipeParameters pipeProcessorParameters,
+      final int regionId,
+      final EventSupplier pipeExtractorInputEventSupplier,
+      final UnboundedBlockingPendingQueue<Event> 
pipeConnectorOutputPendingQueue,
+      final PipeProcessorSubtaskExecutor executor,
+      final PipeTaskMeta pipeTaskMeta,
+      final boolean forceTabletFormat) {
     final PipeProcessorRuntimeConfiguration runtimeConfiguration =
         new PipeTaskRuntimeConfiguration(
             new PipeTaskProcessorRuntimeEnvironment(
@@ -98,7 +99,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // old one, so we need creationTime to make their hash code different in 
the map.
     final String taskId = pipeName + "_" + regionId + "_" + creationTime;
     final PipeEventCollector pipeConnectorOutputEventCollector =
-        new PipeEventCollector(pipeConnectorOutputPendingQueue, creationTime, 
regionId);
+        new PipeEventCollector(
+            pipeConnectorOutputPendingQueue, creationTime, regionId, 
forceTabletFormat);
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,

Reply via email to