Repository: samza Updated Branches: refs/heads/master e4719b448 -> 40f74308f
Stream-table join for Samza-sql: Use SamzaSQLRelMsgSerde as value serde for repartitioning the stream Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #643 from atoomula/join-serde Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40f74308 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40f74308 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40f74308 Branch: refs/heads/master Commit: 40f74308fec10029c149d5757d6f129bc7c2088e Parents: e4719b4 Author: Aditya Toomula <[email protected]> Authored: Mon Sep 17 16:37:30 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Mon Sep 17 16:37:30 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/sql/translator/JoinTranslator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/40f74308/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 5aaad26..7071b39 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -45,6 +45,7 @@ import org.apache.samza.sql.data.SamzaSqlCompositeKey; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,7 +114,8 @@ class JoinTranslator { tableFieldNames); Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); - Serde<SamzaSqlRelMessage> valueSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class); + SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = + (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); // Always re-partition the messages from the input stream by the composite key and then join the messages // with the table.
