This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch throw-exception-connector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 91529162ed81570bb6330feaa9ba969367b1fea2 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Mar 18 20:31:59 2024 +0800 ThrowingExceptionProcessor --- .../PipeDataRegionProcessorConstructor.java | 4 + .../pipe/plugin/builtin/BuiltinPipePlugin.java | 3 + .../throwing/ThrowingExceptionProcessor.java | 99 ++++++++++++++++++++++ 3 files changed, 106 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index ba49f55ff32..8c926dd06e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion; import org.apache.iotdb.commons.pipe.agent.plugin.PipeProcessorConstructor; import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor; +import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper; import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; @@ -42,5 +43,8 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { pluginConstructors.put( BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(), SwingingDoorTrendingSamplingProcessor::new); + pluginConstructors.put( + BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), + ThrowingExceptionProcessor::new); } } diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index c1ae3a75f2b..8f309e4285f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -34,6 +34,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtract import org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; +import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import java.util.Arrays; import java.util.Collections; @@ -54,6 +55,7 @@ public enum BuiltinPipePlugin { TUMBLING_TIME_SAMPLING_PROCESSOR( "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), + THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), // connectors DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class), @@ -113,6 +115,7 @@ public enum BuiltinPipePlugin { // Processors TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), + THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(), diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java new file mode 100644 index 00000000000..69b75be47f8 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/throwing/ThrowingExceptionProcessor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing; + +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** Pipe processor that throws a plain Exception from whichever lifecycle stages are listed in its comma-separated "stages" parameter; stages that are not listed do nothing. Recognized names: validate, customize, process-tablet-insertion-event, process-tsfile-insertion-event, process-event, close. NOTE(review): presumably meant for exercising pipe error handling - confirm. */ public class ThrowingExceptionProcessor implements PipeProcessor { + + /* Flags recording which of the later stages must throw; all of them are assigned in validate(). */ private boolean throwInCustomize = false; + private boolean throwInProcessTabletInsertionEvent = false; + private boolean throwInProcessTsFileInsertionEvent = false; + private boolean throwInProcessEvent = false; + private boolean throwInClose = false; + + /** Parses the "stages" parameter (default "") into a set by lower-casing it and splitting on ','; throws at once if the set contains "validate", otherwise stores the flags for the remaining stages. @throws Exception if "validate" is one of the listed stages */ @Override + public void validate(PipeParameterValidator validator) throws Exception { + /* Names are matched exactly after split(","): entries are not trimmed, so a space after a comma prevents a match. NOTE(review): toLowerCase() uses the default locale - confirm that is acceptable. */ final Set<String> throwingStages = + Arrays.stream( + validator.getParameters().getStringOrDefault("stages", "").toLowerCase().split(",")) + .collect(Collectors.toSet()); + + final boolean throwInValidate = throwingStages.contains("validate"); + if (throwInValidate) { + throw new Exception("Throwing exception in validate"); + } + + throwInCustomize = throwingStages.contains("customize"); + throwInProcessTabletInsertionEvent = throwingStages.contains("process-tablet-insertion-event"); + throwInProcessTsFileInsertionEvent = throwingStages.contains("process-tsfile-insertion-event"); + throwInProcessEvent = throwingStages.contains("process-event"); + throwInClose = throwingStages.contains("close"); + } + + /** Throws if "customize" was listed; otherwise a no-op that reads neither argument. NOTE(review): the flag is only set in validate(), so this assumes validate() already ran on this same instance - confirm against the caller. @throws Exception if the "customize" stage was selected */ @Override + public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) + throws Exception { + if
(throwInCustomize) { + throw new Exception("Throwing exception in customize"); + } + } + + /** Throws if "process-tablet-insertion-event" was listed; otherwise returns without using the event or the collector. @throws Exception if that stage was selected */ @Override + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws Exception { + if (throwInProcessTabletInsertionEvent) { + throw new Exception("Throwing exception in process(TabletInsertionEvent, EventCollector)"); + } + } + + /** Throws if "process-tsfile-insertion-event" was listed; otherwise returns without using the event or the collector. @throws Exception if that stage was selected */ @Override + public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws Exception { + if (throwInProcessTsFileInsertionEvent) { + throw new Exception("Throwing exception in process(TsFileInsertionEvent, EventCollector)"); + } + } + + /** Throws if "process-event" was listed; otherwise returns without using the event or the collector. @throws Exception if that stage was selected */ @Override + public void process(Event event, EventCollector eventCollector) throws Exception { + if (throwInProcessEvent) { + throw new Exception("Throwing exception in process(Event, EventCollector)"); + } + } + + /** Throws if "close" was listed; otherwise does nothing. @throws Exception if the "close" stage was selected */ @Override + public void close() throws Exception { + if (throwInClose) { + throw new Exception("Throwing exception in close"); + } + } +}
