This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 904a2a52695 [IOTDB-5940] Pipe: support 1.2 -> 1.1 sync connector (#10107)
904a2a52695 is described below
commit 904a2a52695dc0f62071cb1b723a4c2488f8bc48
Author: Caideyipi <[email protected]>
AuthorDate: Sun Jun 11 01:23:58 2023 +0800
[IOTDB-5940] Pipe: support 1.2 -> 1.1 sync connector (#10107)
Co-authored-by: yschengzi <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../persistence/pipe/PipePluginInfo.java | 8 +-
.../org/apache/iotdb/pipe/api/PipeCollector.java | 7 +-
.../org/apache/iotdb/pipe/api/PipeConnector.java | 6 +-
.../org/apache/iotdb/pipe/api/PipeProcessor.java | 6 +-
.../iotdb/pipe/api/customizer/PipeStrategy.java | 28 ---
.../PipeCollectorRuntimeConfiguration.java} | 10 +-
.../PipeConnectorRuntimeConfiguration.java} | 6 +-
.../PipeProcessorRuntimeConfiguration.java} | 6 +-
.../PipeRuntimeConfiguration.java} | 7 +-
.../PipeRuntimeEnvironment.java} | 9 +-
.../PipeConnectorRuntimeConfiguration.java | 84 -------
.../retry/EqualRetryIntervalStrategy.java | 63 -----
.../retry/ExponentialRetryIntervalStrategy.java | 70 ------
.../customizer/connector/retry/RetryStrategy.java | 34 ---
.../{ => parameter}/PipeParameterValidator.java | 2 +-
.../customizer/{ => parameter}/PipeParameters.java | 6 +-
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 +-
.../plugin/builtin/collector/IoTDBCollector.java | 6 +-
.../builtin/connector/DoNothingConnector.java | 6 +-
...tConnector.java => IoTDBSyncConnectorV1_1.java} | 16 +-
.../builtin/connector/IoTDBThriftConnector.java | 6 +-
.../builtin/processor/DoNothingProcessor.java | 6 +-
.../commons/pipe/task/meta/PipeStaticMeta.java | 2 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 8 +-
.../pipe/collector/IoTDBDataRegionCollector.java | 68 ++----
.../PipeHistoricalDataRegionTsFileCollector.java | 61 +++--
.../realtime/PipeRealtimeDataRegionCollector.java | 26 +-
.../PipeRealtimeDataRegionFakeCollector.java | 12 +-
.../PipeRealtimeDataRegionHybridCollector.java | 8 +-
.../PipeRealtimeDataRegionLogCollector.java | 7 +-
.../PipeRealtimeDataRegionTsFileCollector.java | 7 +-
.../{ => constant}/PipeCollectorConstant.java | 4 +-
.../{ => constant}/PipeConnectorConstant.java | 7 +-
.../{ => constant}/PipeProcessorConstant.java | 2 +-
.../configuraion/PipeTaskRuntimeConfiguration.java | 42 ++++
.../env/PipeTaskCollectorRuntimeEnvironment.java | 33 ++-
.../plugin/env/PipeTaskRuntimeEnvironment.java | 25 +-
.../lagacy/IoTDBSyncConnectorImplV1_1.java | 268 +++++++++++++++++++++
.../pipe/connector/v1/IoTDBThriftConnectorV1.java | 33 ++-
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 2 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 12 +
.../common/tablet/PipeRawTabletInsertionEvent.java | 2 +-
.../db/pipe/processor/PipeDoNothingProcessor.java | 8 +-
.../pipe/resource/wal/PipeWALResourceManager.java | 5 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 16 +-
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 62 ++---
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 10 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 29 +--
.../task/subtask/PipeConnectorSubtaskManager.java | 43 ++--
.../collector/CachedSchemaPatternMatcherTest.java | 31 +--
.../db/pipe/collector/PipeRealtimeCollectTest.java | 38 +--
51 files changed, 652 insertions(+), 615 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 22f0cb97943..784bf7df744 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -34,10 +34,10 @@ import
org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
index 724c1595571..8771febead4 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.pipe.api;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
/**
@@ -48,7 +48,6 @@ import org.apache.iotdb.pipe.api.event.Event;
* cancelled (the `DROP PIPE` command is executed).
* </ul>
*/
-// TODO: support event lifecycle management
public interface PipeCollector extends PipePlugin {
/**
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 6d74847e763..083590c1aef 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.pipe.api;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index e94384d5f23..3c35e8b9e13 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.pipe.api;
import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
deleted file mode 100644
index 433ccc83512..00000000000
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer;
-
-import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException;
-
-public interface PipeStrategy {
-
- /** @throws PipeStrategyNotValidException if invalid strategy is set */
- void check() throws PipeStrategyNotValidException;
-}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
similarity index 75%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
rename to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
index c75f85bb7a7..071bfeb60c6 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java
@@ -17,12 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer;
+package org.apache.iotdb.pipe.api.customizer.configuration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-
-public interface PipeRuntimeConfiguration {
-
- /** @throws PipeException if invalid runtime configuration is set */
- void check() throws PipeException;
-}
+public interface PipeCollectorRuntimeConfiguration extends PipeRuntimeConfiguration {}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java
similarity index 82%
copy from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
copy to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java
index a103b1db97a..4fc6813bef8 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java
@@ -17,8 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+package org.apache.iotdb.pipe.api.customizer.configuration;
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
-
-public interface ReuseStrategy extends PipeStrategy {}
+public interface PipeConnectorRuntimeConfiguration extends PipeRuntimeConfiguration {}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java
similarity index 82%
copy from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
copy to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java
index a103b1db97a..04c6f89ddbc 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java
@@ -17,8 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+package org.apache.iotdb.pipe.api.customizer.configuration;
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
-
-public interface ReuseStrategy extends PipeStrategy {}
+public interface PipeProcessorRuntimeConfiguration extends PipeRuntimeConfiguration {}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java
similarity index 82%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
rename to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java
index a103b1db97a..827d9f93ae7 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java
@@ -17,8 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer.connector.reuse;
+package org.apache.iotdb.pipe.api.customizer.configuration;
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+public interface PipeRuntimeConfiguration {
-public interface ReuseStrategy extends PipeStrategy {}
+ PipeRuntimeEnvironment getRuntimeEnvironment();
+}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
similarity index 81%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
rename to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
index 3df93c6c5cd..455d293dccc 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
@@ -17,8 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer.connector.parallel;
+package org.apache.iotdb.pipe.api.customizer.configuration;
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
+public interface PipeRuntimeEnvironment {
-public interface ParallelStrategy extends PipeStrategy {}
+ String getPipeName();
+
+ long getCreationTime();
+}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
deleted file mode 100644
index 08417800108..00000000000
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.connector.parallel.ParallelStrategy;
-import org.apache.iotdb.pipe.api.customizer.connector.retry.RetryStrategy;
-import org.apache.iotdb.pipe.api.customizer.connector.reuse.ReuseStrategy;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-
-/**
- * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} to customize
- * the runtime behavior of the PipeConnector.
- * <p>
- * Supports calling methods in a chain.
- * <p>
- * Sample code:
- * <pre>{@code
- * @Override
- * public void beforeStart(PipeParameters params, PipeConnectorRuntimeConfiguration configs) {
- * configs
- * .reuseStrategy(X)
- * .parallelStrategy(Y)
- * .retryStrategy(Z);
- * }</pre>
- */
-public class PipeConnectorRuntimeConfiguration implements PipeRuntimeConfiguration {
-
- private ReuseStrategy reuseStrategy;
- private ParallelStrategy parallelStrategy;
- private RetryStrategy retryStrategy;
-
- public PipeConnectorRuntimeConfiguration reuseStrategy(ReuseStrategy reuseStrategy) {
- this.reuseStrategy = reuseStrategy;
- return this;
- }
-
- public PipeConnectorRuntimeConfiguration parallelStrategy(ParallelStrategy parallelStrategy) {
- this.parallelStrategy = parallelStrategy;
- return this;
- }
-
- public PipeConnectorRuntimeConfiguration retryStrategy(RetryStrategy retryStrategy) {
- this.retryStrategy = retryStrategy;
- return this;
- }
-
- @Override
- public void check() throws PipeException {
- if (reuseStrategy == null) {
- throw new PipeException("ReuseStrategy is not set!");
- }
- reuseStrategy.check();
-
- if (parallelStrategy == null) {
- throw new PipeException("ParallelStrategy is not set!");
- }
- parallelStrategy.check();
-
- if (retryStrategy == null) {
- throw new PipeException("RetryStrategy is not set!");
- }
- retryStrategy.check();
- }
-}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
deleted file mode 100644
index fe693cc6fac..00000000000
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector.retry;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException;
-
-/**
- * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. When
- * the PipeConnector fails to connect to the sink, it will try to reconnect by the specified
- * strategy.
- *
- * <p>When PipeConnector is set to {@link EqualRetryIntervalStrategy}, the interval of waiting time
- * for retrying will be the same.
- *
- * @see PipeConnector
- * @see PipeConnectorRuntimeConfiguration
- */
-public class EqualRetryIntervalStrategy implements RetryStrategy {
-
- private final int maxRetryTimes;
- private final long retryInterval;
-
- /**
- * @param maxRetryTimes maxRetryTimes > 0
- * @param retryInterval retryInterval > 0
- */
- public EqualRetryIntervalStrategy(int maxRetryTimes, long retryInterval) {
- this.maxRetryTimes = maxRetryTimes;
- this.retryInterval = retryInterval;
- }
-
- @Override
- public void check() {
- if (maxRetryTimes <= 0) {
- throw new PipeStrategyNotValidException(
- String.format("Parameter maxRetryTimes(%d) should be greater than zero.", maxRetryTimes));
- }
- if (retryInterval <= 0) {
- throw new PipeStrategyNotValidException(
- String.format("Parameter retryInterval(%d) should be greater than zero.", retryInterval));
- }
- }
-}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
deleted file mode 100644
index 797372158cd..00000000000
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector.retry;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-
-/**
- * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. When
- * the PipeConnector fails to connect to the sink, it will try to reconnect by the specified
- * strategy.
- *
- * <p>When PipeConnector is set to {@link ExponentialRetryIntervalStrategy}, the interval of waiting
- * time for retrying will be increased exponentially each time.
- *
- * @see PipeConnector
- * @see PipeConnectorRuntimeConfiguration
- */
-public class ExponentialRetryIntervalStrategy implements RetryStrategy {
-
- private final int maxRetryTimes;
- private final long initInterval;
- private final double backOffFactor;
-
- /**
- * @param maxRetryTimes maxRetryTimes > 0
- * @param initInterval retryInterval > 0
- * @param backOffFactor backOffFactor > 0
- */
- public ExponentialRetryIntervalStrategy(
- int maxRetryTimes, long initInterval, double backOffFactor) {
- this.maxRetryTimes = maxRetryTimes;
- this.initInterval = initInterval;
- this.backOffFactor = backOffFactor;
- }
-
- @Override
- public void check() {
- if (maxRetryTimes <= 0) {
- throw new RuntimeException(
- String.format("Parameter maxRetryTimes(%d) should be greater than zero.", maxRetryTimes));
- }
- if (initInterval <= 0) {
- throw new RuntimeException(
- String.format("Parameter retryInterval(%d) should be greater than zero.", initInterval));
- }
- if (backOffFactor <= 0) {
- throw new RuntimeException(
- String.format("Parameter backOffFactor(%f) should be greater than zero.", backOffFactor));
- }
- }
-}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
deleted file mode 100644
index 27e9c08d1ff..00000000000
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.customizer.connector.retry;
-
-import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.PipeStrategy;
-import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-
-/**
- * Used to customize the strategy for reconnecting to sinks in {@link
- * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
- *
- * <p>When the PipeConnector fails to connect to the sink, it will try to reconnect by the specified
- * strategy.
- */
-public interface RetryStrategy extends PipeStrategy {}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
similarity index 98%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
rename to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
index 2ba9fffa612..a70151a97fe 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer;
+package org.apache.iotdb.pipe.api.customizer.parameter;
import org.apache.iotdb.pipe.api.exception.PipeAttributeNotProvidedException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
similarity index 94%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
rename to
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 1dd852d9b36..cfea33aa3b9 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -17,12 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer;
+package org.apache.iotdb.pipe.api.customizer.parameter;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
import java.util.Map;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index d1a95bb45ba..8bf376138e3 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin;
import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
@@ -34,7 +35,8 @@ public enum BuiltinPipePlugin {
// connectors
DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
- IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class);
+ IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class),
+ IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1", IoTDBSyncConnectorV1_1.class),
;
private final String pipePluginName;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
index 9902cce4ca0..8d3f0276126 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
/**
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 2522fdc66f6..0e18d1ffea9 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java
similarity index 81%
copy from
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java
index 82ddd05ba3c..d994b096353 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java
@@ -20,20 +20,20 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
/**
- * This class is a placeholder and should not be initialized. It represents
the IoTDB Thrift
- * connector. There is a real implementation in the server module but cannot
be imported here. The
- * pipe agent in the server module will replace this class with the real
implementation when
- * initializing the IoTDB Thrift connector.
+ * This class is a placeholder and should not be instantiated. It represents the IoTDB Sync
+ * connector (for IoTDB v1.1). The real implementation lives in the server module, but it cannot
+ * be imported here. The pipe agent in the server module will replace this class with the real
+ * implementation when initializing the IoTDB Sync connector.
*/
-public class IoTDBThriftConnector implements PipeConnector {
+public class IoTDBSyncConnectorV1_1 implements PipeConnector {
@Override
public void validate(PipeParameterValidator validator) {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index 82ddd05ba3c..315e1347a6d 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index d083783a716..511414dde73 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -21,9 +21,9 @@ package
org.apache.iotdb.commons.pipe.plugin.builtin.processor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index df8be6538a2..09a43c4ef76 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.commons.pipe.task.meta;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 454cf5c1788..9266bdf9a22 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -25,14 +25,14 @@ import
org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
index bb247981bf6..7916e1d92fb 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.collector;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionCollector;
import
org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileCollector;
@@ -29,12 +28,11 @@ import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeCol
import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogCollector;
import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -44,12 +42,12 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
public class IoTDBDataRegionCollector implements PipeCollector {
@@ -57,31 +55,17 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
private final AtomicBoolean hasBeenStarted;
- private final PipeTaskMeta pipeTaskMeta;
- private final long creationTime;
- private final UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
-
- // TODO: support pattern in historical collector
private PipeHistoricalDataRegionCollector historicalCollector;
private PipeRealtimeDataRegionCollector realtimeCollector;
private int dataRegionId;
- public IoTDBDataRegionCollector(
- PipeTaskMeta pipeTaskMeta,
- long creationTime,
- UnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
+ public IoTDBDataRegionCollector() {
this.hasBeenStarted = new AtomicBoolean(false);
-
- this.pipeTaskMeta = pipeTaskMeta;
- this.creationTime = creationTime;
- this.collectorPendingQueue = collectorPendingQueue;
}
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
-
// validate collector.history.enable and collector.realtime.enable
validator
.validateAttributeValueRange(
@@ -115,48 +99,34 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
private void constructHistoricalCollector(PipeParameters parameters) {
// enable historical collector by default
- historicalCollector =
- parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
- ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta,
Long.MIN_VALUE)
- // We define the realtime data as the data generated after the
creation time
- // of the pipe from user's perspective. But we still need to use
- // PipeHistoricalDataRegionCollector to collect the realtime data
generated between the
- // creation time of the pipe and the time when the pipe starts,
because those data
- // can not be listened by PipeRealtimeDataRegionCollector, and
should be collected by
- // PipeHistoricalDataRegionCollector from implementation
perspective.
- : new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta,
creationTime);
+ historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
}
private void constructRealtimeCollector(PipeParameters parameters) {
// enable realtime collector by default
if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
- realtimeCollector = new
PipeRealtimeDataRegionFakeCollector(pipeTaskMeta);
+ realtimeCollector = new PipeRealtimeDataRegionFakeCollector();
return;
}
// use hybrid mode by default
if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) {
- realtimeCollector =
- new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta,
collectorPendingQueue);
+ realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
return;
}
switch (parameters.getString(COLLECTOR_REALTIME_MODE)) {
case COLLECTOR_REALTIME_MODE_FILE:
- realtimeCollector =
- new PipeRealtimeDataRegionTsFileCollector(pipeTaskMeta,
collectorPendingQueue);
+ realtimeCollector = new PipeRealtimeDataRegionTsFileCollector();
break;
case COLLECTOR_REALTIME_MODE_LOG:
- realtimeCollector =
- new PipeRealtimeDataRegionLogCollector(pipeTaskMeta,
collectorPendingQueue);
+ realtimeCollector = new PipeRealtimeDataRegionLogCollector();
break;
case COLLECTOR_REALTIME_MODE_HYBRID:
- realtimeCollector =
- new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta,
collectorPendingQueue);
+ realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
break;
default:
- realtimeCollector =
- new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta,
collectorPendingQueue);
+ realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
LOGGER.warn(
String.format(
"Unsupported collector realtime mode: %s, create a hybrid
collector.",
@@ -167,7 +137,7 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
@Override
public void customize(PipeParameters parameters,
PipeCollectorRuntimeConfiguration configuration)
throws Exception {
- dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
+ dataRegionId = ((PipeTaskCollectorRuntimeEnvironment)
configuration).getRegionId();
historicalCollector.customize(parameters, configuration);
realtimeCollector.customize(parameters, configuration);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index f39923fd67a..48b3a1c11ae 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -27,12 +27,12 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.utils.DateTimeUtils;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
@@ -44,51 +44,48 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
-import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY;
public class PipeHistoricalDataRegionTsFileCollector extends
PipeHistoricalDataRegionCollector {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
- private final PipeTaskMeta pipeTaskMeta;
- private final ProgressIndex startIndex;
+ private PipeTaskMeta pipeTaskMeta;
+ private ProgressIndex startIndex;
private int dataRegionId;
private String pattern;
- private final long historicalDataCollectionTimeLowerBound; // arrival time
private long historicalDataCollectionStartTime; // event time
private long historicalDataCollectionEndTime; // event time
- private Queue<PipeTsFileInsertionEvent> pendingQueue;
+ private long historicalDataCollectionTimeLowerBound; // arrival time
- public PipeHistoricalDataRegionTsFileCollector(
- PipeTaskMeta pipeTaskMeta, long historicalDataCollectionTimeLowerBound) {
- this.pipeTaskMeta = pipeTaskMeta;
- this.startIndex = pipeTaskMeta.getProgressIndex();
+ private Queue<PipeTsFileInsertionEvent> pendingQueue;
- this.historicalDataCollectionTimeLowerBound =
historicalDataCollectionTimeLowerBound;
- }
+ public PipeHistoricalDataRegionTsFileCollector() {}
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
- validator.validateRequiredAttribute(DATA_REGION_KEY);
- }
+ public void validate(PipeParameterValidator validator) {}
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {
- dataRegionId = parameters.getInt(DATA_REGION_KEY);
+ final PipeTaskCollectorRuntimeEnvironment environment =
+ (PipeTaskCollectorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
- pattern =
- parameters.getStringOrDefault(
- PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
- PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
+ pipeTaskMeta = environment.getPipeTaskMeta();
+ startIndex = environment.getPipeTaskMeta().getProgressIndex();
+
+ dataRegionId = environment.getRegionId();
+
+ pattern = parameters.getStringOrDefault(COLLECTOR_PATTERN_KEY,
COLLECTOR_PATTERN_DEFAULT_VALUE);
// user may set the COLLECTOR_HISTORY_START_TIME and
COLLECTOR_HISTORY_END_TIME without
// enabling the historical data collection, which may affect the realtime
data collection.
@@ -105,6 +102,18 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
parameters.getString(COLLECTOR_HISTORY_END_TIME),
ZoneId.systemDefault())
: Long.MAX_VALUE;
+ // enable historical collector by default
+ historicalDataCollectionTimeLowerBound =
+ parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
+ ? Long.MIN_VALUE
+ // We define the realtime data as the data generated after the creation time
+ // of the pipe from the user's perspective. But we still need to use
+ // PipeHistoricalDataRegionCollector to collect the realtime data generated between the
+ // creation time of the pipe and the time when the pipe starts, because that data
+ // cannot be listened to by PipeRealtimeDataRegionCollector, and should be collected by
+ // PipeHistoricalDataRegionCollector from the implementation perspective.
+ : environment.getCreationTime();
+
// Only invoke flushDataRegionAllTsFiles() when the pipe runs in the
realtime only mode.
// realtime only mode -> (historicalDataCollectionTimeLowerBound !=
Long.MIN_VALUE)
//
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
index 658da872795..ab9be42b062 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -21,28 +21,24 @@ package org.apache.iotdb.db.pipe.collector.realtime;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
public abstract class PipeRealtimeDataRegionCollector implements PipeCollector
{
- protected final PipeTaskMeta pipeTaskMeta;
-
protected String pattern;
protected String dataRegionId;
+ protected PipeTaskMeta pipeTaskMeta;
- public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) {
- this.pipeTaskMeta = pipeTaskMeta;
- }
+ public PipeRealtimeDataRegionCollector() {}
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
- validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
- }
+ public void validate(PipeParameterValidator validator) throws Exception {}
@Override
public void customize(PipeParameters parameters,
PipeCollectorRuntimeConfiguration configuration)
@@ -51,7 +47,11 @@ public abstract class PipeRealtimeDataRegionCollector
implements PipeCollector {
parameters.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
- dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
+
+ final PipeTaskCollectorRuntimeEnvironment environment =
+ (PipeTaskCollectorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ dataRegionId = String.valueOf(environment.getRegionId());
+ pipeTaskMeta = environment.getPipeTaskMeta();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
index bd0913132b5..fbc28c4dbca 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
@@ -19,19 +19,13 @@
package org.apache.iotdb.db.pipe.collector.realtime;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
public class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionCollector {
-
- public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
- super(pipeTaskMeta);
- }
-
@Override
public void validate(PipeParameterValidator validator) {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 50349097d50..1551fc2a582 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.collector.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -33,7 +32,6 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// TODO: make this collector as a builtin pipe plugin. register it in
BuiltinPipePlugin.
public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegionCollector {
private static final Logger LOGGER =
@@ -43,10 +41,8 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
// supply() will poll events from this queue and send them to the next pipe
plugin.
private final UnboundedBlockingPendingQueue<Event> pendingQueue;
- public PipeRealtimeDataRegionHybridCollector(
- PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event>
pendingQueue) {
- super(pipeTaskMeta);
- this.pendingQueue = pendingQueue;
+ public PipeRealtimeDataRegionHybridCollector() {
+ this.pendingQueue = new UnboundedBlockingPendingQueue<>();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 845637b5f85..42da6a5d6fc 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.collector.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -40,10 +39,8 @@ public class PipeRealtimeDataRegionLogCollector extends
PipeRealtimeDataRegionCo
// supply() will poll events from this queue and send them to the next pipe
plugin.
private final UnboundedBlockingPendingQueue<Event> pendingQueue;
- public PipeRealtimeDataRegionLogCollector(
- PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event>
pendingQueue) {
- super(pipeTaskMeta);
- this.pendingQueue = pendingQueue;
+ public PipeRealtimeDataRegionLogCollector() {
+ this.pendingQueue = new UnboundedBlockingPendingQueue<>();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index da0c1fb90c3..cd56caae708 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.collector.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
@@ -40,10 +39,8 @@ public class PipeRealtimeDataRegionTsFileCollector extends
PipeRealtimeDataRegio
// supply() will poll events from this queue and send them to the next pipe
plugin.
private final UnboundedBlockingPendingQueue<Event> pendingQueue;
- public PipeRealtimeDataRegionTsFileCollector(
- PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event>
pendingQueue) {
- super(pipeTaskMeta);
- this.pendingQueue = pendingQueue;
+ public PipeRealtimeDataRegionTsFileCollector() {
+ this.pendingQueue = new UnboundedBlockingPendingQueue<>();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
similarity index 94%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
index 105d45f8484..a98bac82bdd 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.config.constant;
public class PipeCollectorConstant {
@@ -26,8 +26,6 @@ public class PipeCollectorConstant {
public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
- public static final String DATA_REGION_KEY = "collector.data-region";
-
public static final String COLLECTOR_HISTORY_ENABLE_KEY =
"collector.history.enable";
public static final String COLLECTOR_HISTORY_START_TIME =
"collector.history.start-time";
public static final String COLLECTOR_HISTORY_END_TIME =
"collector.history.end-time";
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
similarity index 76%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 7b88c56ef20..7c92206de81 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.config.constant;
public class PipeConnectorConstant {
@@ -26,6 +26,11 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+ public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
+ public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
+ public static final String CONNECTOR_IOTDB_PASSWORD_KEY =
"connector.password";
+ public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
similarity index 95%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
index 1af34f3ef31..8caa87c03ec 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.config.constant;
public class PipeProcessorConstant {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
new file mode 100644
index 00000000000..29810c46960
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config.plugin.configuraion;
+
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
+
+/**
+ * Runtime configuration that implements the collector, processor and connector configuration
+ * interfaces at once. It only stores the {@link PipeRuntimeEnvironment} given to the constructor
+ * and hands it back through {@link #getRuntimeEnvironment()}.
+ */
+public class PipeTaskRuntimeConfiguration
+ implements PipeCollectorRuntimeConfiguration,
+ PipeProcessorRuntimeConfiguration,
+ PipeConnectorRuntimeConfiguration {
+
+ // Environment supplied by whoever builds this configuration; never reassigned.
+ private final PipeRuntimeEnvironment environment;
+
+ public PipeTaskRuntimeConfiguration(PipeRuntimeEnvironment environment) {
+ this.environment = environment;
+ }
+
+ /** Returns the environment passed to the constructor. */
+ @Override
+ public PipeRuntimeEnvironment getRuntimeEnvironment() {
+ return environment;
+ }
+}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
similarity index 56%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
index a0bbd230338..530c977b9eb 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java
@@ -17,19 +17,28 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer.processor;
+package org.apache.iotdb.db.pipe.config.plugin.env;
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-/**
- * Used in {@link PipeProcessor#customize(PipeParameters,
PipeProcessorRuntimeConfiguration)} to
- * customize the runtime behavior of the PipeProcessor.
- */
-public class PipeProcessorRuntimeConfiguration implements
PipeRuntimeConfiguration {
+/**
+ * Runtime environment for a collector: extends the pipe name / creation time pair of
+ * PipeTaskRuntimeEnvironment with the region id and the PipeTaskMeta given at construction.
+ */
+public class PipeTaskCollectorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
+
+ private final int regionId;
+
+ private final PipeTaskMeta pipeTaskMeta;
+
+ public PipeTaskCollectorRuntimeEnvironment(
+ String pipeName, long creationTime, int regionId, PipeTaskMeta
pipeTaskMeta) {
+ super(pipeName, creationTime);
+ this.regionId = regionId;
+ this.pipeTaskMeta = pipeTaskMeta;
+ }
+
+ /** Returns the region id passed to the constructor. */
+ public int getRegionId() {
+ return regionId;
+ }
- @Override
- public void check() throws PipeException {}
+ /** Returns the task meta passed to the constructor. */
+ public PipeTaskMeta getPipeTaskMeta() {
+ return pipeTaskMeta;
+ }
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
similarity index 59%
rename from
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
index e99d21be71f..a935dcf2dac 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
@@ -17,14 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.customizer.collector;
+package org.apache.iotdb.db.pipe.config.plugin.env;
-import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.exception.PipeException;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
-// TODO: complete this class
-public class PipeCollectorRuntimeConfiguration implements
PipeRuntimeConfiguration {
+/**
+ * Immutable PipeRuntimeEnvironment implementation holding the pipe name and the pipe creation
+ * time given at construction.
+ */
+public class PipeTaskRuntimeEnvironment implements PipeRuntimeEnvironment {
+
+ private final String pipeName;
+ private final long creationTime;
+
+ public PipeTaskRuntimeEnvironment(String pipeName, long creationTime) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ }
+
+ /** Returns the pipe name passed to the constructor. */
+ @Override
+ public String getPipeName() {
+ return pipeName;
+ }
@Override
- public void check() throws PipeException {}
+ public long getCreationTime() {
+ return creationTime;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
new file mode 100644
index 00000000000..3099cf008cc
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.lagacy;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+
+public class IoTDBSyncConnectorImplV1_1 implements PipeConnector {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncConnectorImplV1_1.class);
+
+ private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+ public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1";
+
+ private String ipAddress;
+ private int port;
+
+ private String user;
+ private String password;
+
+ private String pipeName;
+ private Long creationTime;
+
+ private IoTDBThriftConnectorClient client;
+
+  // Session pool used for tablet inserts. It is deliberately an instance field: it is built in
+  // handshake() from this connector's own ip/port/user/password and closed in close(), so sharing
+  // it statically would let one connector close or replace the pool of every other connector.
+  private SessionPool sessionPool;
+
+ /** Declares the receiver ip and port attributes as required. */
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ validator
+ .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
+ .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
+ }
+
+  /**
+   * Captures everything handshake() needs later: the receiver endpoint and credentials from
+   * {@code parameters}, and the pipe name / creation time from the runtime environment.
+   *
+   * @param parameters connector attributes; ip and port are read directly, user and password fall
+   *     back to their default constants when absent
+   * @param configuration runtime configuration whose environment identifies the pipe
+   */
+  @Override
+  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    // Receiver endpoint.
+    final String receiverIp = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
+    this.ipAddress = receiverIp;
+    final int receiverPort = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
+    this.port = receiverPort;
+
+    // Credentials for the session pool, with defaults.
+    final String configuredUser =
+        parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+    this.user = configuredUser;
+    final String configuredPassword =
+        parameters.getStringOrDefault(
+            CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+    this.password = configuredPassword;
+
+    // Identity of the pipe, sent to the receiver in handshake().
+    this.pipeName = configuration.getRuntimeEnvironment().getPipeName();
+    this.creationTime = configuration.getRuntimeEnvironment().getCreationTime();
+  }
+
+ @Override
+ public void handshake() throws Exception {
+ close();
+
+ client =
+ new IoTDBThriftConnectorClient(
+ new ThriftClientProperty.Builder()
+
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+ .build(),
+ ipAddress,
+ port);
+
+ try {
+ final TSyncIdentityInfo identityInfo =
+ new TSyncIdentityInfo(
+ pipeName, creationTime, IOTDB_SYNC_CONNECTOR_VERSION,
IoTDBConstant.PATH_ROOT);
+ final TSStatus status = client.handshake(identityInfo);
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ String errorMsg =
+ String.format(
+ "The receiver %s:%s rejected the pipe task because %s",
+ ipAddress, port, status.message);
+ LOGGER.warn(errorMsg);
+ throw new PipeRuntimeCriticalException(errorMsg);
+ }
+ } catch (TException e) {
+ LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress,
port), e);
+ throw new PipeConnectionException(e.getMessage(), e);
+ }
+
+ sessionPool =
+ new SessionPool.Builder()
+ .host(ipAddress)
+ .port(port)
+ .user(user)
+ .password(password)
+ .maxSize(1)
+ .build();
+ }
+
+ /** No-op: this connector does nothing on heartbeat. */
+ @Override
+ public void heartbeat() throws Exception {}
+
+  /**
+   * Sends a tablet insertion event to the receiver through the session pool.
+   *
+   * <p>Both {@link TException} and {@link IoTDBConnectionException} are treated as network
+   * failures and rethrown as {@link PipeConnectionException}, so that the caller can react to a
+   * broken connection. The session pool reports connection problems with
+   * {@link IoTDBConnectionException}; without converting it the failure would bypass that path.
+   * A {@link StatementExecutionException} is propagated unchanged.
+   *
+   * @param tabletInsertionEvent the event to transfer; must be a
+   *     {@link PipeInsertNodeTabletInsertionEvent} or a {@link PipeRawTabletInsertionEvent}
+   * @throws NotImplementedException if the event is of any other type
+   * @throws PipeConnectionException on a network-level failure
+   */
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    try {
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+        doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+      } else {
+        // Name the event types that are actually accepted above.
+        throw new NotImplementedException(
+            "IoTDBSyncConnectorV1_1 only support PipeInsertNodeTabletInsertionEvent "
+                + "and PipeRawTabletInsertionEvent.");
+      }
+    } catch (TException | IoTDBConnectionException e) {
+      LOGGER.error(
+          "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e);
+      // the connection may be broken, try to reconnect by catching PipeConnectionException
+      throw new PipeConnectionException(
+          String.format(
+              "Network error when transfer tablet insertion event, because %s.", e.getMessage()),
+          e);
+    }
+  }
+
+ /** Converts the insert-node event to a tablet and inserts it through the session pool. */
+ private void doTransfer(PipeInsertNodeTabletInsertionEvent
pipeInsertNodeInsertionEvent)
+ throws IoTDBConnectionException, StatementExecutionException {
+ sessionPool.insertTablet(pipeInsertNodeInsertionEvent.convertToTablet());
+ }
+
+ /** Converts the raw tablet event to a tablet and inserts it through the session pool. */
+ private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
+ throws PipeException, TException, IoTDBConnectionException,
StatementExecutionException {
+ sessionPool.insertTablet(pipeTabletInsertionEvent.convertToTablet());
+ }
+
+ @Override
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
+ if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+ throw new NotImplementedException(
+ "IoTDBSyncConnectorV1_1 only support PipeTsFileInsertionEvent.");
+ }
+
+ try {
+ doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+ } catch (TException e) {
+ LOGGER.error(
+ "Network error when transfer tsFile insertion event: {}.",
tsFileInsertionEvent, e);
+ // The connection may be broken, try to reconnect by catching
PipeConnectionException
+ throw new PipeConnectionException("Network error when transfer tsFile
insertion event.", e);
+ }
+ }
+
+  /**
+   * Waits for the event's tsFile to be closed, streams the file to the receiver chunk by chunk,
+   * and then sends a serialized {@link TsFilePipeData} that refers to the file by name.
+   */
+  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, TException, InterruptedException, IOException {
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+
+    final File closedTsFile = pipeTsFileInsertionEvent.getTsFile();
+    transportSingleFilePieceByPiece(closedTsFile);
+
+    // Sent only after every chunk of the file has been transported.
+    final ByteBuffer pipeDataBuffer =
+        ByteBuffer.wrap(new TsFilePipeData("", closedTsFile.getName(), -1).serialize());
+    client.sendPipeData(pipeDataBuffer);
+  }
+
+  /**
+   * Streams {@code file} to the receiver in chunks of at most
+   * {@code PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()} bytes.
+   *
+   * <p>For each chunk the receiver's status decides what happens next:
+   *
+   * <ul>
+   *   <li>success: the offset advances by the chunk length;
+   *   <li>redirection: the offset carried in the status message is adopted and the file is
+   *       re-read from there;
+   *   <li>anything else: the transfer is aborted with a {@link PipeConnectionException}. The
+   *       chunk was not acknowledged, so continuing would send the following chunk with a stale
+   *       offset and silently drop the rejected one.
+   * </ul>
+   *
+   * @param file the file to send
+   * @throws IOException if reading the file fails
+   * @throws PipeConnectionException if the receiver rejects a chunk or the RPC fails
+   */
+  private void transportSingleFilePieceByPiece(File file) throws IOException {
+    // Offset, within the file, of the chunk that is about to be sent.
+    long position = 0;
+
+    final byte[] buffer = new byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+    try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+      while (true) {
+        final int dataLength = randomAccessFile.read(buffer);
+        if (dataLength == -1) {
+          break;
+        }
+
+        final ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
+        final TSyncTransportMetaInfo metaInfo =
+            new TSyncTransportMetaInfo(file.getName(), position);
+
+        final TSStatus status = client.sendFile(metaInfo, buffToSend);
+
+        if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          // Success
+          position += dataLength;
+        } else if (status.code == TSStatusCode.SYNC_FILE_REDIRECTION_ERROR.getStatusCode()) {
+          // The receiver tells us where it wants the transfer to resume.
+          position = Long.parseLong(status.message);
+          randomAccessFile.seek(position);
+          LOGGER.info(
+              String.format("Redirect to position %s in transferring tsFile %s.", position, file));
+        } else {
+          // SYNC_FILE_ERROR and every unexpected status: the chunk was not accepted.
+          final String errorMsg =
+              String.format("Network failed to receive tsFile %s, status: %s", file, status);
+          LOGGER.warn(errorMsg);
+          throw new PipeConnectionException(errorMsg);
+        }
+      }
+    } catch (TException e) {
+      LOGGER.error(String.format("Cannot send pipe data to receiver %s:%s.", ipAddress, port), e);
+      throw new PipeConnectionException(e.getMessage(), e);
+    }
+  }
+
+ /** Generic events are not transferred; a warning naming the event is logged instead. */
+ @Override
+ public void transfer(Event event) throws Exception {
+ LOGGER.warn("IoTDBSyncConnectorV1_1 does not support transfer generic
event: {}.", event);
+ }
+
+  /**
+   * Closes the thrift client and the session pool and clears both references.
+   *
+   * <p>The session pool is closed even if closing the client throws, and each reference is
+   * cleared even if its own close fails, so a failed close never leaves a half-closed resource
+   * behind for the next handshake().
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if (client != null) {
+        try {
+          client.close();
+        } finally {
+          client = null;
+        }
+      }
+    } finally {
+      if (sessionPool != null) {
+        try {
+          sessionPool.close();
+        } finally {
+          sessionPool = null;
+        }
+      }
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index 04719ad2f44..3e1e0db312c 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
@@ -38,9 +37,9 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -59,6 +58,9 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+
public class IoTDBThriftConnectorV1 implements PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
@@ -76,15 +78,15 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator
-
.validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY)
-
.validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+ .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
+ .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
}
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
- this.ipAddress =
parameters.getString(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY);
- this.port =
parameters.getInt(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+ this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
+ this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
}
@Override
@@ -102,11 +104,16 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
ipAddress,
port);
- final TPipeTransferResp resp =
- client.pipeTransfer(
-
PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
- if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(String.format("Handshake error, result status
%s.", resp.status));
+ try {
+ final TPipeTransferResp resp =
+ client.pipeTransfer(
+
PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(String.format("Handshake error, result status
%s.", resp.status));
+ }
+ } catch (TException e) {
+ LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress,
port), e);
+ throw new PipeConnectionException(e.getMessage(), e);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 4b181f83549..27975236127 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index a86842875d4..8a3f9ca8a77 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -139,6 +139,18 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
}
+ /**
+ * Converts this event to a Tablet. The data container is created from the insert node and
+ * pattern on first use and kept in {@code dataContainer} afterwards. Any exception is logged
+ * and rethrown wrapped in a PipeException.
+ */
+ public Tablet convertToTablet() {
+ try {
+ if (dataContainer == null) {
+ dataContainer = new TabletInsertionDataContainer(getInsertNode(),
getPattern());
+ }
+ return dataContainer.convertToTablet();
+ } catch (Exception e) {
+ LOGGER.error("Convert to tablet error.", e);
+ throw new PipeException("Convert to tablet error.", e);
+ }
+ }
+
/////////////////////////// Object ///////////////////////////
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 8629eb52ba4..c8bc2fd55ab 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.event.common.tablet;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index c01751ed824..70d21e85b3a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -19,14 +19,14 @@
package org.apache.iotdb.db.pipe.processor;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 80eb6c37ffa..cadb4746984 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -5,9 +5,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -26,7 +26,8 @@ public class PipeWALResourceManager implements AutoCloseable {
private final ScheduledFuture<?> ttlCheckerFuture;
public PipeWALResourceManager() {
- memtableIdToPipeWALResourceMap = new HashMap<>();
+ // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple
threads
+ memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 7ad1bffa2a4..5a9e40ccfb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -45,20 +45,26 @@ public class PipeTaskBuilder {
// we first build the collector and connector, then build the processor.
final PipeTaskCollectorStage collectorStage =
new PipeTaskCollectorStage(
- dataRegionId,
- pipeTaskMeta,
+ pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getCollectorParameters());
+ pipeStaticMeta.getCollectorParameters(),
+ dataRegionId,
+ pipeTaskMeta);
+
final PipeTaskConnectorStage connectorStage =
- new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters());
+ new PipeTaskConnectorStage(
+ pipeStaticMeta.getPipeName(),
+ pipeStaticMeta.getCreationTime(),
+ pipeStaticMeta.getConnectorParameters());
// the processor connects the collector and connector.
final PipeTaskProcessorStage processorStage =
new PipeTaskProcessorStage(
pipeStaticMeta.getPipeName(),
+ pipeStaticMeta.getCreationTime(),
+ pipeStaticMeta.getProcessorParameters(),
dataRegionId,
collectorStage.getEventSupplier(),
- pipeStaticMeta.getProcessorParameters(),
connectorStage.getPipeConnectorPendingQueue());
return new PipeTask(
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index ca1ec8fda9b..524b94eb604 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -24,64 +24,48 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionCollector;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
-import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import java.util.HashMap;
-
public class PipeTaskCollectorStage extends PipeTaskStage {
private final PipeCollector pipeCollector;
public PipeTaskCollectorStage(
- TConsensusGroupId dataRegionId,
- PipeTaskMeta pipeTaskMeta,
+ String pipeName,
long creationTime,
- PipeParameters collectorParameters) {
- PipeParameters localizedCollectorParameters;
-
+ PipeParameters collectorParameters,
+ TConsensusGroupId dataRegionId,
+ PipeTaskMeta pipeTaskMeta) {
// TODO: avoid if-else, use reflection to create collector all the time
- if (collectorParameters
- .getStringOrDefault(
- PipeCollectorConstant.COLLECTOR_KEY,
- BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
- .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())) {
- // we want to pass data region id to collector, so we need to create a
new collector
- // parameters and put data region id into it. we can't put data region
id into collector
- // parameters directly, because the given collector parameters may be
used by other pipe task.
- localizedCollectorParameters =
- new PipeParameters(new
HashMap<>(collectorParameters.getAttribute()));
- // set data region id to collector parameters, so that collector can get
data region id inside
- // collector
- localizedCollectorParameters
- .getAttribute()
- .put(PipeCollectorConstant.DATA_REGION_KEY,
String.valueOf(dataRegionId.getId()));
-
- this.pipeCollector =
- new IoTDBDataRegionCollector(
- pipeTaskMeta, creationTime, new
UnboundedBlockingPendingQueue<>());
- } else {
- localizedCollectorParameters = collectorParameters;
-
- this.pipeCollector =
PipeAgent.plugin().reflectCollector(localizedCollectorParameters);
- }
+ this.pipeCollector =
+ collectorParameters
+ .getStringOrDefault(
+ PipeCollectorConstant.COLLECTOR_KEY,
+ BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
+ .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
+ ? new IoTDBDataRegionCollector()
+ : PipeAgent.plugin().reflectCollector(collectorParameters);
// validate and customize should be called before createSubtask. this
allows collector exposing
// exceptions in advance.
try {
// 1. validate collector parameters
- pipeCollector.validate(new
PipeParameterValidator(localizedCollectorParameters));
+ pipeCollector.validate(new PipeParameterValidator(collectorParameters));
// 2. customize collector
final PipeCollectorRuntimeConfiguration runtimeConfiguration =
- new PipeCollectorRuntimeConfiguration();
- pipeCollector.customize(localizedCollectorParameters,
runtimeConfiguration);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment(
+ pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta));
+ pipeCollector.customize(collectorParameters, runtimeConfiguration);
} catch (Exception e) {
throw new PipeException(e.getMessage(), e);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 6b96097fd6f..ff4e954e614 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,10 +19,11 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtaskManager;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -32,13 +33,16 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
protected String connectorSubtaskId;
- public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
+ public PipeTaskConnectorStage(
+ String pipeName, long creationTime, PipeParameters
pipeConnectorParameters) {
this.pipeConnectorParameters = pipeConnectorParameters;
+
connectorSubtaskId =
PipeConnectorSubtaskManager.instance()
.register(
PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
- pipeConnectorParameters);
+ pipeConnectorParameters,
+ new PipeTaskRuntimeEnvironment(pipeName, creationTime));
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 02c6576ce94..60fbc3798df 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor;
@@ -31,37 +33,35 @@ import
org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
public class PipeTaskProcessorStage extends PipeTaskStage {
- protected final PipeProcessorSubtaskExecutor executor =
+ private final PipeProcessorSubtaskExecutor executor =
PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
- protected final PipeParameters pipeProcessorParameters;
- protected final PipeProcessor pipeProcessor;
- protected final PipeProcessorSubtask pipeProcessorSubtask;
+ private final PipeProcessorSubtask pipeProcessorSubtask;
/**
* @param pipeName pipe name
+ * @param creationTime pipe creation time
+ * @param pipeProcessorParameters used to create pipe processor
* @param dataRegionId data region id
* @param pipeCollectorInputEventSupplier used to input events from pipe
collector
- * @param pipeProcessorParameters used to create pipe processor
* @param pipeConnectorOutputPendingQueue used to output events to pipe
connector
*/
public PipeTaskProcessorStage(
String pipeName,
+ long creationTime,
+ PipeParameters pipeProcessorParameters,
TConsensusGroupId dataRegionId,
EventSupplier pipeCollectorInputEventSupplier,
- PipeParameters pipeProcessorParameters,
BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
- this.pipeProcessorParameters = pipeProcessorParameters;
-
- pipeProcessor =
+ final PipeProcessor pipeProcessor =
pipeProcessorParameters
.getStringOrDefault(
PipeProcessorConstant.PROCESSOR_KEY,
@@ -69,6 +69,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
? new PipeDoNothingProcessor()
: PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+
// validate and customize should be called before createSubtask. this
allows collector exposing
// exceptions in advance.
try {
@@ -77,7 +78,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// 2. customize processor
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
- new PipeProcessorRuntimeConfiguration();
+ new PipeTaskRuntimeConfiguration(new
PipeTaskRuntimeEnvironment(pipeName, creationTime));
pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
} catch (Exception e) {
throw new PipeException(e.getMessage(), e);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index a6e338aae2e..fadb6a0184f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -22,14 +22,16 @@ package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorImplV1_1;
import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
-import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -43,7 +45,9 @@ public class PipeConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
- PipeConnectorSubtaskExecutor executor, PipeParameters
pipeConnectorParameters) {
+ PipeConnectorSubtaskExecutor executor,
+ PipeParameters pipeConnectorParameters,
+ PipeRuntimeEnvironment pipeRuntimeEnvironment) {
final String attributeSortedString =
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
@@ -51,20 +55,25 @@ public class PipeConnectorSubtaskManager {
// TODO: construct all PipeConnector with the same reflection method,
avoid using if-else
// 1. construct, validate and customize PipeConnector, and then
handshake (create connection)
// with the target
- final PipeConnector pipeConnector =
- pipeConnectorParameters
- .getStringOrDefault(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
-
.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
- ? new IoTDBThriftConnectorV1()
- : PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+ final String connectorKey =
+ pipeConnectorParameters.getStringOrDefault(
+ PipeConnectorConstant.CONNECTOR_KEY,
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
+
+ PipeConnector pipeConnector;
+ if
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()))
{
+ pipeConnector = new IoTDBThriftConnectorV1();
+ } else if (connectorKey.equals(
+ BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) {
+ pipeConnector = new IoTDBSyncConnectorImplV1_1();
+ } else {
+ pipeConnector =
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+ }
+
try {
pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
- final PipeConnectorRuntimeConfiguration runtimeConfiguration =
- new PipeConnectorRuntimeConfiguration();
- pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration);
- // TODO: use runtimeConfiguration to configure PipeConnector
+ pipeConnector.customize(
+ pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
pipeConnector.handshake();
} catch (Exception e) {
throw new PipeException(
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
index ef8e2e67e3a..e0977828d75 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.pipe.collector;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
import
org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -64,38 +65,34 @@ public class CachedSchemaPatternMatcherTest {
@Test
public void testCachedMatcher() throws Exception {
- PipeRealtimeDataRegionCollector databaseCollector =
- new PipeRealtimeDataRegionFakeCollector(null);
+ PipeRealtimeDataRegionCollector databaseCollector = new
PipeRealtimeDataRegionFakeCollector();
databaseCollector.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root");
- put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(new
PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
collectorList.add(databaseCollector);
int deviceCollectorNum = 10;
int seriesCollectorNum = 10;
for (int i = 0; i < deviceCollectorNum; i++) {
- PipeRealtimeDataRegionCollector deviceCollector =
- new PipeRealtimeDataRegionFakeCollector(null);
+ PipeRealtimeDataRegionCollector deviceCollector = new
PipeRealtimeDataRegionFakeCollector();
int finalI1 = i;
deviceCollector.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." +
finalI1);
- put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
collectorList.add(deviceCollector);
for (int j = 0; j < seriesCollectorNum; j++) {
- PipeRealtimeDataRegionCollector seriesCollector =
- new PipeRealtimeDataRegionFakeCollector(null);
+ PipeRealtimeDataRegionCollector seriesCollector = new
PipeRealtimeDataRegionFakeCollector();
int finalI = i;
int finalJ = j;
seriesCollector.customize(
@@ -105,10 +102,10 @@ public class CachedSchemaPatternMatcherTest {
put(
PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
"root." + finalI + "." + finalJ);
- put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null)));
collectorList.add(seriesCollector);
}
}
@@ -152,9 +149,7 @@ public class CachedSchemaPatternMatcherTest {
public static class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionCollector {
- public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
- super(pipeTaskMeta);
- }
+ public PipeRealtimeDataRegionFakeCollector() {}
@Override
public Event supply() {
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
index 202c1527bd2..1b12e5822f9 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java
@@ -28,10 +28,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector;
import
org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import
org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant;
+import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -98,55 +99,58 @@ public class PipeRealtimeCollectTest {
}
@Test
- public void testRealtimeCollectProcess() throws ExecutionException,
InterruptedException {
+ public void testRealtimeCollectProcess() {
// set up realtime collector
try (PipeRealtimeDataRegionHybridCollector collector1 =
- new PipeRealtimeDataRegionHybridCollector(null, new
UnboundedBlockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector();
PipeRealtimeDataRegionHybridCollector collector2 =
- new PipeRealtimeDataRegionHybridCollector(null, new
UnboundedBlockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector();
PipeRealtimeDataRegionHybridCollector collector3 =
- new PipeRealtimeDataRegionHybridCollector(null, new
UnboundedBlockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector();
PipeRealtimeDataRegionHybridCollector collector4 =
- new PipeRealtimeDataRegionHybridCollector(
- null, new UnboundedBlockingPendingQueue<>())) {
+ new PipeRealtimeDataRegionHybridCollector()) {
collector1.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
- put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment(
+ "1", 1, Integer.parseInt(dataRegion1), null)));
collector2.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
- put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment(
+ "1", 1, Integer.parseInt(dataRegion1), null)));
collector3.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
- put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment(
+ "1", 1, Integer.parseInt(dataRegion2), null)));
collector4.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
- put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
}
}),
- null);
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskCollectorRuntimeEnvironment(
+ "1", 1, Integer.parseInt(dataRegion2), null)));
PipeRealtimeDataRegionCollector[] collectors =
new PipeRealtimeDataRegionCollector[] {collector1, collector2,
collector3, collector4};