mjsax commented on code in PR #14027:
URL: https://github.com/apache/kafka/pull/14027#discussion_r1298793261


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java:
##########
@@ -42,7 +46,29 @@ protected KeyValue<Bytes, byte[]> getIndexKeyValue(final 
Bytes baseKey, final by
 
     @Override
     Map<KeyValueSegment, WriteBatch> getWriteBatches(final 
Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("Do not use for 
TimeOrderedKeyValueStore");
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = 
WindowKeySchema.extractStoreTimestamp(record.key());
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long segmentId = segments.segmentId(timestamp);
+            final KeyValueSegment segment = 
segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+            if (segment != null) {

Review Comment:
   What does `segment == null` imply? That data is expired and we can drop it? 
(Might be worth to add a comment.) 



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -45,17 +53,24 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends 
WrappedStateStore<Ro
     private Serde<V> valueSerde;
     private final String topic;
     private int seqnum;
+    private final boolean loggingEnabled;
+    private final Map<Bytes, BufferValue> dirtyKeys = new HashMap<>();

Review Comment:
   Why do we introduce this dirtyKeys buffer? If we buffer KStream key-value 
pairs, key must be unique, because we cannot overwrite records with the same 
key. Thus, thus Map would only grow. Why do we need to buffer anything to begin 
with? Seems we could just call `logValue` / `logTombstone` directly instead of 
waiting for `flush()`?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1271,7 +1271,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final 
KTable<K, VO> table,
             final String bufferStoreName = name + "-Buffer";
             final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();
 
-            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, 
joined.gracePeriod(), name));
+            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, 
joined.gracePeriod(), name, true));

Review Comment:
   This reminds me on https://issues.apache.org/jira/browse/KAFKA-15337 -- Can 
we file a ticket to make this configurable similar to the ticket for FK-joins?
   
   Or should we piggy-back it on the KTable config for the join table, and 
disable logging if the KTable has logging disabled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to