mjsax commented on code in PR #14027:
URL: https://github.com/apache/kafka/pull/14027#discussion_r1298793261
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java:
##########

@@ -42,7 +46,29 @@ protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final by
     @Override
     Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore");
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key());
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long segmentId = segments.segmentId(timestamp);
+            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+            if (segment != null) {

Review Comment:
   What does `segment == null` imply? That the data is expired and we can drop it? (Might be worth adding a comment.)

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########

@@ -45,17 +53,24 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<Ro
     private Serde<V> valueSerde;
    private final String topic;
    private int seqnum;
+    private final boolean loggingEnabled;
+    private final Map<Bytes, BufferValue> dirtyKeys = new HashMap<>();

Review Comment:
   Why do we introduce this `dirtyKeys` buffer? If we buffer KStream key-value pairs, keys must be unique, because we cannot overwrite records with the same key. Thus, this Map would only grow. Why do we need to buffer anything to begin with? Seems we could just call `logValue` / `logTombstone` directly instead of waiting for `flush()`?
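[Editor's note] The first comment above asks what a `null` return from `getOrCreateSegmentIfLive` means. A minimal, self-contained sketch of the segment-expiry idea, assuming a segmented store where segments older than the retention window (measured back from observed stream time) are considered dead — all names here (`segmentId`, `isSegmentLive`, the parameters) are illustrative stand-ins, not Kafka's internal API:

```java
// Simplified model of segment-based expiry. In the real store,
// getOrCreateSegmentIfLive() returns null for an expired segment,
// which would mean the record is past retention and can be dropped.
public class SegmentExpirySketch {

    // Segments partition the time axis into fixed-size intervals.
    static long segmentId(final long timestamp, final long segmentIntervalMs) {
        return timestamp / segmentIntervalMs;
    }

    // A segment is "live" only if it is at least as recent as the earliest
    // segment still inside the retention window.
    static boolean isSegmentLive(final long segmentId,
                                 final long segmentIntervalMs,
                                 final long observedStreamTime,
                                 final long retentionMs) {
        final long minLiveSegment =
            segmentId(observedStreamTime - retentionMs, segmentIntervalMs);
        return segmentId >= minLiveSegment;
    }
}
```

Under this model, a record whose timestamp maps to a segment below `minLiveSegment` is silently skipped — which is presumably the comment the reviewer is asking to be added in the real code.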
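[Editor's note] The second comment suggests logging each mutation eagerly via `logValue` / `logTombstone` instead of accumulating a `dirtyKeys` map and replaying it in `flush()`. A hypothetical sketch of that eager-logging shape — the class, the `List`-backed changelog, and the method names are stand-ins for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested alternative: each put/delete writes straight to
// the changelog, so no dirtyKeys map is needed and flush() has nothing
// deferred to replay.
public class EagerChangelogSketch {

    // Stand-in for records sent to the store's changelog topic.
    private final List<String> changelog = new ArrayList<>();

    public void put(final String key, final String value) {
        // Eager equivalent of logValue(): logged immediately, not at flush().
        changelog.add(key + "=" + value);
    }

    public void delete(final String key) {
        // Eager equivalent of logTombstone(): null payload marks removal.
        changelog.add(key + "=null");
    }

    public List<String> changelog() {
        return changelog;
    }
}
```

The trade-off the reviewer points at: if every buffered key is unique anyway, the map only ever grows before `flush()`, so deferring the writes buys nothing.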
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########

@@ -1271,7 +1271,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
     final String bufferStoreName = name + "-Buffer";
     final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();
-    buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name));
+    buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name, true));

Review Comment:
   This reminds me of https://issues.apache.org/jira/browse/KAFKA-15337 -- Can we file a ticket to make this configurable, similar to the ticket for FK-joins? Or should we piggy-back it on the KTable config for the join table, and disable logging if the KTable has logging disabled?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
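[Editor's note] The comment above objects to the hard-coded `true` logging flag and suggests inheriting the setting from the join table (the way users already control changelogging on regular stores with `Materialized.withLoggingDisabled()`). A hypothetical sketch of that plumbing, with all class and factory names invented for illustration:

```java
// Sketch: carry the logging flag as configuration instead of a literal.
public class BufferLoggingConfigSketch {

    private final boolean loggingEnabled;

    private BufferLoggingConfigSketch(final boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    // Today's behavior in the PR: logging is always on (`true` hard-coded).
    public static BufferLoggingConfigSketch hardCoded() {
        return new BufferLoggingConfigSketch(true);
    }

    // Suggested behavior: piggy-back on the join-table KTable's own setting,
    // so a table with logging disabled gets an unlogged buffer too.
    public static BufferLoggingConfigSketch inheritFromTable(final boolean tableLoggingEnabled) {
        return new BufferLoggingConfigSketch(tableLoggingEnabled);
    }

    public boolean loggingEnabled() {
        return loggingEnabled;
    }
}
```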