wcarlson5 commented on code in PR #14027:
URL: https://github.com/apache/kafka/pull/14027#discussion_r1298818523


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1271,7 +1271,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
             final String bufferStoreName = name + "-Buffer";
             final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();
 
-            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name));
+            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name, true));

Review Comment:
   Yeah, I was thinking about this issue; I filed a ticket for it: https://issues.apache.org/jira/browse/KAFKA-15379



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -45,17 +53,24 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<Ro
     private Serde<V> valueSerde;
     private final String topic;
     private int seqnum;
+    private final boolean loggingEnabled;
+    private final Map<Bytes, BufferValue> dirtyKeys = new HashMap<>();

Review Comment:
   Yeah, we can do that. I was just going off the InMemoryBuffer, but I guess that doesn't apply here. I was worried about too many commits at once.
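   For reference, a minimal, self-contained sketch of the dirty-key batching idea being discussed: changed keys are only remembered in a map, and the changelog is written once per flush rather than once per put. The ChangelogWriter interface and the class name below are hypothetical stand-ins for illustration, not the actual Streams internals.

   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch, not the real RocksDBTimeOrderedKeyValueBuffer:
   // collect dirty keys in memory and emit them to the changelog only on
   // flush, so a burst of puts does not produce a changelog send per record.
   public class DirtyKeyBufferSketch {

       // Stand-in for the changelog sink; in Streams this would be the
       // store's changelogging layer.
       interface ChangelogWriter {
           void send(byte[] key, byte[] value);
       }

       private final Map<String, byte[]> dirtyKeys = new HashMap<>();
       private final ChangelogWriter changelog;

       DirtyKeyBufferSketch(final ChangelogWriter changelog) {
           this.changelog = changelog;
       }

       void put(final String key, final byte[] value) {
           // Only remember that the key changed; do not log it yet.
           dirtyKeys.put(key, value);
       }

       void flush() {
           // One changelog write per dirty key per commit interval.
           for (final Map.Entry<String, byte[]> entry : dirtyKeys.entrySet()) {
               changelog.send(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
           }
           dirtyKeys.clear();
       }
   }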



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -45,17 +53,24 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<Ro
     private Serde<V> valueSerde;
     private final String topic;
     private int seqnum;
+    private final boolean loggingEnabled;
+    private final Map<Bytes, BufferValue> dirtyKeys = new HashMap<>();
+    private int partition;
+    private String changelogTopic;
+    private InternalProcessorContext context;
 
     public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
                                             final Duration gracePeriod,
-                                            final String topic) {
+                                            final String topic,
+                                            final boolean loggingEnabled) {
         super(store);
         this.gracePeriod = gracePeriod.toMillis();
         minTimestamp = Long.MAX_VALUE;
         numRecords = 0;
         bufferSize = 0;
         seqnum = 0;
         this.topic = topic;
+        this.loggingEnabled = true;

Review Comment:
   Ah, good catch.
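   For reference, the presumable intent behind the catch above is that the constructor keeps the loggingEnabled argument rather than the hard-coded literal. A minimal sketch of that wiring (the class name below is a hypothetical stand-in, not the actual buffer class):

   // Hypothetical sketch mirroring the constructor under review: the flag
   // passed by the caller should be the one that is stored.
   public class LoggingFlagSketch {

       private final boolean loggingEnabled;

       LoggingFlagSketch(final boolean loggingEnabled) {
           this.loggingEnabled = loggingEnabled; // rather than "= true"
       }

       boolean loggingEnabled() {
           return loggingEnabled;
       }
   }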



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java:
##########
@@ -42,7 +46,29 @@ protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final by
 
     @Override
     Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore");
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key());
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long segmentId = segments.segmentId(timestamp);
+            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+            if (segment != null) {

Review Comment:
   Sure, I can add a comment.
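   For reference, a simplified, self-contained sketch of what the restore path above does and what such a comment might explain, assuming the usual segmented-store behaviour that a segment lookup returns null once the record's segment has fallen out of retention relative to observed stream time. All types and constants below are hypothetical stand-ins, not the Streams internals:

   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical, simplified version of the restore path under review:
   // group changelog records into per-segment write batches, skipping
   // records whose segment is no longer "live" (i.e. already past retention
   // relative to the observed stream time).
   public class SegmentedRestoreSketch {

       static final long SEGMENT_INTERVAL_MS = 60_000L;
       static final long RETENTION_MS = 5 * 60_000L;

       long observedStreamTime = Long.MIN_VALUE;

       Map<Long, List<byte[]>> getWriteBatches(final List<TimestampedRecord> records) {
           final Map<Long, List<byte[]>> writeBatches = new HashMap<>();
           for (final TimestampedRecord record : records) {
               observedStreamTime = Math.max(observedStreamTime, record.timestamp);
               final long segmentId = record.timestamp / SEGMENT_INTERVAL_MS;
               // A segment is only usable if it is still within retention; an
               // expired segment yields nothing and the record is dropped,
               // mirroring getOrCreateSegmentIfLive returning null.
               if (segmentStillLive(segmentId)) {
                   writeBatches.computeIfAbsent(segmentId, id -> new ArrayList<>()).add(record.value);
               }
           }
           return writeBatches;
       }

       private boolean segmentStillLive(final long segmentId) {
           final long segmentEnd = (segmentId + 1) * SEGMENT_INTERVAL_MS;
           return segmentEnd > observedStreamTime - RETENTION_MS;
       }

       static final class TimestampedRecord {
           final long timestamp;
           final byte[] value;

           TimestampedRecord(final long timestamp, final byte[] value) {
               this.timestamp = timestamp;
               this.value = value;
           }
       }
   }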


