Repository: samza Updated Branches: refs/heads/master e47edbe55 -> cc314be3b
SAMZA-2021: Adding an API to rel converter to filter out system messages. Author: Aditya Toomula <[email protected]> Reviewers: srinipunuru Closes #839 from atoomula/system and squashes the following commits: 0dcba87b [Aditya Toomula] Adding an API to rel converter to filter out system messages. 2bee3ba4 [Aditya Toomula] Adding an API to rel converter to filter out system messages. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cc314be3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cc314be3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cc314be3 Branch: refs/heads/master Commit: cc314be3b84fbf8c874c9532f1470ea2d9edd116 Parents: e47edbe Author: Aditya Toomula <[email protected]> Authored: Tue Dec 4 15:50:07 2018 -0800 Committer: Aditya Toomula <[email protected]> Committed: Tue Dec 4 15:50:07 2018 -0800 ---------------------------------------------------------------------- .../samza/sql/interfaces/SamzaRelConverter.java | 11 ++++ .../samza/sql/translator/ScanTranslator.java | 33 +++++++++++- .../sql/testutil/SampleRelConverterFactory.java | 56 ++++++++++++++++++++ .../test/samzasql/TestSamzaSqlEndToEnd.java | 28 +++++++++- 4 files changed, 124 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java index 12d5f28..6e5b395 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java @@ -31,6 +31,16 @@ import 
org.apache.samza.sql.data.SamzaSqlRelMessage; */ public interface SamzaRelConverter { /** + * Determines whether the input Samza message is a system message. + * This API will soon be removed when descriptor creation is done by SamzaRelConverter. + * @param message input samza message (key and value). + * @return true if the input message is a system message that should be skipped instead of converted; the default implementation always returns false. + */ + default boolean isSystemMessage(KV<Object, Object> message) { + return false; + } + + /** * Converts the object to relational message corresponding to the tableName with relational schema. * @param message samza message that needs to be converted. * @return Relational message extracted from the object. @@ -43,4 +53,5 @@ public interface SamzaRelConverter { * @return the key and value of the Samza message */ KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage); + } http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 2a5a0e8..e564cae 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.TableScan; import org.apache.commons.lang.Validate; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.context.Context; +import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.system.descriptors.GenericInputDescriptor; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; @@ -49,6 +50,30 @@ class ScanTranslator { private final Map<String, SqlIOConfig> systemStreamConfig; private final int queryId; + // FilterFunction that drops every input message for which the source's
SamzaRelConverter.isSystemMessage() returns true, so only regular messages reach ScanMapFunction. + private static class FilterSystemMessageFunction implements FilterFunction<KV<Object, Object>> { + private transient SamzaRelConverter relConverter; // set in init(); transient, so not part of the function's serialized state + private final String source; + private final int queryId; + + FilterSystemMessageFunction(String source, int queryId) { + this.source = source; + this.queryId = queryId; + } + + @Override + public void init(Context context) { + TranslatorContext translatorContext = + ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId); + relConverter = translatorContext.getMsgConverter(source); + } + + @Override + public boolean apply(KV<Object, Object> message) { + return !relConverter.isSystemMessage(message); + } + } + ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc, int queryId) { relMsgConverters = converters; this.systemStreamConfig = ssc; @@ -109,8 +134,12 @@ class ScanTranslator { sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new); GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde); - MessageStream<KV<Object, Object>> inputStream = inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd)); - MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName, queryId)); + MessageStream<KV<Object, Object>> inputStream = + inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd)); + MessageStream<KV<Object, Object>> outputStream = + inputStream.filter(new FilterSystemMessageFunction(sourceName, queryId)); // pass-through unless the converter overrides isSystemMessage() + MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = + outputStream.map(new ScanMapFunction(sourceName, queryId)); context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream); } }
http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java new file mode 100644 index 0000000..7c67082 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java @@ -0,0 +1,56 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.testutil; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.sql.avro.AvroRelConverter; +import org.apache.samza.sql.avro.AvroRelSchemaProvider; +import org.apache.samza.sql.interfaces.RelSchemaProvider; +import org.apache.samza.sql.interfaces.SamzaRelConverter; +import org.apache.samza.sql.interfaces.SamzaRelConverterFactory; +import org.apache.samza.system.SystemStream; + + +/** + * Factory for SampleRelConverter, an {@link AvroRelConverter} whose isSystemMessage() flags every other call (the 1st, 3rd, 5th, ...) as a system message.
+ * This is used purely for testing system messages. + */ +public class SampleRelConverterFactory implements SamzaRelConverterFactory { + + private int i = 0; + + @Override + public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) { + return new SampleRelConverter(systemStream, (AvroRelSchemaProvider) relSchemaProvider, config); + } + + public class SampleRelConverter extends AvroRelConverter { + public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) { + super(systemStream, schemaProvider, config); + } + + @Override + public boolean isSystemMessage(KV<Object, Object> kv) { + // Counter starts at 0, so the 1st, 3rd, 5th, ... calls are system messages. NOTE(review): i is a field of the enclosing factory, shared by every converter it creates and not synchronized; fine for this single-stream test, confirm before reusing. + return (i++) % 2 == 0; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 3fc5750..d593870 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -39,6 +39,7 @@ import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.system.TestAvroSystemFactory; import org.apache.samza.sql.testutil.JsonUtil; import org.apache.samza.sql.testutil.MyTestUdf; +import org.apache.samza.sql.testutil.SampleRelConverterFactory; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; @@ -91,6 +92,29 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { } @Test + public void testEndToEndWithSystemMessages() { +
int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + String avroSamzaToRelMsgConverterDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro"); + staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + SampleRelConverterFactory.class.getName()); + String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.runAndWaitForFinish(); + + List<Integer> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) + .sorted() + .collect(Collectors.toList()); + Assert.assertEquals(numMessages / 2, outMessages.size()); // the 1st isSystemMessage() call returns true, so floor(numMessages / 2) messages survive (not ceil) + } + + @Test public void testEndToEndWithNullRecords() { int numMessages = 20; @@ -138,7 +162,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { @Test public void testEndToEndMultiSqlStmts() { - int numMessages = 4; + int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; @@ -205,7 +229,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { @Test public void testEndToEndFanOut() { - int numMessages = 4; + int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";
