wcarlson5 commented on code in PR #14269:
URL: https://github.com/apache/kafka/pull/14269#discussion_r1300411569


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -83,9 +83,12 @@ public void init(final ProcessorContext<K1, VOut> context) {
             if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
                 throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
             }
-
-            buffer.get().setSerdesIfNull(new SerdeGetter(context));
-            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+            buffer = new RocksDBTimeOrderedKeyValueBuffer<>(
+                requireNonNull(context.getStateStore(storeName)),
+                gracePeriod.orElse(Duration.ZERO),
+                ((org.apache.kafka.streams.processor.ProcessorContext) context).topic(),

Review Comment:
   Ah, good idea.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -206,6 +291,11 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
         final byte[] array = buffer.array();
+        this.context = ProcessorContextUtils.asInternalProcessorContext(wrapped().context);

Review Comment:
   The reason it is written this way is that `getStateStore` in the processor 
returns only the underlying store, not the buffer, so the changes made by the 
`init` call are not retained here. I can move the field updates into the 
serde-setting update instead. Would that be better?
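
   To illustrate the point about the store versus the buffer, here is a minimal 
sketch. It assumes the issue is that fields set on a wrapper during `init` are 
lost when a new wrapper is built around the store returned by a lookup. All 
class and method names below (`InnerStore`, `BufferWrapper`, `StoreRegistry`, 
`setContextIfNull`) are hypothetical stand-ins, not Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the underlying state store (not a Kafka class).
class InnerStore {
    final String name;
    InnerStore(String name) { this.name = name; }
}

// Hypothetical stand-in for a buffer that wraps a store and holds its own fields.
class BufferWrapper {
    private final InnerStore wrapped;
    private String context; // wrapper-level field, set by init()

    BufferWrapper(InnerStore wrapped) { this.wrapped = wrapped; }

    void init(String context) { this.context = context; }

    // Sets the field later if init() never ran on this particular wrapper.
    void setContextIfNull(String context) {
        if (this.context == null) {
            this.context = context;
        }
    }

    String context() { return context; }
    InnerStore wrapped() { return wrapped; }
}

// Hypothetical registry: it initializes the wrapper but keeps only the inner store.
class StoreRegistry {
    private final Map<String, InnerStore> stores = new HashMap<>();

    void register(BufferWrapper buffer, String context) {
        buffer.init(context);
        stores.put(buffer.wrapped().name, buffer.wrapped());
    }

    InnerStore getStateStore(String name) { return stores.get(name); }
}

class BufferWrapperSketch {
    public static void main(String[] args) {
        StoreRegistry registry = new StoreRegistry();
        registry.register(new BufferWrapper(new InnerStore("join-buffer")), "ctx");

        // A wrapper rebuilt around the looked-up store has none of the init() state.
        BufferWrapper rebuilt = new BufferWrapper(registry.getStateStore("join-buffer"));
        System.out.println(rebuilt.context()); // prints "null"

        // Setting the field in a later call restores it on the new wrapper.
        rebuilt.setContextIfNull("ctx");
        System.out.println(rebuilt.context()); // prints "ctx"
    }
}
```

   In this sketch the lookup hands back only `InnerStore`, so anything `init` 
stored on the original wrapper has to be set again on the rebuilt one, which is 
the kind of move proposed above.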



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
