This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-task-schedule in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c76809d64763f5dc91d41aedd21bb88279c210b9 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 22 10:44:29 2023 +0800 remove deletion event and introduce generic event to pipe engine --- .../builtin/connector/DoNothingConnector.java | 4 +- .../builtin/connector/IoTDBThriftConnector.java | 4 +- .../builtin/processor/DoNothingProcessor.java | 11 +++-- .../org/apache/iotdb/pipe/api/PipeConnector.java | 10 ++--- .../org/apache/iotdb/pipe/api/PipeProcessor.java | 10 ++--- .../iotdb/pipe/api/collector/EventCollector.java | 39 ++++-------------- .../pipe/api/event/dml/deletion/DeletionEvent.java | 48 ---------------------- .../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 6 +-- .../event/view/collector/PipeEventCollector.java | 19 +-------- .../db/pipe/task/subtask/PipeConnectorSubtask.java | 8 +--- .../db/pipe/task/subtask/PipeProcessorSubtask.java | 8 +--- 11 files changed, 35 insertions(+), 132 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java index 00f3b80424a..2522fdc66f6 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java @@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -61,7 +61,7 @@ public class DoNothingConnector implements PipeConnector { } @Override - public void transfer(DeletionEvent deletionEvent) { + public void transfer(Event event) { // do nothing } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java index e252e5be726..82ddd05ba3c 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java @@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -67,7 +67,7 @@ public class IoTDBThriftConnector implements PipeConnector { } @Override - public void transfer(DeletionEvent deletionEvent) { + public void transfer(Event event) { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java index 6a18f9a64e8..bc56a8bb3cf 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java @@ -24,7 +24,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -46,19 +46,18 @@ public class DoNothingProcessor implements PipeProcessor { @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) throws IOException { - eventCollector.collectTabletInsertionEvent(tabletInsertionEvent); + eventCollector.collect(tabletInsertionEvent); } @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws IOException { - eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent); + eventCollector.collect(tsFileInsertionEvent); } @Override - public void process(DeletionEvent deletionEvent, EventCollector eventCollector) - throws IOException { - eventCollector.collectDeletionEvent(deletionEvent); + public void process(Event event, EventCollector eventCollector) throws IOException { + eventCollector.collect(event); } @Override diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java index 5502de8bb37..6d74847e763 100644 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java @@ -22,7 +22,7 @@ package org.apache.iotdb.pipe.api; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; @@ -52,7 +52,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; * following 3 methods will be called: {@link * PipeConnector#transfer(TabletInsertionEvent)}, {@link * PipeConnector#transfer(TsFileInsertionEvent)} and {@link - * PipeConnector#transfer(DeletionEvent)}. + * PipeConnector#transfer(Event)}. * </ul> * <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link * PipeConnector#close() } method will be called. @@ -130,11 +130,11 @@ public interface PipeConnector extends PipePlugin { void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception; /** - * This method is used to transfer the DeletionEvent. + * This method is used to transfer the Event. * - * @param deletionEvent DeletionEvent to be transferred + * @param event Event to be transferred * @throws PipeConnectionException if the connection is broken * @throws Exception the user can throw errors if necessary */ - void transfer(DeletionEvent deletionEvent) throws Exception; + void transfer(Event event) throws Exception; } diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java index 16a5e81ba6c..e94384d5f23 100644 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java @@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -48,7 +48,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; * following 3 methods will be called: {@link * PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link * PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link - * PipeProcessor#process(DeletionEvent, EventCollector)}. + * PipeProcessor#process(Event, EventCollector)}. * <li>PipeConnector serializes the events into binaries and send them to sinks. * </ul> * <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link @@ -107,11 +107,11 @@ public interface PipeProcessor extends PipePlugin { throws Exception; /** - * This method is called to process the DeletionEvent. + * This method is called to process the Event. * - * @param deletionEvent DeletionEvent to be processed + * @param event Event to be processed * @param eventCollector used to collect result events after processing * @throws Exception the user can throw errors if necessary */ - void process(DeletionEvent deletionEvent, EventCollector eventCollector) throws Exception; + void process(Event event, EventCollector eventCollector) throws Exception; } diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java index 2e53693d65b..a9ef1f0aa62 100644 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java @@ -19,44 +19,21 @@ package org.apache.iotdb.pipe.api.collector; -import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.event.Event; import java.io.IOException; -/** - * Used to collect events generated by {@link PipeProcessor#process(TabletInsertionEvent, - * EventCollector)}, {@link PipeProcessor#process(TsFileInsertionEvent, EventCollector)} or {@link - * PipeProcessor#process(DeletionEvent, EventCollector)}. - */ +/** Used to collect events in pipe engine. */ public interface EventCollector { /** - * Collects an insertion event in form of TabletInsertionEvent. - * - * @param event TabletInsertionEvent to be collected - * @throws IOException if any I/O errors occur - * @see TabletInsertionEvent - */ - void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException; - - /** - * Collects an insertion event in form of TsFileInsertionEvent. - * - * @param event TsFileInsertionEvent to be collected - * @throws IOException if any I/O errors occur - * @see TsFileInsertionEvent - */ - void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException; - - /** - * Collects a deletion event. + * Collects a Event in pipe engine. * - * @param event DeletionEvent to be collected + * @param event Event to be collected * @throws IOException if any I/O errors occur - * @see DeletionEvent + * @see Event + * @see org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent + * @see org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent */ - void collectDeletionEvent(DeletionEvent event) throws IOException; + void collect(Event event) throws IOException; } diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java deleted file mode 100644 index d1fb966379c..00000000000 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.event.dml.deletion; - -import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.EventType; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.common.TimeRange; - -/** DeletionEvent is used to define the event of deletion. */ -public interface DeletionEvent extends Event { - - /** - * The method is used to get the path pattern of the deleted data. - * - * @return String - */ - Path getPath(); - - /** - * The method is used to get the time range of the deleted data. - * - * @return TimeRange - */ - TimeRange getTimeRange(); - - @Override - default EventType getType() { - return EventType.DELETION; - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java index 32408769579..6b414c03b36 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java @@ -38,7 +38,7 @@ import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; +import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; @@ -218,8 +218,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { } @Override - public void transfer(DeletionEvent deletionEvent) throws Exception { - throw new NotImplementedException("Not implement for deletion event."); + public void transfer(Event event) { + LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java index a1f443a9d43..c38b7c28a67 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java @@ -23,9 +23,6 @@ import org.apache.iotdb.db.pipe.core.event.EnrichedEvent; import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import java.util.LinkedList; import java.util.Queue; @@ -48,21 +45,7 @@ public class PipeEventCollector implements EventCollector { } @Override - public void collectTabletInsertionEvent(TabletInsertionEvent event) { - collect(event); - } - - @Override - public void collectTsFileInsertionEvent(TsFileInsertionEvent event) { - collect(event); - } - - @Override - public void collectDeletionEvent(DeletionEvent event) { - collect(event); - } - - private synchronized void collect(Event event) { + public synchronized void collect(Event event) { if (event instanceof EnrichedEvent) { ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName()); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java index 80f366eb096..8cc0d2f4265 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; @@ -76,12 +75,9 @@ public class PipeConnectorSubtask extends PipeSubtask { case TSFILE_INSERTION: outputPipeConnector.transfer((TsFileInsertionEvent) event); break; - case DELETION: - outputPipeConnector.transfer((DeletionEvent) event); - break; default: - throw new UnsupportedOperationException( - "Unsupported event type: " + event.getClass().getName()); + outputPipeConnector.transfer(event); + break; } releaseLastEvent(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java index feb584fcaff..dad72977865 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.task.queue.EventSupplier; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -67,12 +66,9 @@ public class PipeProcessorSubtask extends PipeSubtask { case TSFILE_INSERTION: pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); break; - case DELETION: - pipeProcessor.process((DeletionEvent) event, outputEventCollector); - break; default: - throw new UnsupportedOperationException( - "Unsupported event type: " + event.getClass().getName()); + pipeProcessor.process(event, outputEventCollector); + break; } releaseLastEvent();
