This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch 1.2-rename-collector-to-extractor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ac7392891c6f69a0466838e147c32ee9d0d1f0f4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Jun 24 01:32:42 2023 +0800

    pipe rename: collector -> extractor
---
 .../iotdb/commons/pipe/PipeMetaDeSerTest.java      |  2 +-
 .../extractor/CachedSchemaPatternMatcherTest.java  | 37 ++++++------
 ...llectTest.java => PipeRealtimeExtractTest.java} | 66 +++++++++++-----------
 3 files changed, 52 insertions(+), 53 deletions(-)

diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
 
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
index c5813959938..08962a920c8 100644
--- 
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
+++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
@@ -44,7 +44,7 @@ public class PipeMetaDeSerTest {
             123L,
             new HashMap() {
               {
-                put("collector-key", "collector-value");
+                put("extractor-key", "extractor-value");
               }
             },
             new HashMap() {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
index f1aa273bfaa..b726625864c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
@@ -49,13 +49,13 @@ public class CachedSchemaPatternMatcherTest {
 
   private CachedSchemaPatternMatcher matcher;
   private ExecutorService executorService;
-  private List<PipeRealtimeDataRegionExtractor> collectorList;
+  private List<PipeRealtimeDataRegionExtractor> extractors;
 
   @Before
   public void setUp() {
     matcher = new CachedSchemaPatternMatcher();
     executorService = Executors.newSingleThreadExecutor();
-    collectorList = new ArrayList<>();
+    extractors = new ArrayList<>();
   }
 
   @After
@@ -65,8 +65,8 @@ public class CachedSchemaPatternMatcherTest {
 
   @Test
   public void testCachedMatcher() throws Exception {
-    PipeRealtimeDataRegionExtractor databaseCollector = new 
PipeRealtimeDataRegionFakeExtractor();
-    databaseCollector.customize(
+    PipeRealtimeDataRegionExtractor dataRegionExtractor = new 
PipeRealtimeDataRegionFakeExtractor();
+    dataRegionExtractor.customize(
         new PipeParameters(
             new HashMap<String, String>() {
               {
@@ -74,14 +74,14 @@ public class CachedSchemaPatternMatcherTest {
               }
             }),
         new PipeTaskRuntimeConfiguration(new 
PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
-    collectorList.add(databaseCollector);
+    extractors.add(dataRegionExtractor);
 
-    int deviceCollectorNum = 10;
-    int seriesCollectorNum = 10;
-    for (int i = 0; i < deviceCollectorNum; i++) {
-      PipeRealtimeDataRegionExtractor deviceCollector = new 
PipeRealtimeDataRegionFakeExtractor();
+    int deviceExtractorNum = 10;
+    int seriesExtractorNum = 10;
+    for (int i = 0; i < deviceExtractorNum; i++) {
+      PipeRealtimeDataRegionExtractor deviceExtractor = new 
PipeRealtimeDataRegionFakeExtractor();
       int finalI1 = i;
-      deviceCollector.customize(
+      deviceExtractor.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
@@ -90,12 +90,12 @@ public class CachedSchemaPatternMatcherTest {
               }),
           new PipeTaskRuntimeConfiguration(
               new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
-      collectorList.add(deviceCollector);
-      for (int j = 0; j < seriesCollectorNum; j++) {
-        PipeRealtimeDataRegionExtractor seriesCollector = new 
PipeRealtimeDataRegionFakeExtractor();
+      extractors.add(deviceExtractor);
+      for (int j = 0; j < seriesExtractorNum; j++) {
+        PipeRealtimeDataRegionExtractor seriesExtractor = new 
PipeRealtimeDataRegionFakeExtractor();
         int finalI = i;
         int finalJ = j;
-        seriesCollector.customize(
+        seriesExtractor.customize(
             new PipeParameters(
                 new HashMap<String, String>() {
                   {
@@ -106,13 +106,12 @@ public class CachedSchemaPatternMatcherTest {
                 }),
             new PipeTaskRuntimeConfiguration(
                 new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
-        collectorList.add(seriesCollector);
+        extractors.add(seriesExtractor);
       }
     }
 
     Future<?> future =
-        executorService.submit(
-            () -> collectorList.forEach(collector -> 
matcher.register(collector)));
+        executorService.submit(() -> extractors.forEach(extractor -> 
matcher.register(extractor)));
 
     int epochNum = 10000;
     int deviceNum = 1000;
@@ -130,12 +129,12 @@ public class CachedSchemaPatternMatcherTest {
             new PipeRealtimeEvent(
                 null, null, Collections.singletonMap("root." + i, 
measurements), "root");
         long startTime = System.currentTimeMillis();
-        matcher.match(event).forEach(collector -> collector.extract(event));
+        matcher.match(event).forEach(extractor -> extractor.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
       PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, 
"root");
       long startTime = System.currentTimeMillis();
-      matcher.match(event).forEach(collector -> collector.extract(event));
+      matcher.match(event).forEach(extractor -> extractor.extract(event));
       totalTime += (System.currentTimeMillis() - startTime);
     }
     System.out.println("matcher.getRegisterCount() = " + 
matcher.getRegisterCount());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
similarity index 87%
rename from 
server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java
rename to 
server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index a30854cc95d..e1846f8c2f6 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -61,9 +61,9 @@ import java.util.function.Function;
 
 import static org.mockito.Mockito.mock;
 
-public class PipeRealtimeCollectTest {
+public class PipeRealtimeExtractTest {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRealtimeExtractTest.class);
 
   private final String dataRegion1 = "1";
   private final String dataRegion2 = "2";
@@ -81,7 +81,7 @@ public class PipeRealtimeCollectTest {
   public void setUp() throws IOException {
     writeService = Executors.newFixedThreadPool(2);
     listenerService = Executors.newFixedThreadPool(4);
-    tmpDir = new 
File(Files.createTempDirectory("pipeRealtimeCollect").toString());
+    tmpDir = new 
File(Files.createTempDirectory("pipeRealtimeExtractor").toString());
     tsFileDir =
         new File(
             tmpDir.getPath()
@@ -99,19 +99,19 @@ public class PipeRealtimeCollectTest {
   }
 
   @Test
-  public void testRealtimeCollectProcess() {
-    // set up realtime collector
+  public void testRealtimeExtractProcess() {
+    // set up realtime extractor
 
-    try (PipeRealtimeDataRegionHybridExtractor collector1 =
+    try (PipeRealtimeDataRegionHybridExtractor extractor1 =
             new PipeRealtimeDataRegionHybridExtractor();
-        PipeRealtimeDataRegionHybridExtractor collector2 =
+        PipeRealtimeDataRegionHybridExtractor extractor2 =
             new PipeRealtimeDataRegionHybridExtractor();
-        PipeRealtimeDataRegionHybridExtractor collector3 =
+        PipeRealtimeDataRegionHybridExtractor extractor3 =
             new PipeRealtimeDataRegionHybridExtractor();
-        PipeRealtimeDataRegionHybridExtractor collector4 =
+        PipeRealtimeDataRegionHybridExtractor extractor4 =
             new PipeRealtimeDataRegionHybridExtractor()) {
 
-      collector1.customize(
+      extractor1.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
@@ -121,7 +121,7 @@ public class PipeRealtimeCollectTest {
           new PipeTaskRuntimeConfiguration(
               new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion1), null)));
-      collector2.customize(
+      extractor2.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
@@ -131,7 +131,7 @@ public class PipeRealtimeCollectTest {
           new PipeTaskRuntimeConfiguration(
               new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion1), null)));
-      collector3.customize(
+      extractor3.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
@@ -141,7 +141,7 @@ public class PipeRealtimeCollectTest {
           new PipeTaskRuntimeConfiguration(
               new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion2), null)));
-      collector4.customize(
+      extractor4.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
@@ -152,14 +152,14 @@ public class PipeRealtimeCollectTest {
               new PipeTaskExtractorRuntimeEnvironment(
                   "1", 1, Integer.parseInt(dataRegion2), null)));
 
-      PipeRealtimeDataRegionExtractor[] collectors =
-          new PipeRealtimeDataRegionExtractor[] {collector1, collector2, 
collector3, collector4};
+      PipeRealtimeDataRegionExtractor[] extractors =
+          new PipeRealtimeDataRegionExtractor[] {extractor1, extractor2, 
extractor3, extractor4};
 
-      // start collector 0, 1
-      collectors[0].start();
-      collectors[1].start();
+      // start extractor 0, 1
+      extractors[0].start();
+      extractors[1].start();
 
-      // test result of collector 0, 1
+      // test result of extractor 0, 1
       int writeNum = 10;
       List<Future<?>> writeFutures =
           Arrays.asList(
@@ -170,16 +170,16 @@ public class PipeRealtimeCollectTest {
       List<Future<?>> listenFutures =
           Arrays.asList(
               listen(
-                  collectors[0],
+                  extractors[0],
                   event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], event -> 1, writeNum));
+              listen(extractors[1], event -> 1, writeNum));
 
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
         listenFutures.get(1).get(10, TimeUnit.MINUTES);
       } catch (TimeoutException e) {
-        LOGGER.warn("Time out when listening collector", e);
+        LOGGER.warn("Time out when listening extractor", e);
         alive.set(false);
         Assert.fail();
       }
@@ -192,11 +192,11 @@ public class PipeRealtimeCollectTest {
             }
           });
 
-      // start collector 2, 3
-      collectors[2].start();
-      collectors[3].start();
+      // start extractor 2, 3
+      extractors[2].start();
+      extractors[3].start();
 
-      // test result of collector 0 - 3
+      // test result of extractor 0 - 3
       writeFutures =
           Arrays.asList(
               write2DataRegion(writeNum, dataRegion1, writeNum),
@@ -206,22 +206,22 @@ public class PipeRealtimeCollectTest {
       listenFutures =
           Arrays.asList(
               listen(
-                  collectors[0],
+                  extractors[0],
                   event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], event -> 1, writeNum),
+              listen(extractors[1], event -> 1, writeNum),
               listen(
-                  collectors[2],
+                  extractors[2],
                   event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[3], event -> 1, writeNum));
+              listen(extractors[3], event -> 1, writeNum));
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
         listenFutures.get(1).get(10, TimeUnit.MINUTES);
         listenFutures.get(2).get(10, TimeUnit.MINUTES);
         listenFutures.get(3).get(10, TimeUnit.MINUTES);
       } catch (TimeoutException e) {
-        LOGGER.warn("Time out when listening collector", e);
+        LOGGER.warn("Time out when listening extractor", e);
         alive.set(false);
         Assert.fail();
       }
@@ -291,7 +291,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> listen(
-      PipeRealtimeDataRegionExtractor collector, Function<Event, Integer> 
weight, int expectNum) {
+      PipeRealtimeDataRegionExtractor extractor, Function<Event, Integer> 
weight, int expectNum) {
     return listenerService.submit(
         () -> {
           int eventNum = 0;
@@ -299,7 +299,7 @@ public class PipeRealtimeCollectTest {
             while (alive.get() && eventNum < expectNum) {
               Event event;
               try {
-                event = collector.supply();
+                event = extractor.supply();
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }

Reply via email to