Repository: samza
Updated Branches:
  refs/heads/master b25df5f53 -> ebeccd203


SAMZA-1999: Fix NullPointerException when sink is log.outputstream

## What changes were proposed in this pull request?
The PR is to fix a bug which throws NullPointerException when sink is 
log.outputstream

## How was this patch tested?
Pass build and current tests.
Test in Samza SQL shell.

Author: Weiqing Yang <[email protected]>

Reviewers: Srinivasulu Punuru <[email protected]>

Closes #808 from weiqingy/SAMZA-1999


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebeccd20
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebeccd20
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebeccd20

Branch: refs/heads/master
Commit: ebeccd203166aa20574a95c07dea70d5db72cd83
Parents: b25df5f
Author: Weiqing Yang <[email protected]>
Authored: Mon Nov 19 09:59:23 2018 -0800
Committer: Srinivasulu Punuru <[email protected]>
Committed: Mon Nov 19 09:59:23 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/sql/translator/QueryTranslator.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ebeccd20/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index a826f9f..33781a6 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -193,16 +193,16 @@ public class QueryTranslator {
     sqlConfig.getOutputSystemStreamConfigsBySource().keySet().forEach(
         key -> {
           if 
(key.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
-            sendToOutputStream(streamAppDescriptor, translatorContext, node, 
queryId);
+            sendToOutputStream(key, streamAppDescriptor, translatorContext, 
node, queryId);
           }
         }
     );
   }
 
-  private void sendToOutputStream(StreamApplicationDescriptor appDesc, 
TranslatorContext context, RelNode node, int queryId) {
-    SqlIOConfig sinkConfig = 
sqlConfig.getOutputSystemStreamConfigsBySource().get(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG);
+  private void sendToOutputStream(String sinkStream, 
StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, 
int queryId) {
+    SqlIOConfig sinkConfig = 
sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
     MessageStream<SamzaSqlRelMessage> stream = 
context.getMessageStream(node.getId());
-    MessageStream<KV<Object, Object>> outputStream = stream.map(new 
OutputMapFunction(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG, queryId));
+    MessageStream<KV<Object, Object>> outputStream = stream.map(new 
OutputMapFunction(sinkStream, queryId));
     Optional<TableDescriptor> tableDescriptor = 
sinkConfig.getTableDescriptor();
     if (!tableDescriptor.isPresent()) {
       KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>());

Reply via email to