This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1d28141b8 Pipe: Enable 'sink.format'='tablet' to force tsfiles to be parsed into tablets (#12809)
3d1d28141b8 is described below
commit 3d1d28141b82a35ae7099be8e47145ddae691060
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 00:08:28 2024 +0800
Pipe: Enable 'sink.format'='tablet' to force tsfiles to be parsed into tablets (#12809)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 11 ++++++++++-
.../pipe/task/builder/PipeDataNodeTaskBuilder.java | 23 +++++++++++++++++-----
.../pipe/task/connection/PipeEventCollector.java | 8 ++++++--
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 20 ++++++++++---------
4 files changed, 45 insertions(+), 17 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index d7149a401b6..7fda1250b1b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -99,8 +99,17 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
}
}
+ @Test
+ public void testSinkTabletFormat() throws Exception {
+ testSinkFormat("tablet");
+ }
+
@Test
public void testSinkTsFileFormat() throws Exception {
+ testSinkFormat("tsfile");
+ }
+
+ private void testSinkFormat(final String format) throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -127,7 +136,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
- connectorAttributes.put("connector.format", "tsfile");
+ connectorAttributes.put("connector.format", format);
connectorAttributes.put("connector.realtime-first", "false");
Assert.assertEquals(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index 04d5ece6e3b..31f46e73e3a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -34,9 +34,16 @@ import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
import org.apache.iotdb.db.subscription.task.stage.SubscriptionTaskConnectorStage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import java.util.Arrays;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
+
public class PipeDataNodeTaskBuilder {
private final PipeStaticMeta pipeStaticMeta;
@@ -48,7 +55,7 @@ public class PipeDataNodeTaskBuilder {
static {
PROCESSOR_EXECUTOR =
PipeSubtaskExecutorManager.getInstance().getProcessorExecutor();
- CONNECTOR_EXECUTOR_MAP = new HashMap<>();
+ CONNECTOR_EXECUTOR_MAP = new EnumMap<>(PipeType.class);
CONNECTOR_EXECUTOR_MAP.put(
PipeType.USER,
PipeSubtaskExecutorManager.getInstance().getConnectorExecutor());
CONNECTOR_EXECUTOR_MAP.put(
@@ -60,7 +67,7 @@ public class PipeDataNodeTaskBuilder {
protected final Map<String, String> systemParameters = new HashMap<>();
public PipeDataNodeTaskBuilder(
- PipeStaticMeta pipeStaticMeta, int regionId, PipeTaskMeta pipeTaskMeta) {
+ final PipeStaticMeta pipeStaticMeta, final int regionId, final PipeTaskMeta pipeTaskMeta) {
this.pipeStaticMeta = pipeStaticMeta;
this.regionId = regionId;
this.pipeTaskMeta = pipeTaskMeta;
@@ -80,7 +87,7 @@ public class PipeDataNodeTaskBuilder {
pipeTaskMeta);
final PipeTaskConnectorStage connectorStage;
- PipeType pipeType = pipeStaticMeta.getPipeType();
+ final PipeType pipeType = pipeStaticMeta.getPipeType();
if (PipeType.SUBSCRIPTION.equals(pipeType)) {
connectorStage =
new SubscriptionTaskConnectorStage(
@@ -109,7 +116,13 @@ public class PipeDataNodeTaskBuilder {
extractorStage.getEventSupplier(),
connectorStage.getPipeConnectorPendingQueue(),
PROCESSOR_EXECUTOR,
- pipeTaskMeta);
+ pipeTaskMeta,
+ pipeStaticMeta
+ .getConnectorParameters()
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
+ CONNECTOR_FORMAT_HYBRID_VALUE)
+ .equals(CONNECTOR_FORMAT_TABLET_VALUE));
return new PipeDataNodeTask(
pipeStaticMeta.getPipeName(), regionId, extractorStage,
processorStage, connectorStage);
@@ -121,7 +134,7 @@ public class PipeDataNodeTaskBuilder {
}
}
- private PipeParameters blendUserAndSystemParameters(PipeParameters userParameters) {
+ private PipeParameters blendUserAndSystemParameters(final PipeParameters userParameters) {
// Deep copy the user parameters to avoid modification of the original parameters.
// If the original parameters are modified, progress index report will be affected.
final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c97257cae61..f760d6cb4fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -52,15 +52,19 @@ public class PipeEventCollector implements EventCollector {
private final int regionId;
+ private final boolean forceTabletFormat;
+
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
public PipeEventCollector(
final UnboundedBlockingPendingQueue<Event> pendingQueue,
final long creationTime,
- final int regionId) {
+ final int regionId,
+ final boolean forceTabletFormat) {
this.pendingQueue = pendingQueue;
this.creationTime = creationTime;
this.regionId = regionId;
+ this.forceTabletFormat = forceTabletFormat;
}
@Override
@@ -118,7 +122,7 @@ public class PipeEventCollector implements EventCollector {
return;
}
- if (!sourceEvent.shouldParseTimeOrPattern()) {
+ if (!forceTabletFormat && !sourceEvent.shouldParseTimeOrPattern()) {
collectEvent(sourceEvent);
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 37d3e14ddc5..e7ab0cb1997 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -60,14 +60,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
* {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)}}
*/
public PipeTaskProcessorStage(
- String pipeName,
- long creationTime,
- PipeParameters pipeProcessorParameters,
- int regionId,
- EventSupplier pipeExtractorInputEventSupplier,
- UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
- PipeProcessorSubtaskExecutor executor,
- PipeTaskMeta pipeTaskMeta) {
+ final String pipeName,
+ final long creationTime,
+ final PipeParameters pipeProcessorParameters,
+ final int regionId,
+ final EventSupplier pipeExtractorInputEventSupplier,
+ final UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
+ final PipeProcessorSubtaskExecutor executor,
+ final PipeTaskMeta pipeTaskMeta,
+ final boolean forceTabletFormat) {
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskProcessorRuntimeEnvironment(
@@ -98,7 +99,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// old one, so we need creationTime to make their hash code different in the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(pipeConnectorOutputPendingQueue, creationTime, regionId);
+ new PipeEventCollector(
+ pipeConnectorOutputPendingQueue, creationTime, regionId, forceTabletFormat);
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,