Repository: samza Updated Branches: refs/heads/master 6dd3e1823 -> 1d6f693c8
SAMZA-1986: Samza-sql: Use system name along with stream name for streamId Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #800 from atoomula/streamid Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1d6f693c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1d6f693c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1d6f693c Branch: refs/heads/master Commit: 1d6f693c84c7476d0d51a3a47e41908f7ddd1646 Parents: 6dd3e18 Author: Aditya Toomula <[email protected]> Authored: Mon Nov 12 11:42:51 2018 -0800 Committer: Srinivasulu Punuru <[email protected]> Committed: Mon Nov 12 11:42:51 2018 -0800 ---------------------------------------------------------------------- .../samza/sql/interfaces/SqlIOConfig.java | 29 +++++++------ .../sql/runner/SamzaSqlApplicationRunner.java | 4 +- .../samza/sql/translator/ModifyTranslator.java | 4 +- .../samza/sql/translator/QueryTranslator.java | 2 +- .../samza/sql/translator/ScanTranslator.java | 4 +- .../runner/TestSamzaSqlApplicationRunner.java | 8 ++-- .../samza/sql/system/TestAvroSystemFactory.java | 34 ++++++++++----- .../samza/sql/testutil/SamzaSqlTestConfig.java | 23 ++++++++++ .../test/samzasql/TestSamzaSqlEndToEnd.java | 45 ++++++++++++++++++++ 9 files changed, 119 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java index 3ef1795..5fa30e7 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java @@ -43,9 +43,7 @@ public class SqlIOConfig { public static final String CFG_SAMZA_REL_TABLE_KEY_CONVERTER = "samzaRelTableKeyConverterName"; public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName"; - private final String systemName; - - private final String streamName; + private final String streamId; private final String samzaRelConverterName; private final String samzaRelTableKeyConverterName; @@ -71,13 +69,16 @@ public class SqlIOConfig { public SqlIOConfig(String systemName, String streamName, List<String> sourceParts, Config systemConfig, TableDescriptor tableDescriptor) { HashMap<String, String> streamConfigs = new HashMap<>(systemConfig); - this.systemName = systemName; - this.streamName = streamName; this.source = getSourceFromSourceParts(sourceParts); this.sourceParts = sourceParts; this.systemStream = new SystemStream(systemName, streamName); this.tableDescriptor = Optional.ofNullable(tableDescriptor); + // Remote table has no backing stream associated with it and hence streamId does not make sense. But let's keep it + // for uniformity. Remote table has table descriptor defined. + // Local table has both backing stream and a tableDescriptor defined. + this.streamId = String.format("%s-%s", systemName, streamName); + samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER); Validate.notEmpty(samzaRelConverterName, String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName)); @@ -96,10 +97,14 @@ public class SqlIOConfig { streamConfigs.remove(CFG_SAMZA_REL_CONVERTER); streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER); - // Currently, only local table is supported. And it is assumed that all tables are local tables. - if (tableDescriptor != null) { - streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true"); - streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest"); + if (!isRemoteTable()) { + // The below config is required for local table and streams but not for remote table. + streamConfigs.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), streamId), streamName); + if (tableDescriptor != null) { + // For local table, set the bootstrap config and default offset to oldest + streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamId), "true"); + streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamId), "oldest"); + } } config = new MapConfig(streamConfigs); @@ -114,11 +119,11 @@ public class SqlIOConfig { } public String getSystemName() { - return systemName; + return systemStream.getSystem(); } - public String getStreamName() { - return streamName; + public String getStreamId() { + return streamId; } public String getSamzaRelConverterName() { http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 9d361fb..41e37f1 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -94,14 +94,14 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner { // Populate stream to system mapping config for input and output system streams for (String source : inputSystemStreams) { SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(source); - newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()), + newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamId()), inputSystemStreamConfig.getSystemName()); newConfig.putAll(inputSystemStreamConfig.getConfig()); } for (String sink : outputSystemStreams) { SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(sink); - newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()), + newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamId()), outputSystemStreamConfig.getSystemName()); newConfig.putAll(outputSystemStreamConfig.getConfig()); } http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java index b26f4a7..dbeabab 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java @@ -99,13 +99,13 @@ class ModifyTranslator { SqlIOConfig sinkConfig = systemStreamConfig.get(targetName); final String systemName = sinkConfig.getSystemName(); - final String streamName = sinkConfig.getStreamName(); + final String streamId = sinkConfig.getStreamId(); final String source = sinkConfig.getSource(); KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); DelegatingSystemDescriptor sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new); - GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamName, noOpKVSerde); + GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamId, noOpKVSerde); MessageStreamImpl<SamzaSqlRelMessage> stream = (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId()); http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index 7f3c11e..a826f9f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -209,7 +209,7 @@ public class QueryTranslator { String systemName = sinkConfig.getSystemName(); DelegatingSystemDescriptor sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new); - GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde); + GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde); if (OutputMapFunction.logOutputStream == null) { OutputMapFunction.logOutputStream = appDesc.getOutputStream(osd); } http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 2615aad..2a5a0e8 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -90,7 +90,7 @@ class ScanTranslator { Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName)); SqlIOConfig sqlIOConfig = systemStreamConfig.get(sourceName); final String systemName = sqlIOConfig.getSystemName(); - final String streamName = sqlIOConfig.getStreamName(); + final String streamId = sqlIOConfig.getStreamId(); final String source = sqlIOConfig.getSource(); final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() && @@ -107,7 +107,7 @@ class ScanTranslator { KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); DelegatingSystemDescriptor sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new); - GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde); + GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde); MessageStream<KV<Object, Object>> inputStream = inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd)); MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName, queryId)); http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java index f0df3a9..ccab449 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java @@ -40,13 +40,13 @@ public class TestSamzaSqlApplicationRunner { MapConfig samzaConfig = new MapConfig(configs); Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig); Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName()); - // Check whether three new configs added. - Assert.assertEquals(newConfigs.size(), configs.size() + 3); + // Check whether five new configs added. + Assert.assertEquals(newConfigs.size(), configs.size() + 5); newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig); Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName()); - // Check whether three new configs added. - Assert.assertEquals(newConfigs.size(), configs.size() + 3); + // Check whether five new configs added. + Assert.assertEquals(newConfigs.size(), configs.size() + 5); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index 9a9e269..f7d13a4 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -49,6 +49,7 @@ import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemFactory; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStreamPartition; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,17 +165,28 @@ public class TestAvroSystemFactory implements SystemFactory { @Override public void register(SystemStreamPartition systemStreamPartition, String offset) { - if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) { - simpleRecordSsps.add(systemStreamPartition); - } - if (systemStreamPartition.getStream().toLowerCase().contains("profile")) { - profileRecordSsps.add(systemStreamPartition); - } - if (systemStreamPartition.getStream().toLowerCase().contains("company")) { - companyRecordSsps.add(systemStreamPartition); - } - if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) { - pageViewRecordSsps.add(systemStreamPartition); + switch (systemStreamPartition.getStream().toLowerCase()) { + case "simple1": + simpleRecordSsps.add(systemStreamPartition); + break; + case "profile": + profileRecordSsps.add(systemStreamPartition); + break; + case "company": + companyRecordSsps.add(systemStreamPartition); + break; + case "pageview": + pageViewRecordSsps.add(systemStreamPartition); + break; + case "complex1": + break; + case "simple2": + break; + case "simple3": + break; + default: + Assert.assertTrue(String.format("ssp %s is not recognized", systemStreamPartition), false); + break; } curMessagesPerSsp.put(systemStreamPartition, 0); } http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index 80b47eb..f9f4124 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -53,6 +53,8 @@ import static org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory.TES public class SamzaSqlTestConfig { public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro"; + public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2"; + public static final String SAMZA_SYSTEM_TEST_DB = "testDb"; public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) { return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false); @@ -109,6 +111,24 @@ public class SamzaSqlTestConfig { staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample"); staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + String avro2SystemConfigPrefix = + String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2); + String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2); + staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName()); + staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES, + String.valueOf(numberOfMessages)); + staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS, + includeNullForeignKeys ? "true" : "false"); + staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS, + String.valueOf(windowDurationMs / 2)); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs)); + staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro"); + staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + + String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB); + staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro"); + staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + String avroSamzaToRelMsgConverterDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro"); staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY, @@ -133,6 +153,9 @@ public class SamzaSqlTestConfig { "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString()); staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, "testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString()); staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 77538ff..3f2148c 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -90,6 +90,26 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { } @Test + public void testEndToEndWithDifferentSystemSameStream() { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + String sql = "Insert into testavro2.SIMPLE1 select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.runAndWaitForFinish(); + + List<Integer> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) + .sorted() + .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages)); + } + + @Test public void testEndToEndMultiSqlStmts() { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -110,6 +130,31 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet))); } + // The below test won't work until SAMZA-1990 is fixed. Currently, Samza framework does not allow same system stream + // to be used as both input and output stream. + @Ignore + @Test + public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() { + int numMessages = 20; + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + String sql1 = "Insert into testavro.SIMPLE1 select * from testavro.SIMPLE2"; + String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql1, sql2); + + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.runAndWaitForFinish(); + List<Integer> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) + .sorted() + .collect(Collectors.toList()); + Assert.assertEquals(numMessages * 2, outMessages.size()); + Set<Integer> outMessagesSet = new HashSet<>(outMessages); + Assert.assertEquals(numMessages, outMessagesSet.size()); + Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet))); + } + @Test public void testEndToEndFanIn() { int numMessages = 20;
