mjsax commented on code in PR #14157:
URL: https://github.com/apache/kafka/pull/14157#discussion_r1285341728


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java:
##########
@@ -44,72 +45,98 @@
  * @param <VO> Type of foreign values
  * @param <VR> Type of joined result of primary and foreign values
  */
-public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements 
ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
+public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements 
ProcessorSupplier<Bytes, SubscriptionResponseWrapper<byte[]>, K, VR> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
-    private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
-    private final Serializer<V> constructionTimeValueSerializer;
+    private final KTableValueGetterSupplier<Bytes, byte[]> 
rawValueGetterSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> leftValueDeserializer;
+    private final Deserializer<VO> rightValueDeserializer;
     private final Supplier<String> valueHashSerdePseudoTopicSupplier;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
 
-    public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> 
valueGetterSupplier,
-                                         final Serializer<V> valueSerializer,
+    public ResponseJoinProcessorSupplier(final 
KTableValueGetterSupplier<Bytes, byte[]> rawValueGetterSupplier,
+                                         final Deserializer<K> keyDeserializer,
+                                         final Deserializer<V> 
leftValueDeserializer,
+                                         final Deserializer<VO> 
rightValueDeserializer,
                                          final Supplier<String> 
valueHashSerdePseudoTopicSupplier,
                                          final ValueJoiner<V, VO, VR> joiner,
                                          final boolean leftJoin) {
-        this.valueGetterSupplier = valueGetterSupplier;
-        constructionTimeValueSerializer = valueSerializer;
+        this.rawValueGetterSupplier = rawValueGetterSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.leftValueDeserializer = leftValueDeserializer;
+        this.rightValueDeserializer = rightValueDeserializer;
         this.valueHashSerdePseudoTopicSupplier = 
valueHashSerdePseudoTopicSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
 
     @Override
-    public Processor<K, SubscriptionResponseWrapper<VO>, K, VR> get() {
-        return new ContextualProcessor<K, SubscriptionResponseWrapper<VO>, K, 
VR>() {
+    public Processor<Bytes, SubscriptionResponseWrapper<byte[]>, K, VR> get() {
+        return new ContextualProcessor<Bytes, 
SubscriptionResponseWrapper<byte[]>, K, VR>() {
             private String valueHashSerdePseudoTopic;
-            private Serializer<V> runtimeValueSerializer = 
constructionTimeValueSerializer;
-
-            private KTableValueGetter<K, V> valueGetter;
+            private Deserializer<K> keyDeserializer = 
ResponseJoinProcessorSupplier.this.keyDeserializer;
+            private Deserializer<V> leftValueDeserializer = 
ResponseJoinProcessorSupplier.this.leftValueDeserializer;
+            private Deserializer<VO> rightValueDeserializer = 
ResponseJoinProcessorSupplier.this.rightValueDeserializer;
+            private KTableValueGetter<Bytes, byte[]> rawValueGetter;
 
             @SuppressWarnings("unchecked")
             @Override
             public void init(final ProcessorContext<K, VR> context) {
                 super.init(context);
                 valueHashSerdePseudoTopic = 
valueHashSerdePseudoTopicSupplier.get();
-                valueGetter = valueGetterSupplier.get();
-                valueGetter.init(context);
-                if (runtimeValueSerializer == null) {
-                    runtimeValueSerializer = (Serializer<V>) 
context.valueSerde().serializer();
+                rawValueGetter = rawValueGetterSupplier.get();
+                rawValueGetter.init(context);
+                if (keyDeserializer == null) {
+                    keyDeserializer = (Deserializer<K>) 
context.keySerde().deserializer();
+                }
+                if (leftValueDeserializer == null) {
+                    leftValueDeserializer = (Deserializer<V>) 
context.valueSerde().deserializer();
+                }
+                if (rightValueDeserializer == null) {
+                    rightValueDeserializer = (Deserializer<VO>) 
context.valueSerde().deserializer();
                 }
             }
 
             @Override
-            public void process(final Record<K, 
SubscriptionResponseWrapper<VO>> record) {
+            public void process(final Record<Bytes, 
SubscriptionResponseWrapper<byte[]>> record) {
                 if (record.value().getVersion() != 
SubscriptionResponseWrapper.CURRENT_VERSION) {
                     //Guard against modifications to 
SubscriptionResponseWrapper. Need to ensure that there is
                     //compatibility with previous versions to enable rolling 
upgrades. Must develop a strategy for
                     //upgrading from older SubscriptionWrapper versions to 
newer versions.
                     throw new 
UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible 
version.");
                 }
-                final ValueAndTimestamp<V> currentValueWithTimestamp = 
valueGetter.get(record.key());
+                final ValueAndTimestamp<byte[]> currentValueWithTimestamp = 
rawValueGetter.get(record.key());
 
                 final long[] currentHash = currentValueWithTimestamp == null ?
                     null :
-                    
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, 
currentValueWithTimestamp.value()));
+                    
//Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, 
currentValueWithTimestamp.value()));
+                    Murmur3.hash128(currentValueWithTimestamp.value());

Review Comment:
   This is the actual fix: we hash the serialized bytes returned by the raw 
value getter directly, instead of re-serializing the value to re-compute the hash.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to