This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-task-schedule in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 414f0ffaa2791efed3535517680810dd5f4e7a9d Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 22 11:06:08 2023 +0800 remove getType() method of Event --- .../org/apache/iotdb/pipe/api/event/Event.java | 6 +-- .../event/dml/insertion/TabletInsertionEvent.java | 6 --- .../event/dml/insertion/TsFileInsertionEvent.java | 6 --- .../PipeRealtimeDataRegionHybridCollector.java | 49 +++++++++++----------- .../event/realtime/PipeRealtimeCollectEvent.java | 6 --- .../db/pipe/task/subtask/PipeConnectorSubtask.java | 16 +++---- .../db/pipe/task/subtask/PipeProcessorSubtask.java | 16 +++---- .../core/collector/PipeRealtimeCollectTest.java | 20 ++++----- 8 files changed, 47 insertions(+), 78 deletions(-) diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java index f5d8d2fbcd3..74ddf9e47d2 100644 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java @@ -20,8 +20,4 @@ package org.apache.iotdb.pipe.api.event; /** This interface is used to abstract events in collaboration tasks. */ -public interface Event { - - /** @return the type of the event */ - EventType getType(); -} +public interface Event {} diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index e353bf397e0..9fd6d89428c 100644 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -22,7 +22,6 @@ package org.apache.iotdb.pipe.api.event.dml.insertion; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.EventType; import org.apache.iotdb.tsfile.write.record.Tablet; import java.util.Iterator; @@ -54,9 +53,4 @@ public interface TabletInsertionEvent extends Event { * RowCollector */ TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer); - - @Override - default EventType getType() { - return EventType.TABLET_INSERTION; - } } diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java index 2d5badc8072..325f0683cb7 100644 --- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java +++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java @@ -20,7 +20,6 @@ package org.apache.iotdb.pipe.api.event.dml.insertion; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.EventType; /** * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks, @@ -42,9 +41,4 @@ public interface TsFileInsertionEvent extends Event { * @return TsFileInsertionEvent */ TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable); - - @Override - default EventType getType() { - return EventType.TSFILE_INSERTION; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java index 67d461e8c8a..898b93cb22d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java @@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent; import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch; import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue; import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException; import org.slf4j.Logger; @@ -48,18 +50,17 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio @Override public void collect(PipeRealtimeCollectEvent event) { - switch (event.getEvent().getType()) { - case TABLET_INSERTION: - collectTabletInsertion(event); - break; - case TSFILE_INSERTION: - collectTsFileInsertion(event); - break; - default: - throw new UnsupportedOperationException( - String.format( - "Unsupported event type %s for Hybrid Realtime Collector %s", - event.getEvent().getType(), this)); + final Event eventToCollect = event.getEvent(); + + if (eventToCollect instanceof TabletInsertionEvent) { + collectTabletInsertion(event); + } else if (eventToCollect instanceof TsFileInsertionEvent) { + collectTsFileInsertion(event); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported event type %s for Hybrid Realtime Collector %s", + eventToCollect.getClass(), this)); } } @@ -107,18 +108,18 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio while (collectEvent != null) { Event suppliedEvent; - switch (collectEvent.getEvent().getType()) { - case TABLET_INSERTION: - suppliedEvent = supplyTabletInsertion(collectEvent); - break; - case TSFILE_INSERTION: - suppliedEvent = supplyTsFileInsertion(collectEvent); - break; - default: - throw new UnsupportedOperationException( - String.format( - "Unsupported event type %s for Hybrid Realtime Collector %s", - collectEvent.getEvent().getType(), this)); + + // used to judge type of event, not directly for supplying. + final Event eventToSupply = collectEvent.getEvent(); + if (eventToSupply instanceof TabletInsertionEvent) { + suppliedEvent = supplyTabletInsertion(collectEvent); + } else if (eventToSupply instanceof TsFileInsertionEvent) { + suppliedEvent = supplyTsFileInsertion(collectEvent); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported event type %s for Hybrid Realtime Collector %s", + eventToSupply.getClass(), this)); } collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName()); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java index 63fcc891361..0c63131e6e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.realtime; import org.apache.iotdb.db.pipe.core.event.EnrichedEvent; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.EventType; import java.util.Map; @@ -55,11 +54,6 @@ public class PipeRealtimeCollectEvent implements Event, EnrichedEvent { device2Measurements = null; } - @Override - public EventType getType() { - return event.getType(); - } - @Override public boolean increaseReferenceCount(String holderMessage) { return !(event instanceof EnrichedEvent) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java index 8cc0d2f4265..1ec0994ecdc 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java @@ -68,16 +68,12 @@ public class PipeConnectorSubtask extends PipeSubtask { } try { - switch (event.getType()) { - case TABLET_INSERTION: - outputPipeConnector.transfer((TabletInsertionEvent) event); - break; - case TSFILE_INSERTION: - outputPipeConnector.transfer((TsFileInsertionEvent) event); - break; - default: - outputPipeConnector.transfer(event); - break; + if (event instanceof TabletInsertionEvent) { + outputPipeConnector.transfer((TabletInsertionEvent) event); + } else if (event instanceof TsFileInsertionEvent) { + outputPipeConnector.transfer((TsFileInsertionEvent) event); + } else { + outputPipeConnector.transfer(event); } releaseLastEvent(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java index dad72977865..0e65894c14d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java @@ -59,16 +59,12 @@ public class PipeProcessorSubtask extends PipeSubtask { } try { - switch (event.getType()) { - case TABLET_INSERTION: - pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector); - break; - case TSFILE_INSERTION: - pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); - break; - default: - pipeProcessor.process(event, outputEventCollector); - break; + if (event instanceof TabletInsertionEvent) { + pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector); + } else if (event instanceof TsFileInsertionEvent) { + pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector); + } else { + pipeProcessor.process(event, outputEventCollector); } releaseLastEvent(); diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java index a2c3508b294..7cd705af588 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDa import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.EventType; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.junit.After; @@ -163,9 +163,9 @@ public class PipeRealtimeCollectTest { Arrays.asList( listen( collectors[0], - type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2, + event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), - listen(collectors[1], typ2 -> 1, writeNum)); + listen(collectors[1], event -> 1, writeNum)); try { listenFutures.get(0).get(10, TimeUnit.MINUTES); @@ -199,14 +199,14 @@ public class PipeRealtimeCollectTest { Arrays.asList( listen( collectors[0], - type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2, + event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), - listen(collectors[1], typ2 -> 1, writeNum), + listen(collectors[1], event -> 1, writeNum), listen( collectors[2], - type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2, + event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), - listen(collectors[3], typ2 -> 1, writeNum)); + listen(collectors[3], event -> 1, writeNum)); try { listenFutures.get(0).get(10, TimeUnit.MINUTES); listenFutures.get(1).get(10, TimeUnit.MINUTES); @@ -278,9 +278,7 @@ public class PipeRealtimeCollectTest { } private Future<?> listen( - PipeRealtimeDataRegionCollector collector, - Function<EventType, Integer> weight, - int expectNum) { + PipeRealtimeDataRegionCollector collector, Function<Event, Integer> weight, int expectNum) { return listenerService.submit( () -> { int eventNum = 0; @@ -293,7 +291,7 @@ public class PipeRealtimeCollectTest { throw new RuntimeException(e); } if (event != null) { - eventNum += weight.apply(event.getType()); + eventNum += weight.apply(event); } } } finally {
