vvcephei commented on code in PR #14269:
URL: https://github.com/apache/kafka/pull/14269#discussion_r1300308721


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -83,9 +83,12 @@ public void init(final ProcessorContext<K1, VOut> context) {
             if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
                 throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
             }
-
-            buffer.get().setSerdesIfNull(new SerdeGetter(context));
-            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+            buffer = new RocksDBTimeOrderedKeyValueBuffer<>(
+                requireNonNull(context.getStateStore(storeName)),
+                gracePeriod.orElse(Duration.ZERO),
+                ((org.apache.kafka.streams.processor.ProcessorContext) context).topic(),

Review Comment:
   This cast shouldn't be necessary. Maybe we should reference the field instead of the argument.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -206,6 +291,11 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
         final byte[] array = buffer.array();
+        this.context = ProcessorContextUtils.asInternalProcessorContext(wrapped().context);

Review Comment:
   I'm not sure if we should do this on every invocation (also below).
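
   To illustrate the concern, here is a minimal sketch of resolving the context once during initialization rather than on every call. It assumes the context is already available at init time, which may not hold for this store. All names in it (`CachedContextSketch`, `Context`, `InternalContext`, `asInternal`, `Buffer`, `taskId`) are hypothetical stand-ins, not Kafka Streams types.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CachedContextSketch {
    // Hypothetical stand-ins for the public and internal context types.
    interface Context {}

    static final class InternalContext implements Context {
        String taskId() {
            return "0_0";
        }
    }

    // Counts how often the conversion helper runs, for demonstration only.
    static final AtomicInteger conversions = new AtomicInteger();

    // Hypothetical conversion helper: checks the type and narrows it.
    static InternalContext asInternal(final Context context) {
        conversions.incrementAndGet();
        if (context instanceof InternalContext) {
            return (InternalContext) context;
        }
        throw new IllegalArgumentException("Unsupported context type: " + context.getClass());
    }

    static final class Buffer {
        private InternalContext context;

        // Convert once and keep the result in a field.
        void init(final Context raw) {
            this.context = asInternal(raw);
        }

        // The hot path only reads the cached field; no conversion happens here.
        String logValue(final String key) {
            if (context == null) {
                throw new IllegalStateException("init() must be called before logValue()");
            }
            return key + "@" + context.taskId();
        }
    }

    public static void main(final String[] args) {
        final Buffer buffer = new Buffer();
        buffer.init(new InternalContext());
        for (int i = 0; i < 3; i++) {
            buffer.logValue("k" + i);
        }
        System.out.println("conversions = " + conversions.get());
    }
}
```

   If the context really can't be resolved until the first write, a lazy null check in one place would still avoid repeating the assignment in each method.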


