Repository: samza
Updated Branches:
  refs/heads/master e4719b448 -> 40f74308f


Stream-table join for Samza-sql: Use SamzaSQLRelMsgSerde as value serde for 
repartitioning the stream

Author: Aditya Toomula <[email protected]>

Reviewers: Srinivasulu Punuru <[email protected]>

Closes #643 from atoomula/join-serde


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40f74308
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40f74308
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40f74308

Branch: refs/heads/master
Commit: 40f74308fec10029c149d5757d6f129bc7c2088e
Parents: e4719b4
Author: Aditya Toomula <[email protected]>
Authored: Mon Sep 17 16:37:30 2018 -0700
Committer: Srinivasulu Punuru <[email protected]>
Committed: Mon Sep 17 16:37:30 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/sql/translator/JoinTranslator.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/40f74308/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 5aaad26..7071b39 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -45,6 +45,7 @@ import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,7 +114,8 @@ class JoinTranslator {
             tableFieldNames);
 
     Serde<SamzaSqlCompositeKey> keySerde = new 
JsonSerdeV2<>(SamzaSqlCompositeKey.class);
-    Serde<SamzaSqlRelMessage> valueSerde = new 
JsonSerdeV2<>(SamzaSqlRelMessage.class);
+    SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+        (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new 
SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
 
     // Always re-partition the messages from the input stream by the composite 
key and then join the messages
     // with the table.

Reply via email to