Repository: samza Updated Branches: refs/heads/master b25df5f53 -> ebeccd203
SAMZA-1999: Fix NullPointerException when sink is log.outputstream ## What changes were proposed in this pull request? This PR fixes a bug that throws a NullPointerException when the sink is log.outputstream. ## How was this patch tested? Passes the build and existing tests. Tested in the Samza SQL shell. Author: Weiqing Yang <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #808 from weiqingy/SAMZA-1999 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebeccd20 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebeccd20 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebeccd20 Branch: refs/heads/master Commit: ebeccd203166aa20574a95c07dea70d5db72cd83 Parents: b25df5f Author: Weiqing Yang <[email protected]> Authored: Mon Nov 19 09:59:23 2018 -0800 Committer: Srinivasulu Punuru <[email protected]> Committed: Mon Nov 19 09:59:23 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/sql/translator/QueryTranslator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ebeccd20/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index a826f9f..33781a6 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -193,16 +193,16 @@ public class QueryTranslator { sqlConfig.getOutputSystemStreamConfigsBySource().keySet().forEach( key -> { if (key.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) { - 
sendToOutputStream(streamAppDescriptor, translatorContext, node, queryId); + sendToOutputStream(key, streamAppDescriptor, translatorContext, node, queryId); } } ); } - private void sendToOutputStream(StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, int queryId) { - SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG); + private void sendToOutputStream(String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, int queryId) { + SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream); MessageStream<SamzaSqlRelMessage> stream = context.getMessageStream(node.getId()); - MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG, queryId)); + MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sinkStream, queryId)); Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor(); if (!tableDescriptor.isPresent()) { KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
