This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-task-schedule
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 414f0ffaa2791efed3535517680810dd5f4e7a9d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 22 11:06:08 2023 +0800

    remove getType() method of Event
---
 .../org/apache/iotdb/pipe/api/event/Event.java     |  6 +--
 .../event/dml/insertion/TabletInsertionEvent.java  |  6 ---
 .../event/dml/insertion/TsFileInsertionEvent.java  |  6 ---
 .../PipeRealtimeDataRegionHybridCollector.java     | 49 +++++++++++-----------
 .../event/realtime/PipeRealtimeCollectEvent.java   |  6 ---
 .../db/pipe/task/subtask/PipeConnectorSubtask.java | 16 +++----
 .../db/pipe/task/subtask/PipeProcessorSubtask.java | 16 +++----
 .../core/collector/PipeRealtimeCollectTest.java    | 20 ++++-----
 8 files changed, 47 insertions(+), 78 deletions(-)

diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
index f5d8d2fbcd3..74ddf9e47d2 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -20,8 +20,4 @@
 package org.apache.iotdb.pipe.api.event;
 
 /** This interface is used to abstract events in collaboration tasks. */
-public interface Event {
-
-  /** @return the type of the event */
-  EventType getType();
-}
+public interface Event {}
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index e353bf397e0..9fd6d89428c 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import java.util.Iterator;
@@ -54,9 +53,4 @@ public interface TabletInsertionEvent extends Event {
    *     RowCollector
    */
   TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer);
-
-  @Override
-  default EventType getType() {
-    return EventType.TABLET_INSERTION;
-  }
 }
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 2d5badc8072..325f0683cb7 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.pipe.api.event.dml.insertion;
 
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 
 /**
  * TsFileInsertionEvent is used to define the event of writing TsFile. Event 
data stores in disks,
@@ -42,9 +41,4 @@ public interface TsFileInsertionEvent extends Event {
    * @return TsFileInsertionEvent
    */
   TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> 
iterable);
-
-  @Override
-  default EventType getType() {
-    return EventType.TSFILE_INSERTION;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 67d461e8c8a..898b93cb22d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
@@ -48,18 +50,17 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
 
   @Override
   public void collect(PipeRealtimeCollectEvent event) {
-    switch (event.getEvent().getType()) {
-      case TABLET_INSERTION:
-        collectTabletInsertion(event);
-        break;
-      case TSFILE_INSERTION:
-        collectTsFileInsertion(event);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unsupported event type %s for Hybrid Realtime Collector %s",
-                event.getEvent().getType(), this));
+    final Event eventToCollect = event.getEvent();
+
+    if (eventToCollect instanceof TabletInsertionEvent) {
+      collectTabletInsertion(event);
+    } else if (eventToCollect instanceof TsFileInsertionEvent) {
+      collectTsFileInsertion(event);
+    } else {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unsupported event type %s for Hybrid Realtime Collector %s",
+              eventToCollect.getClass(), this));
     }
   }
 
@@ -107,18 +108,18 @@ public class PipeRealtimeDataRegionHybridCollector 
extends PipeRealtimeDataRegio
 
     while (collectEvent != null) {
       Event suppliedEvent;
-      switch (collectEvent.getEvent().getType()) {
-        case TABLET_INSERTION:
-          suppliedEvent = supplyTabletInsertion(collectEvent);
-          break;
-        case TSFILE_INSERTION:
-          suppliedEvent = supplyTsFileInsertion(collectEvent);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Unsupported event type %s for Hybrid Realtime Collector %s",
-                  collectEvent.getEvent().getType(), this));
+
+      // used to judge type of event, not directly for supplying.
+      final Event eventToSupply = collectEvent.getEvent();
+      if (eventToSupply instanceof TabletInsertionEvent) {
+        suppliedEvent = supplyTabletInsertion(collectEvent);
+      } else if (eventToSupply instanceof TsFileInsertionEvent) {
+        suppliedEvent = supplyTsFileInsertion(collectEvent);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported event type %s for Hybrid Realtime Collector %s",
+                eventToSupply.getClass(), this));
       }
 
       
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 63fcc891361..0c63131e6e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 
 import java.util.Map;
 
@@ -55,11 +54,6 @@ public class PipeRealtimeCollectEvent implements Event, 
EnrichedEvent {
     device2Measurements = null;
   }
 
-  @Override
-  public EventType getType() {
-    return event.getType();
-  }
-
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
     return !(event instanceof EnrichedEvent)
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 8cc0d2f4265..1ec0994ecdc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -68,16 +68,12 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
 
     try {
-      switch (event.getType()) {
-        case TABLET_INSERTION:
-          outputPipeConnector.transfer((TabletInsertionEvent) event);
-          break;
-        case TSFILE_INSERTION:
-          outputPipeConnector.transfer((TsFileInsertionEvent) event);
-          break;
-        default:
-          outputPipeConnector.transfer(event);
-          break;
+      if (event instanceof TabletInsertionEvent) {
+        outputPipeConnector.transfer((TabletInsertionEvent) event);
+      } else if (event instanceof TsFileInsertionEvent) {
+        outputPipeConnector.transfer((TsFileInsertionEvent) event);
+      } else {
+        outputPipeConnector.transfer(event);
       }
 
       releaseLastEvent();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index dad72977865..0e65894c14d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -59,16 +59,12 @@ public class PipeProcessorSubtask extends PipeSubtask {
     }
 
     try {
-      switch (event.getType()) {
-        case TABLET_INSERTION:
-          pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
-          break;
-        case TSFILE_INSERTION:
-          pipeProcessor.process((TsFileInsertionEvent) event, 
outputEventCollector);
-          break;
-        default:
-          pipeProcessor.process(event, outputEventCollector);
-          break;
+      if (event instanceof TabletInsertionEvent) {
+        pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
+      } else if (event instanceof TsFileInsertionEvent) {
+        pipeProcessor.process((TsFileInsertionEvent) event, 
outputEventCollector);
+      } else {
+        pipeProcessor.process(event, outputEventCollector);
       }
 
       releaseLastEvent();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index a2c3508b294..7cd705af588 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -32,7 +32,7 @@ import 
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDa
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
 import org.junit.After;
@@ -163,9 +163,9 @@ public class PipeRealtimeCollectTest {
           Arrays.asList(
               listen(
                   collectors[0],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], typ2 -> 1, writeNum));
+              listen(collectors[1], event -> 1, writeNum));
 
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
@@ -199,14 +199,14 @@ public class PipeRealtimeCollectTest {
           Arrays.asList(
               listen(
                   collectors[0],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], typ2 -> 1, writeNum),
+              listen(collectors[1], event -> 1, writeNum),
               listen(
                   collectors[2],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[3], typ2 -> 1, writeNum));
+              listen(collectors[3], event -> 1, writeNum));
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
         listenFutures.get(1).get(10, TimeUnit.MINUTES);
@@ -278,9 +278,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> listen(
-      PipeRealtimeDataRegionCollector collector,
-      Function<EventType, Integer> weight,
-      int expectNum) {
+      PipeRealtimeDataRegionCollector collector, Function<Event, Integer> 
weight, int expectNum) {
     return listenerService.submit(
         () -> {
           int eventNum = 0;
@@ -293,7 +291,7 @@ public class PipeRealtimeCollectTest {
                 throw new RuntimeException(e);
               }
               if (event != null) {
-                eventNum += weight.apply(event.getType());
+                eventNum += weight.apply(event);
               }
             }
           } finally {

Reply via email to