This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39ffdea6d32 KAFKA-10199: Enable state updater by default (#16107)
39ffdea6d32 is described below

commit 39ffdea6d321ef3dd5e787aef1b1102c33448c0f
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Jun 12 07:51:38 2024 +0200

    KAFKA-10199: Enable state updater by default (#16107)
    
    The state updater was enabled by default once before, but we ran
    into issues that forced us to disable it again. We believe those
    issues are now fixed, so this commit enables the state updater
    by default again.
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
---
 streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java   | 2 +-
 .../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bfea3d43680..e77e4ca795d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1273,7 +1273,7 @@ public class StreamsConfig extends AbstractConfig {
         public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
 
         public static boolean getStateUpdaterEnabled(final Map<String, Object> 
configs) {
-            return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, false);
+            return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, true);
         }
         
         // Private API to enable processing threads (i.e. polling is decoupled 
from processing)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 763394611b9..457508cd20e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -468,10 +468,10 @@ public class StoreChangelogReaderTest {
             
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
         } else {
             if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
-                    || !((boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED))) {
-                assertEquals(Duration.ZERO, consumer.lastPollTimeout());
-            } else {
+                    || (boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
                 
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
+            } else {
+                assertEquals(Duration.ZERO, consumer.lastPollTimeout());
             }
         }
     }

Reply via email to