This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 1.2-rename-collector-to-extractor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac7392891c6f69a0466838e147c32ee9d0d1f0f4 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 24 01:32:42 2023 +0800 pipe rename: collector -> extractor --- .../iotdb/commons/pipe/PipeMetaDeSerTest.java | 2 +- .../extractor/CachedSchemaPatternMatcherTest.java | 37 ++++++------ ...llectTest.java => PipeRealtimeExtractTest.java} | 66 +++++++++++----------- 3 files changed, 52 insertions(+), 53 deletions(-) diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java index c5813959938..08962a920c8 100644 --- a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java +++ b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java @@ -44,7 +44,7 @@ public class PipeMetaDeSerTest { 123L, new HashMap() { { - put("collector-key", "collector-value"); + put("extractor-key", "extractor-value"); } }, new HashMap() { diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java index f1aa273bfaa..b726625864c 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java @@ -49,13 +49,13 @@ public class CachedSchemaPatternMatcherTest { private CachedSchemaPatternMatcher matcher; private ExecutorService executorService; - private List<PipeRealtimeDataRegionExtractor> collectorList; + private List<PipeRealtimeDataRegionExtractor> extractors; @Before public void setUp() { matcher = new CachedSchemaPatternMatcher(); executorService = Executors.newSingleThreadExecutor(); - collectorList = new ArrayList<>(); + extractors = new ArrayList<>(); } @After @@ -65,8 +65,8 @@ public class CachedSchemaPatternMatcherTest { @Test public void testCachedMatcher() throws Exception { - PipeRealtimeDataRegionExtractor databaseCollector = new PipeRealtimeDataRegionFakeExtractor(); - databaseCollector.customize( + PipeRealtimeDataRegionExtractor dataRegionExtractor = new PipeRealtimeDataRegionFakeExtractor(); + dataRegionExtractor.customize( new PipeParameters( new HashMap<String, String>() { { @@ -74,14 +74,14 @@ public class CachedSchemaPatternMatcherTest { } }), new PipeTaskRuntimeConfiguration(new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null))); - collectorList.add(databaseCollector); + extractors.add(dataRegionExtractor); - int deviceCollectorNum = 10; - int seriesCollectorNum = 10; - for (int i = 0; i < deviceCollectorNum; i++) { - PipeRealtimeDataRegionExtractor deviceCollector = new PipeRealtimeDataRegionFakeExtractor(); + int deviceExtractorNum = 10; + int seriesExtractorNum = 10; + for (int i = 0; i < deviceExtractorNum; i++) { + PipeRealtimeDataRegionExtractor deviceExtractor = new PipeRealtimeDataRegionFakeExtractor(); int finalI1 = i; - deviceCollector.customize( + deviceExtractor.customize( new PipeParameters( new HashMap<String, String>() { { @@ -90,12 +90,12 @@ public class CachedSchemaPatternMatcherTest { }), new PipeTaskRuntimeConfiguration( new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null))); - collectorList.add(deviceCollector); - for (int j = 0; j < seriesCollectorNum; j++) { - PipeRealtimeDataRegionExtractor seriesCollector = new PipeRealtimeDataRegionFakeExtractor(); + extractors.add(deviceExtractor); + for (int j = 0; j < seriesExtractorNum; j++) { + PipeRealtimeDataRegionExtractor seriesExtractor = new PipeRealtimeDataRegionFakeExtractor(); int finalI = i; int finalJ = j; - seriesCollector.customize( + seriesExtractor.customize( new PipeParameters( new HashMap<String, String>() { { @@ -106,13 +106,12 @@ public class CachedSchemaPatternMatcherTest { }), new PipeTaskRuntimeConfiguration( new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null))); - collectorList.add(seriesCollector); + extractors.add(seriesExtractor); } } Future<?> future = - executorService.submit( - () -> collectorList.forEach(collector -> matcher.register(collector))); + executorService.submit(() -> extractors.forEach(extractor -> matcher.register(extractor))); int epochNum = 10000; int deviceNum = 1000; @@ -130,12 +129,12 @@ public class CachedSchemaPatternMatcherTest { new PipeRealtimeEvent( null, null, Collections.singletonMap("root." + i, measurements), "root"); long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(collector -> collector.extract(event)); + matcher.match(event).forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, "root"); long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(collector -> collector.extract(event)); + matcher.match(event).forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount()); diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java similarity index 87% rename from server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java index a30854cc95d..e1846f8c2f6 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeCollectTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java @@ -61,9 +61,9 @@ import java.util.function.Function; import static org.mockito.Mockito.mock; -public class PipeRealtimeCollectTest { +public class PipeRealtimeExtractTest { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class); private final String dataRegion1 = "1"; private final String dataRegion2 = "2"; @@ -81,7 +81,7 @@ public class PipeRealtimeCollectTest { public void setUp() throws IOException { writeService = Executors.newFixedThreadPool(2); listenerService = Executors.newFixedThreadPool(4); - tmpDir = new File(Files.createTempDirectory("pipeRealtimeCollect").toString()); + tmpDir = new File(Files.createTempDirectory("pipeRealtimeExtractor").toString()); tsFileDir = new File( tmpDir.getPath() @@ -99,19 +99,19 @@ public class PipeRealtimeCollectTest { } @Test - public void testRealtimeCollectProcess() { - // set up realtime collector + public void testRealtimeExtractProcess() { + // set up realtime extractor - try (PipeRealtimeDataRegionHybridExtractor collector1 = + try (PipeRealtimeDataRegionHybridExtractor extractor1 = new PipeRealtimeDataRegionHybridExtractor(); - PipeRealtimeDataRegionHybridExtractor collector2 = + PipeRealtimeDataRegionHybridExtractor extractor2 = new PipeRealtimeDataRegionHybridExtractor(); - PipeRealtimeDataRegionHybridExtractor collector3 = + PipeRealtimeDataRegionHybridExtractor extractor3 = new PipeRealtimeDataRegionHybridExtractor(); - PipeRealtimeDataRegionHybridExtractor collector4 = + PipeRealtimeDataRegionHybridExtractor extractor4 = new PipeRealtimeDataRegionHybridExtractor()) { - collector1.customize( + extractor1.customize( new PipeParameters( new HashMap<String, String>() { { @@ -121,7 +121,7 @@ public class PipeRealtimeCollectTest { new PipeTaskRuntimeConfiguration( new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion1), null))); - collector2.customize( + extractor2.customize( new PipeParameters( new HashMap<String, String>() { { @@ -131,7 +131,7 @@ public class PipeRealtimeCollectTest { new PipeTaskRuntimeConfiguration( new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion1), null))); - collector3.customize( + extractor3.customize( new PipeParameters( new HashMap<String, String>() { { @@ -141,7 +141,7 @@ public class PipeRealtimeCollectTest { new PipeTaskRuntimeConfiguration( new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion2), null))); - collector4.customize( + extractor4.customize( new PipeParameters( new HashMap<String, String>() { { @@ -152,14 +152,14 @@ public class PipeRealtimeCollectTest { new PipeTaskExtractorRuntimeEnvironment( "1", 1, Integer.parseInt(dataRegion2), null))); - PipeRealtimeDataRegionExtractor[] collectors = - new PipeRealtimeDataRegionExtractor[] {collector1, collector2, collector3, collector4}; + PipeRealtimeDataRegionExtractor[] extractors = + new PipeRealtimeDataRegionExtractor[] {extractor1, extractor2, extractor3, extractor4}; - // start collector 0, 1 - collectors[0].start(); - collectors[1].start(); + // start extractor 0, 1 + extractors[0].start(); + extractors[1].start(); - // test result of collector 0, 1 + // test result of extractor 0, 1 int writeNum = 10; List<Future<?>> writeFutures = Arrays.asList( @@ -170,16 +170,16 @@ public class PipeRealtimeCollectTest { List<Future<?>> listenFutures = Arrays.asList( listen( - collectors[0], + extractors[0], event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), - listen(collectors[1], event -> 1, writeNum)); + listen(extractors[1], event -> 1, writeNum)); try { listenFutures.get(0).get(10, TimeUnit.MINUTES); listenFutures.get(1).get(10, TimeUnit.MINUTES); } catch (TimeoutException e) { - LOGGER.warn("Time out when listening collector", e); + LOGGER.warn("Time out when listening extractor", e); alive.set(false); Assert.fail(); } @@ -192,11 +192,11 @@ public class PipeRealtimeCollectTest { } }); - // start collector 2, 3 - collectors[2].start(); - collectors[3].start(); + // start extractor 2, 3 + extractors[2].start(); + extractors[3].start(); - // test result of collector 0 - 3 + // test result of extractor 0 - 3 writeFutures = Arrays.asList( write2DataRegion(writeNum, dataRegion1, writeNum), @@ -206,22 +206,22 @@ public class PipeRealtimeCollectTest { listenFutures = Arrays.asList( listen( - collectors[0], + extractors[0], event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), - listen(collectors[1], event -> 1, writeNum), + listen(extractors[1], event -> 1, writeNum), listen( - collectors[2], + extractors[2], event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), - listen(collectors[3], event -> 1, writeNum)); + listen(extractors[3], event -> 1, writeNum)); try { listenFutures.get(0).get(10, TimeUnit.MINUTES); listenFutures.get(1).get(10, TimeUnit.MINUTES); listenFutures.get(2).get(10, TimeUnit.MINUTES); listenFutures.get(3).get(10, TimeUnit.MINUTES); } catch (TimeoutException e) { - LOGGER.warn("Time out when listening collector", e); + LOGGER.warn("Time out when listening extractor", e); alive.set(false); Assert.fail(); } @@ -291,7 +291,7 @@ public class PipeRealtimeCollectTest { } private Future<?> listen( - PipeRealtimeDataRegionExtractor collector, Function<Event, Integer> weight, int expectNum) { + PipeRealtimeDataRegionExtractor extractor, Function<Event, Integer> weight, int expectNum) { return listenerService.submit( () -> { int eventNum = 0; @@ -299,7 +299,7 @@ public class PipeRealtimeCollectTest { while (alive.get() && eventNum < expectNum) { Event event; try { - event = collector.supply(); + event = extractor.supply(); } catch (Exception e) { throw new RuntimeException(e); }
