wcarlson5 commented on code in PR #14027:
URL: https://github.com/apache/kafka/pull/14027#discussion_r1298818523


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1271,7 +1271,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
             final String bufferStoreName = name + "-Buffer";
             final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();
-            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name));
+            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name, true));

Review Comment:
   Yeah I was thinking about this issue, I can file a ticket https://issues.apache.org/jira/browse/KAFKA-15379



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -45,17 +53,24 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<Ro
     private Serde<V> valueSerde;
     private final String topic;
     private int seqnum;
+    private final boolean loggingEnabled;
+    private final Map<Bytes, BufferValue> dirtyKeys = new HashMap<>();

Review Comment:
   Yeah we can do that. I was just going based on the InMemoryBuffer, but that doesn't apply here I guess.
   I was worried about too many commits at once



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -45,17 +53,24 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<Ro
     private Serde<V> valueSerde;
     private final String topic;
     private int seqnum;
+    private final boolean loggingEnabled;
+    private final Map<Bytes, BufferValue> dirtyKeys = new HashMap<>();
+    private int partition;
+    private String changelogTopic;
+    private InternalProcessorContext context;
     public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
                                             final Duration gracePeriod,
-                                            final String topic) {
+                                            final String topic,
+                                            final boolean loggingEnabled) {
         super(store);
         this.gracePeriod = gracePeriod.toMillis();
         minTimestamp = Long.MAX_VALUE;
         numRecords = 0;
         bufferSize = 0;
         seqnum = 0;
         this.topic = topic;
+        this.loggingEnabled = true;

Review Comment:
   Ah good catch



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStore.java:
##########
@@ -42,7 +46,29 @@ protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final by
     @Override
     Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore");
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key());
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long segmentId = segments.segmentId(timestamp);
+            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+            if (segment != null) {

Review Comment:
   sure I can add a comment



--
This is an automated message from the Apache Git Service.