mjsax commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848103
########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() { .withPartitions(Collections.singleton(0)); final StateQueryResult<String> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); - assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); + assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success")); + } + + @Test + public void shouldCreateGlobalTable() throws Exception { + // produce data to global store topic and track in-memory for processor to verify + final DataTracker data = new DataTracker(); + produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .globalTable( + globalTableTopic, + Consumed.with(Serdes.Integer(), Serdes.String()), + Materialized + .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION)) + .withKeySerde(Serdes.Integer()) + .withValueSerde(Serdes.String()) + ); + streamsBuilder + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new VersionedStoreContentCheckerProcessor(false, data)) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data to trigger store verifications in processor + int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) { + // verify zero failed checks for each record + assertThat(receivedRecord.value, equalTo(0)); Review Comment: I was referring to this comment: https://github.com/apache/kafka/pull/13340#discussion_r1128550162 > and we will not be able to write to a global store from the processor If you specify a global-store, we pass in the "global processor" that is able to write into the store (well, has to do this, to maintain the global store), and thus, we can easily track what goes into the store "on the side" is an in-memory data structure similar to what we do for a regular processor that maintains the store. > This test already has a processor which inspects/validates the contents of the global store. Have I misunderstood? I think I did not understand how the test works -- not I see that you use a regular processor to read the global state store to verify the content. So I guess my comment is void (I did basically propose to add this via a "global processor"). ########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() { .withPartitions(Collections.singleton(0)); final StateQueryResult<String> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); - assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); + assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success")); + } + + @Test + public void shouldCreateGlobalTable() throws Exception { + // produce data to global store topic and track in-memory for processor to verify + final DataTracker data = new DataTracker(); + produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .globalTable( + globalTableTopic, + Consumed.with(Serdes.Integer(), Serdes.String()), + Materialized + .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION)) + .withKeySerde(Serdes.Integer()) + .withValueSerde(Serdes.String()) + ); + streamsBuilder + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(() -> new VersionedStoreContentCheckerProcessor(false, data)) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data to trigger store verifications in processor + int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) { + // verify zero failed checks for each record + assertThat(receivedRecord.value, equalTo(0)); Review Comment: I was referring to this comment: https://github.com/apache/kafka/pull/13340#discussion_r1128550162 > and we will not be able to write to a global store from the processor If you specify a global-store, we pass in the "global processor" that is able to write into the store (well, has to do this, to maintain the global store), and thus, we can easily track what goes into the store "on the side" is an in-memory data structure similar to what we do for a regular processor that maintains the store. > This test already has a processor which inspects/validates the contents of the global store. Have I misunderstood? I think I did not understand how the test works -- not I see that you use a regular processor to read the global state store to verify the content. So I guess my comment is void (I did basically propose to add this via a "global processor"). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org