This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f64f6d  KAFKA-10321: fix infinite blocking for global stream thread 
startup (#9095)
2f64f6d is described below

commit 2f64f6deb906cdfe4d006e530edeff2e79c05f76
Author: Boyang Chen <[email protected]>
AuthorDate: Wed Jul 29 21:04:21 2020 -0700

    KAFKA-10321: fix infinite blocking for global stream thread startup (#9095)
    
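    The start() function of the global stream thread blocks until the
thread has finished initialization, but it only checked whether the thread
was not yet running. If initialization failed and the thread moved to an
error state (PENDING_SHUTDOWN or DEAD) without setting a startup exception,
the thread would never be running and start() would block forever. This PR
fixes this behavior by waiting only while the thread is still in the CREATED
state, and by adding a check afterwards that throws an IllegalStateException
if the thread is already in an error state.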
    
    Reviewers: Guozhang Wang <[email protected]>, John Roesler 
<[email protected]>
---
 .../processor/internals/GlobalStreamThread.java    | 26 ++++++++++++++++++++--
 .../internals/GlobalStreamThreadTest.java          | 23 ++++++++++++-------
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 14d1ef8..236940ca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -42,8 +42,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
 import static 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
+import static 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
 
 /**
  * This is the thread responsible for keeping all Global State Stores updated.
@@ -106,6 +108,10 @@ public class GlobalStreamThread extends Thread {
             return equals(RUNNING);
         }
 
+        public boolean inErrorState() {
+            return equals(DEAD) || equals(PENDING_SHUTDOWN);
+        }
+
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator 
newState) {
             final State tmpState = (State) newState;
@@ -173,6 +179,18 @@ public class GlobalStreamThread extends Thread {
         }
     }
 
+    public boolean inErrorState() {
+        synchronized (stateLock) {
+            return state.inErrorState();
+        }
+    }
+
+    public boolean stillInitializing() {
+        synchronized (stateLock) {
+            return state.equals(CREATED);
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -276,7 +294,7 @@ public class GlobalStreamThread extends Thread {
 
             return;
         }
-        setState(State.RUNNING);
+        setState(RUNNING);
 
         boolean wipeStateStore = false;
         try {
@@ -384,12 +402,16 @@ public class GlobalStreamThread extends Thread {
     @Override
     public synchronized void start() {
         super.start();
-        while (!stillRunning()) {
+        while (stillInitializing()) {
             Utils.sleep(1);
             if (startupException != null) {
                 throw startupException;
             }
         }
+
+        if (inErrorState()) {
+            throw new IllegalStateException("Initialization for the global 
stream thread failed");
+        }
     }
 
     public void shutdown() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 15bae37..1e31357 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -163,14 +163,14 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldBeRunningAfterSuccessfulStart() {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         assertTrue(globalStreamThread.stillRunning());
     }
 
     @Test(timeout = 30000)
     public void shouldStopRunningWhenClosedByUser() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         globalStreamThread.shutdown();
         globalStreamThread.join();
         assertEquals(GlobalStreamThread.State.DEAD, 
globalStreamThread.state());
@@ -179,7 +179,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldCloseStateStoresOnClose() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         final StateStore globalStore = 
builder.globalStateStores().get(GLOBAL_STORE_NAME);
         assertTrue(globalStore.isOpen());
         globalStreamThread.shutdown();
@@ -190,7 +190,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldTransitionToDeadOnClose() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         globalStreamThread.shutdown();
         globalStreamThread.join();
 
@@ -200,7 +200,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldStayDeadAfterTwoCloses() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         globalStreamThread.shutdown();
         globalStreamThread.join();
         globalStreamThread.shutdown();
@@ -211,7 +211,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldTransitionToRunningOnStart() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == RUNNING,
@@ -231,7 +231,7 @@ public class GlobalStreamThreadTest {
             }
         });
 
-        globalStreamThread.start();
+        startAndSwallowError();
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == DEAD,
@@ -245,7 +245,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldDieOnInvalidOffsetExceptionWhileRunning() throws 
Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == RUNNING,
@@ -289,4 +289,11 @@ public class GlobalStreamThreadTest {
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 
0L));
         mockConsumer.assign(Collections.singleton(topicPartition));
     }
+
+    private void startAndSwallowError() {
+        try {
+            globalStreamThread.start();
+        } catch (final IllegalStateException ignored) {
+        }
+    }
 }

Reply via email to