mjsax commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1134781624


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -48,7 +49,9 @@ final class StateManagerUtil {
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
-        return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
+        // should not prepend timestamp when restoring records for versioned store, as
+        // timestamp is used separately during put() process for restore of versioned stores
+        return (isTimestamped(store) && !isVersioned(store)) ? rawValueToTimestampedValue() : identity();

Review Comment:
   Nice one!
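
To illustrate the condition above, here is a stdlib-only Java sketch of the restore-time conversion. It assumes that a timestamped store's value bytes are the 8-byte record timestamp followed by the raw value. `StoreKind`, `prependTimestamp` and `convertForRestore` are hypothetical names for illustration, not Kafka's `RecordConverter` API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class RestoreConverterSketch {

    // Hypothetical stand-in for the two store properties the converter checks;
    // this is not Kafka's StateStore API.
    static final class StoreKind {
        final boolean timestamped;
        final boolean versioned;

        StoreKind(final boolean timestamped, final boolean versioned) {
            this.timestamped = timestamped;
            this.versioned = versioned;
        }
    }

    // Prepends the record timestamp (8 bytes, big-endian) to the raw value,
    // assuming that is the value layout of a timestamped key-value store.
    static byte[] prependTimestamp(final byte[] rawValue, final long timestamp) {
        if (rawValue == null) {
            return null; // tombstones stay tombstones
        }
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
            .putLong(timestamp)
            .put(rawValue)
            .array();
    }

    // Mirrors the patched condition: only timestamped stores that are NOT
    // versioned get the timestamp prepended during restore.
    static byte[] convertForRestore(final StoreKind store, final byte[] rawValue, final long timestamp) {
        if (store.timestamped && !store.versioned) {
            return prependTimestamp(rawValue, timestamp);
        }
        return rawValue; // identity: versioned stores receive the timestamp via put()
    }

    public static void main(final String[] args) {
        final byte[] raw = {1, 2, 3};
        final byte[] plain = convertForRestore(new StoreKind(true, false), raw, 42L);
        final byte[] versioned = convertForRestore(new StoreKind(true, true), raw, 42L);
        System.out.println(plain.length);                  // 11 = 8 timestamp bytes + 3 value bytes
        System.out.println(Arrays.equals(versioned, raw)); // true
    }
}
```

With the `!isVersioned(store)` guard, a versioned store receives the raw bytes unchanged, because its restore path passes the timestamp to `put()` separately.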



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp,
     }
 
     /**
-     * Test-only processor for inserting records into a versioned store while also tracking
-     * them separately in-memory, and performing checks to validate expected store contents.
-     * Forwards the number of failed checks downstream for consumption.
+     * @param topic       topic to produce to
+     * @param dataTracker map of key -> timestamp -> value for tracking data which is produced to
+     *                    the topic. This method will add the produced data into this in-memory
+     *                    tracker in addition to producing to the topic, in order to keep the two
+     *                    in sync.
+     * @param timestamp   timestamp to produce with
+     * @param keyValues   key-value pairs to produce
+     *
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopic(final String topic,
+                                         final DataTracker dataTracker,
+                                         final long timestamp,
+                                         final KeyValue<Integer, String>... keyValues) {
+        produceDataToTopic(topic, timestamp, keyValues);
+
+        for (final KeyValue<Integer, String> keyValue : keyValues) {
+            dataTracker.add(keyValue.key, timestamp, keyValue.value);
+        }
+
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for validating expected contents of a versioned 
store, and forwards
+     * the number of failed checks downstream for consumption. Callers specify 
whether the
+     * processor should also be responsible for inserting records into the 
store (while also
+     * tracking them separately in-memory for use in validation).
      */
     private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
 
         private ProcessorContext<Integer, Integer> context;
         private VersionedKeyValueStore<Integer, String> store;
 
+        // whether or not the processor should write records to the store as they arrive.
+        // must be false for global stores.

Review Comment:
   Why must it be false for global stores?
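
The `DataTracker` passed to `produceDataToTopic()` is only described as a map of key -> timestamp -> value. A minimal sketch of such a tracker follows. It assumes the tracker should answer lookups the way a versioned store does (the value with the greatest timestamp <= the query timestamp, null for tombstones). The class and method names are hypothetical, and history retention is ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DataTrackerSketch {

    // Hypothetical tracker: key -> (timestamp -> value). A null value records a tombstone.
    private final Map<Integer, TreeMap<Long, String>> data = new HashMap<>();

    void add(final int key, final long timestamp, final String value) {
        // TreeMap keeps timestamps sorted, so out-of-order writes land in the right place.
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Versioned-lookup semantics: value with the greatest timestamp <= asOfTimestamp,
    // or null if there is no such version or it is a tombstone.
    String getAsOf(final int key, final long asOfTimestamp) {
        final TreeMap<Long, String> history = data.get(key);
        if (history == null) {
            return null;
        }
        final Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // Latest value by timestamp, not by arrival order.
    String getLatest(final int key) {
        return getAsOf(key, Long.MAX_VALUE);
    }

    public static void main(final String[] args) {
        final DataTrackerSketch tracker = new DataTrackerSketch();
        final long base = 1000L; // hypothetical base timestamp
        // same writes as shouldCreateGlobalTable(), including the out-of-order batch at base + 2
        tracker.add(1, base, "a0");
        tracker.add(2, base, "b0");
        tracker.add(3, base, null);
        tracker.add(1, base + 5, "a5");
        tracker.add(2, base + 5, null);
        tracker.add(3, base + 5, "c5");
        tracker.add(1, base + 2, "a2");
        tracker.add(2, base + 2, "b2");
        tracker.add(3, base + 2, null);

        System.out.println(tracker.getLatest(1));         // a5
        System.out.println(tracker.getAsOf(1, base + 3)); // a2
        System.out.println(tracker.getLatest(2));         // null (tombstone at base + 5)
        System.out.println(tracker.getAsOf(3, base + 4)); // null (tombstone at base + 2)
    }
}
```

Keeping each key's history in a `TreeMap` means the out-of-order write at `baseTimestamp + 2` is slotted between the other two versions, and `floorEntry()` performs the as-of lookup.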



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
                 .withPartitions(Collections.singleton(0));
         final StateQueryResult<String> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   It seems we only verify that the regular `stream().process().to()` did not produce any errors, but we don't verify the global store at all?
   
   In your original comment, you say we cannot get the data from the global KTable because we cannot inject a `Processor` -- well, we could use `addGlobalStore` instead of `globalTable` to add a `Processor`.
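
A minimal sketch of the `addGlobalStore` alternative follows. It assumes kafka-streams 3.5 or later is on the classpath (for `Stores.persistentVersionedKeyValueStore()` and `Stores.versionedKeyValueStoreBuilder()`). The class name, store name, retention and topic are hypothetical. With `addGlobalStore()` the state-update processor is user code, so a test can hook its own checks into it, which `globalTable()` does not allow.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedKeyValueStore;

public class GlobalVersionedStoreSketch {

    static final String STORE_NAME = "versioned-global-store";       // hypothetical store name
    static final Duration HISTORY_RETENTION = Duration.ofMinutes(5); // hypothetical retention

    // Global state-update processor: this is the user-supplied code that writes
    // records from the global topic into the global store.
    static class GlobalStoreUpdater implements Processor<Integer, String, Void, Void> {
        private VersionedKeyValueStore<Integer, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final Record<Integer, String> record) {
            // The timestamp is passed to put() separately rather than prepended to the value.
            store.put(record.key(), record.value(), record.timestamp());
            // Hypothetical hook: a test could track the record or check store contents here.
        }
    }

    static Topology buildTopology(final String globalTopic) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
            Stores.versionedKeyValueStoreBuilder(
                    Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION),
                    Serdes.Integer(),
                    Serdes.String())
                // addGlobalStore() expects a store builder with changelogging disabled
                .withLoggingDisabled(),
            globalTopic,
            Consumed.with(Serdes.Integer(), Serdes.String()),
            () -> new GlobalStoreUpdater());
        return builder.build();
    }

    public static void main(final String[] args) {
        System.out.println(buildTopology("global-table-topic").describe());
    }
}
```

Building and describing the topology needs no broker; exercising the processor itself still requires produced data and a running application.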



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
