Repository: samza
Updated Branches:
  refs/heads/master e47edbe55 -> cc314be3b


SAMZA-2021: Adding an API to rel converter to filter out system messages.

Author: Aditya Toomula <[email protected]>

Reviewers: srinipunuru

Closes #839 from atoomula/system and squashes the following commits:

0dcba87b [Aditya Toomula] Adding an API to rel converter to filter out system messages.
2bee3ba4 [Aditya Toomula] Adding an API to rel converter to filter out system messages.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cc314be3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cc314be3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cc314be3

Branch: refs/heads/master
Commit: cc314be3b84fbf8c874c9532f1470ea2d9edd116
Parents: e47edbe
Author: Aditya Toomula <[email protected]>
Authored: Tue Dec 4 15:50:07 2018 -0800
Committer: Aditya Toomula <[email protected]>
Committed: Tue Dec 4 15:50:07 2018 -0800

----------------------------------------------------------------------
 .../samza/sql/interfaces/SamzaRelConverter.java | 11 ++++
 .../samza/sql/translator/ScanTranslator.java    | 33 +++++++++++-
 .../sql/testutil/SampleRelConverterFactory.java | 56 ++++++++++++++++++++
 .../test/samzasql/TestSamzaSqlEndToEnd.java     | 28 +++++++++-
 4 files changed, 124 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
index 12d5f28..6e5b395 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
@@ -31,6 +31,16 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
  */
 public interface SamzaRelConverter {
   /**
+   * Determine if the input samza message is a system message.
+   * This API will soon be removed when descriptor creation is done by 
SamzaRelConverter.
+   * @param message input samza message.
+   * @return true if the input message is system message.
+   */
+  default boolean isSystemMessage(KV<Object, Object> message) {
+    return false;
+  }
+
+  /**
    * Converts the object to relational message corresponding to the tableName 
with relational schema.
    * @param message samza message that needs to be converted.
    * @return Relational message extracted from the object.
@@ -43,4 +53,5 @@ public interface SamzaRelConverter {
    * @return the key and value of the Samza message
    */
   KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage);
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 2a5a0e8..e564cae 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.Context;
+import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -49,6 +50,30 @@ class ScanTranslator {
   private final Map<String, SqlIOConfig> systemStreamConfig;
   private final int queryId;
 
+  /** Drops messages that the source's {@link SamzaRelConverter} reports as system messages. */
+  private static class FilterSystemMessageFunction implements FilterFunction<KV<Object, Object>> {
+    private transient SamzaRelConverter relConverter;  // set in init() from the task's TranslatorContext
+    private final String source;
+    private final int queryId;
+
+    FilterSystemMessageFunction(String source, int queryId) {
+      this.source = source;
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void init(Context context) {
+      TranslatorContext translatorContext =
+          ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+      relConverter = translatorContext.getMsgConverter(source);
+    }
+
+    @Override
+    public boolean apply(KV<Object, Object> message) {
+      return !relConverter.isSystemMessage(message);
+    }
+  }
+
   ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, 
SqlIOConfig> ssc, int queryId) {
     relMsgConverters = converters;
     this.systemStreamConfig = ssc;
@@ -109,8 +134,12 @@ class ScanTranslator {
         sd = systemDescriptors.computeIfAbsent(systemName, 
DelegatingSystemDescriptor::new);
     GenericInputDescriptor<KV<Object, Object>> isd = 
sd.getInputDescriptor(streamId, noOpKVSerde);
 
-    MessageStream<KV<Object, Object>> inputStream = 
inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
-    MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = 
inputStream.map(new ScanMapFunction(sourceName, queryId));
+    MessageStream<KV<Object, Object>> inputStream =
+        inputMsgStreams.computeIfAbsent(source, v -> 
streamAppDesc.getInputStream(isd));
+    MessageStream<KV<Object, Object>> outputStream =
+        inputStream.filter(new FilterSystemMessageFunction(sourceName, 
queryId));
+    MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
+        outputStream.map(new ScanMapFunction(sourceName, queryId));
     context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
new file mode 100644
index 0000000..7c67082
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelConverterFactory.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.avro.AvroRelConverter;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+/**
+ * Creates {@link SampleRelConverter}s: {@link AvroRelConverter}s that report alternate messages as system
+ * messages, using one atomic counter shared by all converters from this factory. Used purely for testing.
+ */
+public class SampleRelConverterFactory implements SamzaRelConverterFactory {
+
+  private final AtomicInteger messageCounter = new AtomicInteger();
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) {
+    return new SampleRelConverter(systemStream, (AvroRelSchemaProvider) relSchemaProvider, config);
+  }
+
+  public class SampleRelConverter extends AvroRelConverter {
+    public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
+      super(systemStream, schemaProvider, config);
+    }
+
+    @Override
+    public boolean isSystemMessage(KV<Object, Object> kv) {
+      // Counter values 0, 2, 4, ... (the 1st, 3rd, 5th, ... call) are reported as system messages.
+      return messageCounter.getAndIncrement() % 2 == 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/cc314be3/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 3fc5750..d593870 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -39,6 +39,7 @@ import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.MyTestUdf;
+import org.apache.samza.sql.testutil.SampleRelConverterFactory;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
@@ -91,6 +92,29 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
   }
 
   @Test
+  public void testEndToEndWithSystemMessages() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String avroSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+    staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        SampleRelConverterFactory.class.getName());
+    String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .collect(Collectors.toList());
+    // SampleRelConverter flags the 1st, 3rd, 5th, ... input as a system message, so numMessages / 2 survive.
+    Assert.assertEquals(numMessages / 2, outMessages.size());
+  }
+
+  @Test
   public void testEndToEndWithNullRecords() {
     int numMessages = 20;
 
@@ -138,7 +162,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
 
   @Test
   public void testEndToEndMultiSqlStmts() {
-    int numMessages = 4;
+    int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
@@ -205,7 +229,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
 
   @Test
   public void testEndToEndFanOut() {
-    int numMessages = 4;
+    int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 = "Insert into testavro.SIMPLE2 select * from 
testavro.SIMPLE1";

Reply via email to