lucasbru commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246586612
##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -113,6 +113,7 @@ public void before(final TestInfo testInfo) {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
+ streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);
Review Comment:
Doesn't the integration test work with the state updater? At the very least,
a comment explaining why it is disabled would be good here.
##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -208,6 +208,8 @@ private Properties configProps(final String appId, final
String host) {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10
* 1000);
+ streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);
Review Comment:
Same here: why do we disable the state updater?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1123,7 +1138,10 @@ public void
shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws
InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
- final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
+ final Properties props = configProps(true);
+ props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
Review Comment:
Same here. If possible, add a comment explaining why this test is invalid for
the state updater.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3038,6 +3149,7 @@ public void
shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
@Test
public void
shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();
+ streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);
Review Comment:
In the corresponding state updater (SU) test, can we remove the
`streamsConfigProps.put`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -201,7 +201,7 @@ public boolean commitNeeded() {
@Override
public StateStore getStore(final String name) {
- throw new UnsupportedOperationException("This task is read-only");
+ return task.getStore(name);
Review Comment:
It would be nice to expose a read-only state store here (hiding init, flush,
close and the like), but that's probably for a different PR.
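
A minimal sketch of what such a read-only view could look like. It is only
an illustration: `SimpleStore`, `ReadOnlyStoreView`, `InMemoryStore` and
`readOnly` are hypothetical names invented here, and `SimpleStore` is a
simplified stand-in, not Kafka's `StateStore` interface. The idea is that the
caller receives a narrower type, so lifecycle methods such as flush and close
are not reachable at all.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadOnlyStoreSketch {

    // Hypothetical, simplified stand-in for a state store (NOT Kafka's StateStore).
    interface SimpleStore {
        String name();
        void put(String key, Long value);
        Long get(String key);
        void flush();
        void close();
    }

    // Hypothetical read-only view: only the name and lookups are exposed.
    interface ReadOnlyStoreView {
        String name();
        Long get(String key);
    }

    // Trivial in-memory implementation, used only to exercise the view.
    static final class InMemoryStore implements SimpleStore {
        private final String name;
        private final Map<String, Long> data = new HashMap<>();

        InMemoryStore(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void put(final String key, final Long value) {
            data.put(key, value);
        }

        @Override
        public Long get(final String key) {
            return data.get(key);
        }

        @Override
        public void flush() {
            // Nothing to flush for an in-memory map.
        }

        @Override
        public void close() {
            data.clear();
        }
    }

    // Wraps a store so that callers cannot reach put, flush or close.
    static ReadOnlyStoreView readOnly(final SimpleStore store) {
        return new ReadOnlyStoreView() {
            @Override
            public String name() {
                return store.name();
            }

            @Override
            public Long get(final String key) {
                return store.get(key);
            }
        };
    }

    public static void main(final String[] args) {
        final SimpleStore store = new InMemoryStore("counts");
        store.put("A", 2L);
        final ReadOnlyStoreView view = readOnly(store);
        System.out.println(view.name() + " -> " + view.get("A"));
    }
}
```

Returning a separate narrow interface, rather than a wrapper that throws from
the hidden methods, moves the restriction to compile time. Whether that fits
the existing `getStore` signature is an open question for a follow-up PR.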
##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C",
400L), pair("C", -50L));
private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
- asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
+ asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));
Review Comment:
Why did the output data change?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
return activeTaskStream().collect(Collectors.toList());
}
+ List<Task> activeRunningTaskIterable() {
Review Comment:
What does "Running" mean? Is this existing terminology, or are you
introducing a new term here? How does it differ from "Processing" tasks, and
didn't we also have "Owned" tasks somewhere?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata
topologyMetadata,
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
- final boolean stateUpdaterEnabled =
- InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+ final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+ config.originals(),
+ InternalConfig.STATE_UPDATER_ENABLED,
+ InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+ );
Review Comment:
It could make sense to add a small helper such as
`InternalConfig.getStateUpdateEnabled(config)`.
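
A rough sketch of such a helper, written against a plain
`Map<String, Object>` so that it compiles on its own. Everything here is an
assumption made for illustration: the class name, the method name
`getStateUpdaterEnabled`, the key string and the default value are stand-ins,
not the actual `InternalConfig` definitions.

```java
import java.util.HashMap;
import java.util.Map;

public class StateUpdaterConfigSketch {

    // Stand-ins for the InternalConfig constants in the diff; both values are assumed.
    static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
    static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

    // Generic lookup that accepts Boolean or String values and falls back to the default.
    static boolean getBoolean(final Map<String, Object> configs,
                              final String key,
                              final boolean defaultValue) {
        final Object value = configs.get(key);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return defaultValue;
    }

    // The small helper: callers no longer repeat the key and default at every call site.
    static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
        return getBoolean(configs, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT);
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        System.out.println(getStateUpdaterEnabled(configs)); // falls back to the default
        configs.put(STATE_UPDATER_ENABLED, false);
        System.out.println(getStateUpdaterEnabled(configs)); // explicit override
    }
}
```

With a helper like this, the call site in `StreamThread.create` would reduce
to a single call taking `config.originals()`, and the default would live in
one place.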