This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5940-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8c9206d301098e66c9bc020fa6a71b5e217d6641 Author: Caideyipi <[email protected]> AuthorDate: Sun Jun 11 01:23:58 2023 +0800 [IOTDB-5940] Pipe: support 1.2 -> 1.1 sync connector (#10107) Co-authored-by: yschengzi <[email protected]> Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 904a2a52695dc0f62071cb1b723a4c2488f8bc48) --- .../persistence/pipe/PipePluginInfo.java | 8 +- .../org/apache/iotdb/pipe/api/PipeCollector.java | 7 +- .../org/apache/iotdb/pipe/api/PipeConnector.java | 6 +- .../org/apache/iotdb/pipe/api/PipeProcessor.java | 6 +- .../iotdb/pipe/api/customizer/PipeStrategy.java | 28 --- .../PipeCollectorRuntimeConfiguration.java} | 10 +- .../PipeConnectorRuntimeConfiguration.java} | 6 +- .../PipeProcessorRuntimeConfiguration.java} | 6 +- .../PipeRuntimeConfiguration.java} | 7 +- .../PipeRuntimeEnvironment.java} | 9 +- .../PipeConnectorRuntimeConfiguration.java | 84 ------- .../retry/EqualRetryIntervalStrategy.java | 63 ----- .../retry/ExponentialRetryIntervalStrategy.java | 70 ------ .../customizer/connector/retry/RetryStrategy.java | 34 --- .../{ => parameter}/PipeParameterValidator.java | 2 +- .../customizer/{ => parameter}/PipeParameters.java | 6 +- .../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 +- .../plugin/builtin/collector/IoTDBCollector.java | 6 +- .../builtin/connector/DoNothingConnector.java | 6 +- ...tConnector.java => IoTDBSyncConnectorV1_1.java} | 16 +- .../builtin/connector/IoTDBThriftConnector.java | 6 +- .../builtin/processor/DoNothingProcessor.java | 6 +- .../commons/pipe/task/meta/PipeStaticMeta.java | 2 +- .../db/pipe/agent/plugin/PipePluginAgent.java | 8 +- .../pipe/collector/IoTDBDataRegionCollector.java | 68 ++---- .../PipeHistoricalDataRegionTsFileCollector.java | 61 +++-- .../realtime/PipeRealtimeDataRegionCollector.java | 26 +- .../PipeRealtimeDataRegionFakeCollector.java | 12 +- .../PipeRealtimeDataRegionHybridCollector.java | 8 +- .../PipeRealtimeDataRegionLogCollector.java | 7 +- .../PipeRealtimeDataRegionTsFileCollector.java | 7 +- .../{ => constant}/PipeCollectorConstant.java | 4 +- .../{ => constant}/PipeConnectorConstant.java | 7 +- .../{ => constant}/PipeProcessorConstant.java | 2 +- .../configuraion/PipeTaskRuntimeConfiguration.java | 42 ++++ .../env/PipeTaskCollectorRuntimeEnvironment.java | 33 ++- .../plugin/env/PipeTaskRuntimeEnvironment.java | 25 +- .../lagacy/IoTDBSyncConnectorImplV1_1.java | 268 +++++++++++++++++++++ .../pipe/connector/v1/IoTDBThriftConnectorV1.java | 33 ++- .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 2 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 12 + .../common/tablet/PipeRawTabletInsertionEvent.java | 2 +- .../db/pipe/processor/PipeDoNothingProcessor.java | 8 +- .../pipe/resource/wal/PipeWALResourceManager.java | 5 +- .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 16 +- .../db/pipe/task/stage/PipeTaskCollectorStage.java | 62 ++--- .../db/pipe/task/stage/PipeTaskConnectorStage.java | 10 +- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 29 +-- .../task/subtask/PipeConnectorSubtaskManager.java | 43 ++-- .../collector/CachedSchemaPatternMatcherTest.java | 31 +-- .../db/pipe/collector/PipeRealtimeCollectTest.java | 38 +-- 51 files changed, 652 insertions(+), 615 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 22f0cb97943..784bf7df744 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -34,10 +34,10 @@ import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl import org.apache.iotdb.confignode.consensus.response.udf.JarResp; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.common.DataSet; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; -import org.apache.iotdb.db.pipe.config.PipeConnectorConstant; -import org.apache.iotdb.db.pipe.config.PipeProcessorConstant; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java index 724c1595571..8771febead4 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java @@ -19,9 +19,9 @@ package org.apache.iotdb.pipe.api; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; /** @@ -48,7 +48,6 @@ import org.apache.iotdb.pipe.api.event.Event; * cancelled (the `DROP PIPE` command is executed). * </ul> */ -// TODO: support event lifecycle management public interface PipeCollector extends PipePlugin { /** diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java index 6d74847e763..083590c1aef 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java @@ -19,9 +19,9 @@ package org.apache.iotdb.pipe.api; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java index e94384d5f23..3c35e8b9e13 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java @@ -20,9 +20,9 @@ package org.apache.iotdb.pipe.api; import org.apache.iotdb.pipe.api.collector.EventCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java deleted file mode 100644 index 433ccc83512..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeStrategy.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.customizer; - -import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException; - -public interface PipeStrategy { - - /** @throws PipeStrategyNotValidException if invalid strategy is set */ - void check() throws PipeStrategyNotValidException; -} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java similarity index 75% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java index c75f85bb7a7..071bfeb60c6 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeRuntimeConfiguration.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeCollectorRuntimeConfiguration.java @@ -17,12 +17,6 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer; +package org.apache.iotdb.pipe.api.customizer.configuration; -import org.apache.iotdb.pipe.api.exception.PipeException; - -public interface PipeRuntimeConfiguration { - - /** @throws PipeException if invalid runtime configuration is set */ - void check() throws PipeException; -} +public interface PipeCollectorRuntimeConfiguration extends PipeRuntimeConfiguration {} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java similarity index 82% copy from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java copy to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java index a103b1db97a..4fc6813bef8 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeConnectorRuntimeConfiguration.java @@ -17,8 +17,6 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer.connector.reuse; +package org.apache.iotdb.pipe.api.customizer.configuration; -import org.apache.iotdb.pipe.api.customizer.PipeStrategy; - -public interface ReuseStrategy extends PipeStrategy {} +public interface PipeConnectorRuntimeConfiguration extends PipeRuntimeConfiguration {} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java similarity index 82% copy from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java copy to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java index a103b1db97a..04c6f89ddbc 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeProcessorRuntimeConfiguration.java @@ -17,8 +17,6 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer.connector.reuse; +package org.apache.iotdb.pipe.api.customizer.configuration; -import org.apache.iotdb.pipe.api.customizer.PipeStrategy; - -public interface ReuseStrategy extends PipeStrategy {} +public interface PipeProcessorRuntimeConfiguration extends PipeRuntimeConfiguration {} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java similarity index 82% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java index a103b1db97a..827d9f93ae7 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/reuse/ReuseStrategy.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeConfiguration.java @@ -17,8 +17,9 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer.connector.reuse; +package org.apache.iotdb.pipe.api.customizer.configuration; -import org.apache.iotdb.pipe.api.customizer.PipeStrategy; +public interface PipeRuntimeConfiguration { -public interface ReuseStrategy extends PipeStrategy {} + PipeRuntimeEnvironment getRuntimeEnvironment(); +} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java similarity index 81% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java index 3df93c6c5cd..455d293dccc 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/parallel/ParallelStrategy.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java @@ -17,8 +17,11 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer.connector.parallel; +package org.apache.iotdb.pipe.api.customizer.configuration; -import org.apache.iotdb.pipe.api.customizer.PipeStrategy; +public interface PipeRuntimeEnvironment { -public interface ParallelStrategy extends PipeStrategy {} + String getPipeName(); + + long getCreationTime(); +} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java deleted file mode 100644 index 08417800108..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/PipeConnectorRuntimeConfiguration.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.customizer.connector; - -import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.connector.parallel.ParallelStrategy; -import org.apache.iotdb.pipe.api.customizer.connector.retry.RetryStrategy; -import org.apache.iotdb.pipe.api.customizer.connector.reuse.ReuseStrategy; -import org.apache.iotdb.pipe.api.exception.PipeException; - -/** - * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)} to customize - * the runtime behavior of the PipeConnector. - * <p> - * Supports calling methods in a chain. - * <p> - * Sample code: - * <pre>{@code - * @Override - * public void beforeStart(PipeParameters params, PipeConnectorRuntimeConfiguration configs) { - * configs - * .reuseStrategy(X) - * .parallelStrategy(Y) - * .retryStrategy(Z); - * }</pre> - */ -public class PipeConnectorRuntimeConfiguration implements PipeRuntimeConfiguration { - - private ReuseStrategy reuseStrategy; - private ParallelStrategy parallelStrategy; - private RetryStrategy retryStrategy; - - public PipeConnectorRuntimeConfiguration reuseStrategy(ReuseStrategy reuseStrategy) { - this.reuseStrategy = reuseStrategy; - return this; - } - - public PipeConnectorRuntimeConfiguration parallelStrategy(ParallelStrategy parallelStrategy) { - this.parallelStrategy = parallelStrategy; - return this; - } - - public PipeConnectorRuntimeConfiguration retryStrategy(RetryStrategy retryStrategy) { - this.retryStrategy = retryStrategy; - return this; - } - - @Override - public void check() throws PipeException { - if (reuseStrategy == null) { - throw new PipeException("ReuseStrategy is not set!"); - } - reuseStrategy.check(); - - if (parallelStrategy == null) { - throw new PipeException("ParallelStrategy is not set!"); - } - parallelStrategy.check(); - - if (retryStrategy == null) { - throw new PipeException("RetryStrategy is not set!"); - } - retryStrategy.check(); - } -} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java deleted file mode 100644 index fe693cc6fac..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/EqualRetryIntervalStrategy.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.customizer.connector.retry; - -import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.exception.PipeStrategyNotValidException; - -/** - * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. When - * the PipeConnector fails to connect to the sink, it will try to reconnect by the specified - * strategy. - * - * <p>When PipeConnector is set to {@link EqualRetryIntervalStrategy}, the interval of waiting time - * for retrying will be the same. - * - * @see PipeConnector - * @see PipeConnectorRuntimeConfiguration - */ -public class EqualRetryIntervalStrategy implements RetryStrategy { - - private final int maxRetryTimes; - private final long retryInterval; - - /** - * @param maxRetryTimes maxRetryTimes > 0 - * @param retryInterval retryInterval > 0 - */ - public EqualRetryIntervalStrategy(int maxRetryTimes, long retryInterval) { - this.maxRetryTimes = maxRetryTimes; - this.retryInterval = retryInterval; - } - - @Override - public void check() { - if (maxRetryTimes <= 0) { - throw new PipeStrategyNotValidException( - String.format("Parameter maxRetryTimes(%d) should be greater than zero.", maxRetryTimes)); - } - if (retryInterval <= 0) { - throw new PipeStrategyNotValidException( - String.format("Parameter retryInterval(%d) should be greater than zero.", retryInterval)); - } - } -} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java deleted file mode 100644 index 797372158cd..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/ExponentialRetryIntervalStrategy.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.customizer.connector.retry; - -import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; - -/** - * Used in {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. When - * the PipeConnector fails to connect to the sink, it will try to reconnect by the specified - * strategy. - * - * <p>When PipeConnector is set to {@link ExponentialRetryIntervalStrategy}, the interval of waiting - * time for retrying will be increased exponentially each time. - * - * @see PipeConnector - * @see PipeConnectorRuntimeConfiguration - */ -public class ExponentialRetryIntervalStrategy implements RetryStrategy { - - private final int maxRetryTimes; - private final long initInterval; - private final double backOffFactor; - - /** - * @param maxRetryTimes maxRetryTimes > 0 - * @param initInterval retryInterval > 0 - * @param backOffFactor backOffFactor > 0 - */ - public ExponentialRetryIntervalStrategy( - int maxRetryTimes, long initInterval, double backOffFactor) { - this.maxRetryTimes = maxRetryTimes; - this.initInterval = initInterval; - this.backOffFactor = backOffFactor; - } - - @Override - public void check() { - if (maxRetryTimes <= 0) { - throw new RuntimeException( - String.format("Parameter maxRetryTimes(%d) should be greater than zero.", maxRetryTimes)); - } - if (initInterval <= 0) { - throw new RuntimeException( - String.format("Parameter retryInterval(%d) should be greater than zero.", initInterval)); - } - if (backOffFactor <= 0) { - throw new RuntimeException( - String.format("Parameter backOffFactor(%f) should be greater than zero.", backOffFactor)); - } - } -} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java deleted file mode 100644 index 27e9c08d1ff..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/connector/retry/RetryStrategy.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.customizer.connector.retry; - -import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.PipeStrategy; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; - -/** - * Used to customize the strategy for reconnecting to sinks in {@link - * PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}. - * - * <p>When the PipeConnector fails to connect to the sink, it will try to reconnect by the specified - * strategy. - */ -public interface RetryStrategy extends PipeStrategy {} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java similarity index 98% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java index 2ba9fffa612..a70151a97fe 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer; +package org.apache.iotdb.pipe.api.customizer.parameter; import org.apache.iotdb.pipe.api.exception.PipeAttributeNotProvidedException; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java similarity index 94% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java rename to iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 1dd852d9b36..cfea33aa3b9 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -17,12 +17,12 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer; +package org.apache.iotdb.pipe.api.customizer.parameter; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; import java.util.Map; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index d1a95bb45ba..8bf376138e3 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin; import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; @@ -34,7 +35,8 @@ public enum BuiltinPipePlugin { // connectors DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class), - IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class); + IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class), + IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1", IoTDBSyncConnectorV1_1.class), ; private final String pipePluginName; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java index 9902cce4ca0..8d3f0276126 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java @@ -20,9 +20,9 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.collector; import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; /** diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java index 2522fdc66f6..0e18d1ffea9 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java @@ -20,9 +20,9 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.connector; import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java similarity index 81% copy from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java index 82ddd05ba3c..d994b096353 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java @@ -20,20 +20,20 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.connector; import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; /** - * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift - * connector. There is a real implementation in the server module but cannot be imported here. The - * pipe agent in the server module will replace this class with the real implementation when - * initializing the IoTDB Thrift connector. + * This class is a placeholder and should not be initialized. It represents the IoTDB Sync connector + * (for IoTDB v1.1). There is a real implementation in the server module but cannot be imported + * here. The pipe agent in the server module will replace this class with the real implementation + * when initializing the IoTDB Sync connector. */ -public class IoTDBThriftConnector implements PipeConnector { +public class IoTDBSyncConnectorV1_1 implements PipeConnector { @Override public void validate(PipeParameterValidator validator) { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java index 82ddd05ba3c..315e1347a6d 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java @@ -20,9 +20,9 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.connector; import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java index d083783a716..511414dde73 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java @@ -21,9 +21,9 @@ package org.apache.iotdb.commons.pipe.plugin.builtin.processor; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java index df8be6538a2..09a43c4ef76 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.commons.pipe.task.meta; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index 454cf5c1788..9266bdf9a22 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -25,14 +25,14 @@ import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; -import org.apache.iotdb.db.pipe.config.PipeConnectorConstant; -import org.apache.iotdb.db.pipe.config.PipeProcessorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.pipe.api.PipeCollector; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java index bb247981bf6..7916e1d92fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.collector; import org.apache.iotdb.commons.consensus.DataRegionId; -import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionCollector; import org.apache.iotdb.db.pipe.collector.historical.PipeHistoricalDataRegionTsFileCollector; @@ -29,12 +28,11 @@ import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionFakeCol import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector; import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionLogCollector; import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionTsFileCollector; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; -import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -44,12 +42,12 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG; public class IoTDBDataRegionCollector implements PipeCollector { @@ -57,31 +55,17 @@ public class IoTDBDataRegionCollector implements PipeCollector { private final AtomicBoolean hasBeenStarted; - private final PipeTaskMeta pipeTaskMeta; - private final long creationTime; - private final UnboundedBlockingPendingQueue<Event> collectorPendingQueue; - - // TODO: support pattern in historical collector private PipeHistoricalDataRegionCollector historicalCollector; private PipeRealtimeDataRegionCollector realtimeCollector; private int dataRegionId; - public IoTDBDataRegionCollector( - PipeTaskMeta pipeTaskMeta, - long creationTime, - UnboundedBlockingPendingQueue<Event> collectorPendingQueue) { + public IoTDBDataRegionCollector() { this.hasBeenStarted = new AtomicBoolean(false); - - this.pipeTaskMeta = pipeTaskMeta; - this.creationTime = creationTime; - this.collectorPendingQueue = collectorPendingQueue; } @Override public void validate(PipeParameterValidator validator) throws Exception { - validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY); - // validate collector.history.enable and collector.realtime.enable validator .validateAttributeValueRange( @@ -115,48 +99,34 @@ public class IoTDBDataRegionCollector implements PipeCollector { private void constructHistoricalCollector(PipeParameters parameters) { // enable historical collector by default - historicalCollector = - parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true) - ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, Long.MIN_VALUE) - // We define the realtime data as the data generated after the creation time - // of the pipe from user's perspective. But we still need to use - // PipeHistoricalDataRegionCollector to collect the realtime data generated between the - // creation time of the pipe and the time when the pipe starts, because those data - // can not be listened by PipeRealtimeDataRegionCollector, and should be collected by - // PipeHistoricalDataRegionCollector from implementation perspective. - : new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta, creationTime); + historicalCollector = new PipeHistoricalDataRegionTsFileCollector(); } private void constructRealtimeCollector(PipeParameters parameters) { // enable realtime collector by default if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) { - realtimeCollector = new PipeRealtimeDataRegionFakeCollector(pipeTaskMeta); + realtimeCollector = new PipeRealtimeDataRegionFakeCollector(); return; } // use hybrid mode by default if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) { - realtimeCollector = - new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue); + realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); return; } switch (parameters.getString(COLLECTOR_REALTIME_MODE)) { case COLLECTOR_REALTIME_MODE_FILE: - realtimeCollector = - new PipeRealtimeDataRegionTsFileCollector(pipeTaskMeta, collectorPendingQueue); + realtimeCollector = new PipeRealtimeDataRegionTsFileCollector(); break; case COLLECTOR_REALTIME_MODE_LOG: - realtimeCollector = - new PipeRealtimeDataRegionLogCollector(pipeTaskMeta, collectorPendingQueue); + realtimeCollector = new PipeRealtimeDataRegionLogCollector(); break; case COLLECTOR_REALTIME_MODE_HYBRID: - realtimeCollector = - new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue); + realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); break; default: - realtimeCollector = - new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue); + realtimeCollector = new PipeRealtimeDataRegionHybridCollector(); LOGGER.warn( String.format( "Unsupported collector realtime mode: %s, create a hybrid collector.", @@ -167,7 +137,7 @@ public class IoTDBDataRegionCollector implements PipeCollector { @Override public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) throws Exception { - dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY); + dataRegionId = ((PipeTaskCollectorRuntimeEnvironment) configuration).getRegionId(); historicalCollector.customize(parameters, configuration); realtimeCollector.customize(parameters, configuration); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java index f39923fd67a..48b3a1c11ae 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java @@ -27,12 +27,12 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.utils.DateTimeUtils; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.slf4j.Logger; @@ -44,51 +44,48 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.stream.Collectors; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME; -import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY; public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataRegionCollector { private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class); - private final PipeTaskMeta pipeTaskMeta; - private final ProgressIndex startIndex; + private PipeTaskMeta pipeTaskMeta; + private ProgressIndex startIndex; private int dataRegionId; private String pattern; - private final long historicalDataCollectionTimeLowerBound; // arrival time private long historicalDataCollectionStartTime; // event time private long historicalDataCollectionEndTime; // event time - private Queue<PipeTsFileInsertionEvent> pendingQueue; + private long historicalDataCollectionTimeLowerBound; // arrival time - public PipeHistoricalDataRegionTsFileCollector( - PipeTaskMeta pipeTaskMeta, long historicalDataCollectionTimeLowerBound) { - this.pipeTaskMeta = pipeTaskMeta; - this.startIndex = pipeTaskMeta.getProgressIndex(); + private Queue<PipeTsFileInsertionEvent> pendingQueue; - this.historicalDataCollectionTimeLowerBound = historicalDataCollectionTimeLowerBound; - } + public PipeHistoricalDataRegionTsFileCollector() {} @Override - public void validate(PipeParameterValidator validator) throws Exception { - validator.validateRequiredAttribute(DATA_REGION_KEY); - } + public void validate(PipeParameterValidator validator) {} @Override public void customize( PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) { - dataRegionId = parameters.getInt(DATA_REGION_KEY); + final PipeTaskCollectorRuntimeEnvironment environment = + (PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment(); - pattern = - parameters.getStringOrDefault( - PipeCollectorConstant.COLLECTOR_PATTERN_KEY, - PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); + pipeTaskMeta = environment.getPipeTaskMeta(); + startIndex = environment.getPipeTaskMeta().getProgressIndex(); + + dataRegionId = environment.getRegionId(); + + pattern = parameters.getStringOrDefault(COLLECTOR_PATTERN_KEY, COLLECTOR_PATTERN_DEFAULT_VALUE); // user may set the COLLECTOR_HISTORY_START_TIME and COLLECTOR_HISTORY_END_TIME without // enabling the historical data collection, which may affect the realtime data collection. @@ -105,6 +102,18 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR parameters.getString(COLLECTOR_HISTORY_END_TIME), ZoneId.systemDefault()) : Long.MAX_VALUE; + // enable historical collector by default + historicalDataCollectionTimeLowerBound = + parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true) + ? Long.MIN_VALUE + // We define the realtime data as the data generated after the creation time + // of the pipe from user's perspective. But we still need to use + // PipeHistoricalDataRegionCollector to collect the realtime data generated between the + // creation time of the pipe and the time when the pipe starts, because those data + // can not be listened by PipeRealtimeDataRegionCollector, and should be collected by + // PipeHistoricalDataRegionCollector from implementation perspective. + : environment.getCreationTime(); + // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the realtime only mode. // realtime only mode -> (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) // diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java index 658da872795..ab9be42b062 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java @@ -21,28 +21,24 @@ package org.apache.iotdb.db.pipe.collector.realtime; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; public abstract class PipeRealtimeDataRegionCollector implements PipeCollector { - protected final PipeTaskMeta pipeTaskMeta; - protected String pattern; protected String dataRegionId; + protected PipeTaskMeta pipeTaskMeta; - public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) { - this.pipeTaskMeta = pipeTaskMeta; - } + public PipeRealtimeDataRegionCollector() {} @Override - public void validate(PipeParameterValidator validator) throws Exception { - validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY); - } + public void validate(PipeParameterValidator validator) throws Exception {} @Override public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) @@ -51,7 +47,11 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector { parameters.getStringOrDefault( PipeCollectorConstant.COLLECTOR_PATTERN_KEY, PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); - dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY); + + final PipeTaskCollectorRuntimeEnvironment environment = + (PipeTaskCollectorRuntimeEnvironment) configuration.getRuntimeEnvironment(); + dataRegionId = String.valueOf(environment.getRegionId()); + pipeTaskMeta = environment.getPipeTaskMeta(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java index bd0913132b5..fbc28c4dbca 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java @@ -19,19 +19,13 @@ package org.apache.iotdb.db.pipe.collector.realtime; -import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector { - - public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) { - super(pipeTaskMeta); - } - @Override public void validate(PipeParameterValidator validator) {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java index 50349097d50..1551fc2a582 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.collector.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; @@ -33,7 +32,6 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO: make this collector as a builtin pipe plugin. register it in BuiltinPipePlugin. public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector { private static final Logger LOGGER = @@ -43,10 +41,8 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; - public PipeRealtimeDataRegionHybridCollector( - PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) { - super(pipeTaskMeta); - this.pendingQueue = pendingQueue; + public PipeRealtimeDataRegionHybridCollector() { + this.pendingQueue = new UnboundedBlockingPendingQueue<>(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java index 845637b5f85..42da6a5d6fc 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.collector.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; -import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; @@ -40,10 +39,8 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; - public PipeRealtimeDataRegionLogCollector( - PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) { - super(pipeTaskMeta); - this.pendingQueue = pendingQueue; + public PipeRealtimeDataRegionLogCollector() { + this.pendingQueue = new UnboundedBlockingPendingQueue<>(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java index da0c1fb90c3..cd56caae708 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.collector.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; -import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.collector.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; @@ -40,10 +39,8 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio // supply() will poll events from this queue and send them to the next pipe plugin. private final UnboundedBlockingPendingQueue<Event> pendingQueue; - public PipeRealtimeDataRegionTsFileCollector( - PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) { - super(pipeTaskMeta); - this.pendingQueue = pendingQueue; + public PipeRealtimeDataRegionTsFileCollector() { + this.pendingQueue = new UnboundedBlockingPendingQueue<>(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java similarity index 94% rename from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java index 105d45f8484..a98bac82bdd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeCollectorConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.config; +package org.apache.iotdb.db.pipe.config.constant; public class PipeCollectorConstant { @@ -26,8 +26,6 @@ public class PipeCollectorConstant { public static final String COLLECTOR_PATTERN_KEY = "collector.pattern"; public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root"; - public static final String DATA_REGION_KEY = "collector.data-region"; - public static final String COLLECTOR_HISTORY_ENABLE_KEY = "collector.history.enable"; public static final String COLLECTOR_HISTORY_START_TIME = "collector.history.start-time"; public static final String COLLECTOR_HISTORY_END_TIME = "collector.history.end-time"; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java similarity index 76% rename from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 7b88c56ef20..7c92206de81 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.config; +package org.apache.iotdb.db.pipe.config.constant; public class PipeConnectorConstant { @@ -26,6 +26,11 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip"; public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port"; + public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user"; + public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_IOTDB_PASSWORD_KEY = "connector.password"; + public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; + private PipeConnectorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java similarity index 95% rename from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java index 1af34f3ef31..8caa87c03ec 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.config; +package org.apache.iotdb.db.pipe.config.constant; public class PipeProcessorConstant { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java new file mode 100644 index 00000000000..29810c46960 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/configuraion/PipeTaskRuntimeConfiguration.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.config.plugin.configuraion; + +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; + +public class PipeTaskRuntimeConfiguration + implements PipeCollectorRuntimeConfiguration, + PipeProcessorRuntimeConfiguration, + PipeConnectorRuntimeConfiguration { + + private final PipeRuntimeEnvironment environment; + + public PipeTaskRuntimeConfiguration(PipeRuntimeEnvironment environment) { + this.environment = environment; + } + + @Override + public PipeRuntimeEnvironment getRuntimeEnvironment() { + return environment; + } +} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java similarity index 56% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java index a0bbd230338..530c977b9eb 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/processor/PipeProcessorRuntimeConfiguration.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskCollectorRuntimeEnvironment.java @@ -17,19 +17,28 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer.processor; +package org.apache.iotdb.db.pipe.config.plugin.env; -import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration; -import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -/** - * Used in {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} to - * customize the runtime behavior of the PipeProcessor. - */ -public class PipeProcessorRuntimeConfiguration implements PipeRuntimeConfiguration { +public class PipeTaskCollectorRuntimeEnvironment extends PipeTaskRuntimeEnvironment { + + private final int regionId; + + private final PipeTaskMeta pipeTaskMeta; + + public PipeTaskCollectorRuntimeEnvironment( + String pipeName, long creationTime, int regionId, PipeTaskMeta pipeTaskMeta) { + super(pipeName, creationTime); + this.regionId = regionId; + this.pipeTaskMeta = pipeTaskMeta; + } + + public int getRegionId() { + return regionId; + } - @Override - public void check() throws PipeException {} + public PipeTaskMeta getPipeTaskMeta() { + return pipeTaskMeta; + } } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java similarity index 59% rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java rename to server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java index e99d21be71f..a935dcf2dac 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java @@ -17,14 +17,27 @@ * under the License. */ -package org.apache.iotdb.pipe.api.customizer.collector; +package org.apache.iotdb.db.pipe.config.plugin.env; -import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration; -import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; -// TODO: complete this class -public class PipeCollectorRuntimeConfiguration implements PipeRuntimeConfiguration { +public class PipeTaskRuntimeEnvironment implements PipeRuntimeEnvironment { + + private final String pipeName; + private final long creationTime; + + public PipeTaskRuntimeEnvironment(String pipeName, long creationTime) { + this.pipeName = pipeName; + this.creationTime = creationTime; + } + + @Override + public String getPipeName() { + return pipeName; + } @Override - public void check() throws PipeException {} + public long getCreationTime() { + return creationTime; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java new file mode 100644 index 00000000000..3099cf008cc --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.lagacy; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.sync.pipedata.TsFilePipeData; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; +import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; +import org.apache.iotdb.session.pool.SessionPool; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; + +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; + +public class IoTDBSyncConnectorImplV1_1 implements PipeConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncConnectorImplV1_1.class); + + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1"; + + private String ipAddress; + private int port; + + private String user; + private String password; + + private String pipeName; + private Long creationTime; + + private IoTDBThriftConnectorClient client; + + private static SessionPool sessionPool; + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + validator + .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY) + .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY); + } + + @Override + public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) + throws Exception { + this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY); + this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY); + + this.user = + parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY, CONNECTOR_IOTDB_USER_DEFAULT_VALUE); + this.password = + parameters.getStringOrDefault( + CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + + pipeName = configuration.getRuntimeEnvironment().getPipeName(); + creationTime = configuration.getRuntimeEnvironment().getCreationTime(); + } + + @Override + public void handshake() throws Exception { + close(); + + client = + new IoTDBThriftConnectorClient( + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled()) + .build(), + ipAddress, + port); + + try { + final TSyncIdentityInfo identityInfo = + new TSyncIdentityInfo( + pipeName, creationTime, IOTDB_SYNC_CONNECTOR_VERSION, IoTDBConstant.PATH_ROOT); + final TSStatus status = client.handshake(identityInfo); + if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + String errorMsg = + String.format( + "The receiver %s:%s rejected the pipe task because %s", + ipAddress, port, status.message); + LOGGER.warn(errorMsg); + throw new PipeRuntimeCriticalException(errorMsg); + } + } catch (TException e) { + LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, port), e); + throw new PipeConnectionException(e.getMessage(), e); + } + + sessionPool = + new SessionPool.Builder() + .host(ipAddress) + .port(port) + .user(user) + .password(password) + .maxSize(1) + .build(); + } + + @Override + public void heartbeat() throws Exception {} + + @Override + public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { + try { + if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { + doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); + } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent); + } else { + throw new NotImplementedException( + "IoTDBSyncConnectorV1_1 only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent."); + } + } catch (TException e) { + LOGGER.error( + "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e); + // the connection may be broken, try to reconnect by catching PipeConnectionException + throw new PipeConnectionException( + String.format( + "Network error when transfer tablet insertion event, because %s.", e.getMessage()), + e); + } + } + + private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertTablet(pipeInsertNodeInsertionEvent.convertToTablet()); + } + + private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent) + throws PipeException, TException, IoTDBConnectionException, StatementExecutionException { + sessionPool.insertTablet(pipeTabletInsertionEvent.convertToTablet()); + } + + @Override + public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) { + throw new NotImplementedException( + "IoTDBSyncConnectorV1_1 only support PipeTsFileInsertionEvent."); + } + + try { + doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent); + } catch (TException e) { + LOGGER.error( + "Network error when transfer tsFile insertion event: {}.", tsFileInsertionEvent, e); + // The connection may be broken, try to reconnect by catching PipeConnectionException + throw new PipeConnectionException("Network error when transfer tsFile insertion event.", e); + } + } + + private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) + throws PipeException, TException, InterruptedException, IOException { + pipeTsFileInsertionEvent.waitForTsFileClose(); + + final File tsFile = pipeTsFileInsertionEvent.getTsFile(); + transportSingleFilePieceByPiece(tsFile); + client.sendPipeData(ByteBuffer.wrap(new TsFilePipeData("", tsFile.getName(), -1).serialize())); + } + + private void transportSingleFilePieceByPiece(File file) throws IOException { + // Cut the file into pieces to send + long position = 0; + + // Try small piece to rebase the file position. + final byte[] buffer = new byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()]; + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + while (true) { + final int dataLength = randomAccessFile.read(buffer); + if (dataLength == -1) { + break; + } + + final ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength); + final TSyncTransportMetaInfo metaInfo = + new TSyncTransportMetaInfo(file.getName(), position); + + final TSStatus status = client.sendFile(metaInfo, buffToSend); + + if ((status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode())) { + // Success + position += dataLength; + } else if (status.code == TSStatusCode.SYNC_FILE_REDIRECTION_ERROR.getStatusCode()) { + position = Long.parseLong(status.message); + randomAccessFile.seek(position); + LOGGER.info( + String.format("Redirect to position %s in transferring tsFile %s.", position, file)); + } else if (status.code == TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) { + String errorMsg = + String.format("Network failed to receive tsFile %s, status: %s", file, status); + LOGGER.warn(errorMsg); + throw new PipeConnectionException(errorMsg); + } + } + } catch (TException e) { + LOGGER.error(String.format("Cannot send pipe data to receiver %s:%s.", ipAddress, port), e); + throw new PipeConnectionException(e.getMessage(), e); + } + } + + @Override + public void transfer(Event event) throws Exception { + LOGGER.warn("IoTDBSyncConnectorV1_1 does not support transfer generic event: {}.", event); + } + + @Override + public void close() throws Exception { + if (client != null) { + client.close(); + client = null; + } + if (sessionPool != null) { + sessionPool.close(); + sessionPool = null; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java index 04719ad2f44..3e1e0db312c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.config.PipeConnectorConstant; import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient; import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq; @@ -38,9 +37,9 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -59,6 +58,9 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.util.Arrays; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; + public class IoTDBThriftConnectorV1 implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV1.class); @@ -76,15 +78,15 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { @Override public void validate(PipeParameterValidator validator) throws Exception { validator - .validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY) - .validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY); + .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY) + .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY); } @Override public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) throws Exception { - this.ipAddress = parameters.getString(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY); - this.port = parameters.getInt(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY); + this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY); + this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY); } @Override @@ -102,11 +104,16 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { ipAddress, port); - final TPipeTransferResp resp = - client.pipeTransfer( - PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision())); - if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new PipeException(String.format("Handshake error, result status %s.", resp.status)); + try { + final TPipeTransferResp resp = + client.pipeTransfer( + PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision())); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(String.format("Handshake error, result status %s.", resp.status)); + } + } catch (TException e) { + LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, port), e); + throw new PipeConnectionException(e.getMessage(), e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 4b181f83549..27975236127 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; import org.apache.iotdb.pipe.api.event.Event; import java.util.concurrent.atomic.AtomicInteger; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index a86842875d4..8a3f9ca8a77 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -139,6 +139,18 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } } + public Tablet convertToTablet() { + try { + if (dataContainer == null) { + dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); + } + return dataContainer.convertToTablet(); + } catch (Exception e) { + LOGGER.error("Convert to tablet error.", e); + throw new PipeException("Convert to tablet error.", e); + } + } + /////////////////////////// Object /////////////////////////// @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 8629eb52ba4..c8bc2fd55ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java index c01751ed824..70d21e85b3a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java @@ -19,14 +19,14 @@ package org.apache.iotdb.db.pipe.processor; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; import org.apache.iotdb.db.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java index 80eb6c37ffa..cadb4746984 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java @@ -5,9 +5,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.db.wal.utils.WALEntryHandler; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -26,7 +26,8 @@ public class PipeWALResourceManager implements AutoCloseable { private final ScheduledFuture<?> ttlCheckerFuture; public PipeWALResourceManager() { - memtableIdToPipeWALResourceMap = new HashMap<>(); + // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple threads + memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>(); memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT]; for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java index 7ad1bffa2a4..5a9e40ccfb0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java @@ -45,20 +45,26 @@ public class PipeTaskBuilder { // we first build the collector and connector, then build the processor. final PipeTaskCollectorStage collectorStage = new PipeTaskCollectorStage( - dataRegionId, - pipeTaskMeta, + pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime(), - pipeStaticMeta.getCollectorParameters()); + pipeStaticMeta.getCollectorParameters(), + dataRegionId, + pipeTaskMeta); + final PipeTaskConnectorStage connectorStage = - new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters()); + new PipeTaskConnectorStage( + pipeStaticMeta.getPipeName(), + pipeStaticMeta.getCreationTime(), + pipeStaticMeta.getConnectorParameters()); // the processor connects the collector and connector. final PipeTaskProcessorStage processorStage = new PipeTaskProcessorStage( pipeStaticMeta.getPipeName(), + pipeStaticMeta.getCreationTime(), + pipeStaticMeta.getProcessorParameters(), dataRegionId, collectorStage.getEventSupplier(), - pipeStaticMeta.getProcessorParameters(), connectorStage.getPipeConnectorPendingQueue()); return new PipeTask( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java index ca1ec8fda9b..524b94eb604 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java @@ -24,64 +24,48 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.collector.IoTDBDataRegionCollector; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.task.connection.EventSupplier; -import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.PipeCollector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeCollectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; -import java.util.HashMap; - public class PipeTaskCollectorStage extends PipeTaskStage { private final PipeCollector pipeCollector; public PipeTaskCollectorStage( - TConsensusGroupId dataRegionId, - PipeTaskMeta pipeTaskMeta, + String pipeName, long creationTime, - PipeParameters collectorParameters) { - PipeParameters localizedCollectorParameters; - + PipeParameters collectorParameters, + TConsensusGroupId dataRegionId, + PipeTaskMeta pipeTaskMeta) { // TODO: avoid if-else, use reflection to create collector all the time - if (collectorParameters - .getStringOrDefault( - PipeCollectorConstant.COLLECTOR_KEY, - BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()) - .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())) { - // we want to pass data region id to collector, so we need to create a new collector - // parameters and put data region id into it. we can't put data region id into collector - // parameters directly, because the given collector parameters may be used by other pipe task. - localizedCollectorParameters = - new PipeParameters(new HashMap<>(collectorParameters.getAttribute())); - // set data region id to collector parameters, so that collector can get data region id inside - // collector - localizedCollectorParameters - .getAttribute() - .put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId())); - - this.pipeCollector = - new IoTDBDataRegionCollector( - pipeTaskMeta, creationTime, new UnboundedBlockingPendingQueue<>()); - } else { - localizedCollectorParameters = collectorParameters; - - this.pipeCollector = PipeAgent.plugin().reflectCollector(localizedCollectorParameters); - } + this.pipeCollector = + collectorParameters + .getStringOrDefault( + PipeCollectorConstant.COLLECTOR_KEY, + BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()) + .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()) + ? new IoTDBDataRegionCollector() + : PipeAgent.plugin().reflectCollector(collectorParameters); // validate and customize should be called before createSubtask. this allows collector exposing // exceptions in advance. try { // 1. validate collector parameters - pipeCollector.validate(new PipeParameterValidator(localizedCollectorParameters)); + pipeCollector.validate(new PipeParameterValidator(collectorParameters)); // 2. customize collector final PipeCollectorRuntimeConfiguration runtimeConfiguration = - new PipeCollectorRuntimeConfiguration(); - pipeCollector.customize(localizedCollectorParameters, runtimeConfiguration); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment( + pipeName, creationTime, dataRegionId.getId(), pipeTaskMeta)); + pipeCollector.customize(collectorParameters, runtimeConfiguration); } catch (Exception e) { throw new PipeException(e.getMessage(), e); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java index 6b96097fd6f..ff4e954e614 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java @@ -19,10 +19,11 @@ package org.apache.iotdb.db.pipe.task.stage; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment; import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtaskManager; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -32,13 +33,16 @@ public class PipeTaskConnectorStage extends PipeTaskStage { protected String connectorSubtaskId; - public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) { + public PipeTaskConnectorStage( + String pipeName, long creationTime, PipeParameters pipeConnectorParameters) { this.pipeConnectorParameters = pipeConnectorParameters; + connectorSubtaskId = PipeConnectorSubtaskManager.instance() .register( PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(), - pipeConnectorParameters); + pipeConnectorParameters, + new PipeTaskRuntimeEnvironment(pipeName, creationTime)); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index 02c6576ce94..60fbc3798df 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -22,7 +22,9 @@ package org.apache.iotdb.db.pipe.task.stage; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.config.PipeProcessorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant; +import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment; import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor; import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager; import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor; @@ -31,37 +33,35 @@ import org.apache.iotdb.db.pipe.task.connection.EventSupplier; import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask; import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; public class PipeTaskProcessorStage extends PipeTaskStage { - protected final PipeProcessorSubtaskExecutor executor = + private final PipeProcessorSubtaskExecutor executor = PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor(); - protected final PipeParameters pipeProcessorParameters; - protected final PipeProcessor pipeProcessor; - protected final PipeProcessorSubtask pipeProcessorSubtask; + private final PipeProcessorSubtask pipeProcessorSubtask; /** * @param pipeName pipe name + * @param creationTime pipe creation time + * @param pipeProcessorParameters used to create pipe processor * @param dataRegionId data region id * @param pipeCollectorInputEventSupplier used to input events from pipe collector - * @param pipeProcessorParameters used to create pipe processor * @param pipeConnectorOutputPendingQueue used to output events to pipe connector */ public PipeTaskProcessorStage( String pipeName, + long creationTime, + PipeParameters pipeProcessorParameters, TConsensusGroupId dataRegionId, EventSupplier pipeCollectorInputEventSupplier, - PipeParameters pipeProcessorParameters, BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) { - this.pipeProcessorParameters = pipeProcessorParameters; - - pipeProcessor = + final PipeProcessor pipeProcessor = pipeProcessorParameters .getStringOrDefault( PipeProcessorConstant.PROCESSOR_KEY, @@ -69,6 +69,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { .equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()) ? new PipeDoNothingProcessor() : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters); + // validate and customize should be called before createSubtask. this allows collector exposing // exceptions in advance. try { @@ -77,7 +78,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { // 2. customize processor final PipeProcessorRuntimeConfiguration runtimeConfiguration = - new PipeProcessorRuntimeConfiguration(); + new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(pipeName, creationTime)); pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration); } catch (Exception e) { throw new PipeException(e.getMessage(), e); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java index a6e338aae2e..fadb6a0184f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java @@ -22,14 +22,16 @@ package org.apache.iotdb.db.pipe.task.subtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.config.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorImplV1_1; import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1; import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.PipeConnector; -import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; -import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -43,7 +45,9 @@ public class PipeConnectorSubtaskManager { attributeSortedString2SubtaskLifeCycleMap = new HashMap<>(); public synchronized String register( - PipeConnectorSubtaskExecutor executor, PipeParameters pipeConnectorParameters) { + PipeConnectorSubtaskExecutor executor, + PipeParameters pipeConnectorParameters, + PipeRuntimeEnvironment pipeRuntimeEnvironment) { final String attributeSortedString = new TreeMap<>(pipeConnectorParameters.getAttribute()).toString(); @@ -51,20 +55,25 @@ public class PipeConnectorSubtaskManager { // TODO: construct all PipeConnector with the same reflection method, avoid using if-else // 1. construct, validate and customize PipeConnector, and then handshake (create connection) // with the target - final PipeConnector pipeConnector = - pipeConnectorParameters - .getStringOrDefault( - PipeConnectorConstant.CONNECTOR_KEY, - BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()) - .equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()) - ? new IoTDBThriftConnectorV1() - : PipeAgent.plugin().reflectConnector(pipeConnectorParameters); + final String connectorKey = + pipeConnectorParameters.getStringOrDefault( + PipeConnectorConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()); + + PipeConnector pipeConnector; + if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())) { + pipeConnector = new IoTDBThriftConnectorV1(); + } else if (connectorKey.equals( + BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) { + pipeConnector = new IoTDBSyncConnectorImplV1_1(); + } else { + pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters); + } + try { pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters)); - final PipeConnectorRuntimeConfiguration runtimeConfiguration = - new PipeConnectorRuntimeConfiguration(); - pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration); - // TODO: use runtimeConfiguration to configure PipeConnector + pipeConnector.customize( + pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment)); pipeConnector.handshake(); } catch (Exception e) { throw new PipeException( diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java index ef8e2e67e3a..e0977828d75 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/collector/CachedSchemaPatternMatcherTest.java @@ -19,12 +19,13 @@ package org.apache.iotdb.db.pipe.collector; -import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; import org.apache.iotdb.db.pipe.collector.realtime.matcher.CachedSchemaPatternMatcher; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeCollectEvent; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -64,38 +65,34 @@ public class CachedSchemaPatternMatcherTest { @Test public void testCachedMatcher() throws Exception { - PipeRealtimeDataRegionCollector databaseCollector = - new PipeRealtimeDataRegionFakeCollector(null); + PipeRealtimeDataRegionCollector databaseCollector = new PipeRealtimeDataRegionFakeCollector(); databaseCollector.customize( new PipeParameters( new HashMap<String, String>() { { put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root"); - put(PipeCollectorConstant.DATA_REGION_KEY, "1"); } }), - null); + new PipeTaskRuntimeConfiguration(new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(databaseCollector); int deviceCollectorNum = 10; int seriesCollectorNum = 10; for (int i = 0; i < deviceCollectorNum; i++) { - PipeRealtimeDataRegionCollector deviceCollector = - new PipeRealtimeDataRegionFakeCollector(null); + PipeRealtimeDataRegionCollector deviceCollector = new PipeRealtimeDataRegionFakeCollector(); int finalI1 = i; deviceCollector.customize( new PipeParameters( new HashMap<String, String>() { { put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1); - put(PipeCollectorConstant.DATA_REGION_KEY, "1"); } }), - null); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(deviceCollector); for (int j = 0; j < seriesCollectorNum; j++) { - PipeRealtimeDataRegionCollector seriesCollector = - new PipeRealtimeDataRegionFakeCollector(null); + PipeRealtimeDataRegionCollector seriesCollector = new PipeRealtimeDataRegionFakeCollector(); int finalI = i; int finalJ = j; seriesCollector.customize( @@ -105,10 +102,10 @@ public class CachedSchemaPatternMatcherTest { put( PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI + "." + finalJ); - put(PipeCollectorConstant.DATA_REGION_KEY, "1"); } }), - null); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment("1", 1, 1, null))); collectorList.add(seriesCollector); } } @@ -152,9 +149,7 @@ public class CachedSchemaPatternMatcherTest { public static class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector { - public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) { - super(pipeTaskMeta); - } + public PipeRealtimeDataRegionFakeCollector() {} @Override public Event supply() { diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java index 202c1527bd2..1b12e5822f9 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/collector/PipeRealtimeCollectTest.java @@ -28,10 +28,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionCollector; import org.apache.iotdb.db.pipe.collector.realtime.PipeRealtimeDataRegionHybridCollector; import org.apache.iotdb.db.pipe.collector.realtime.listener.PipeInsertionDataNodeListener; -import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; -import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant; +import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskCollectorRuntimeEnvironment; import org.apache.iotdb.db.wal.utils.WALEntryHandler; -import org.apache.iotdb.pipe.api.customizer.PipeParameters; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -98,55 +99,58 @@ public class PipeRealtimeCollectTest { } @Test - public void testRealtimeCollectProcess() throws ExecutionException, InterruptedException { + public void testRealtimeCollectProcess() { // set up realtime collector try (PipeRealtimeDataRegionHybridCollector collector1 = - new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>()); + new PipeRealtimeDataRegionHybridCollector(); PipeRealtimeDataRegionHybridCollector collector2 = - new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>()); + new PipeRealtimeDataRegionHybridCollector(); PipeRealtimeDataRegionHybridCollector collector3 = - new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>()); + new PipeRealtimeDataRegionHybridCollector(); PipeRealtimeDataRegionHybridCollector collector4 = - new PipeRealtimeDataRegionHybridCollector( - null, new UnboundedBlockingPendingQueue<>())) { + new PipeRealtimeDataRegionHybridCollector()) { collector1.customize( new PipeParameters( new HashMap<String, String>() { { put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1); - put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1); } }), - null); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment( + "1", 1, Integer.parseInt(dataRegion1), null))); collector2.customize( new PipeParameters( new HashMap<String, String>() { { put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2); - put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1); } }), - null); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment( + "1", 1, Integer.parseInt(dataRegion1), null))); collector3.customize( new PipeParameters( new HashMap<String, String>() { { put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1); - put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2); } }), - null); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment( + "1", 1, Integer.parseInt(dataRegion2), null))); collector4.customize( new PipeParameters( new HashMap<String, String>() { { put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2); - put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2); } }), - null); + new PipeTaskRuntimeConfiguration( + new PipeTaskCollectorRuntimeEnvironment( + "1", 1, Integer.parseInt(dataRegion2), null))); PipeRealtimeDataRegionCollector[] collectors = new PipeRealtimeDataRegionCollector[] {collector1, collector2, collector3, collector4};
