This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 09df84fe919a3e87ffad97b2be55fcbda5875150
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Feb 12 16:05:44 2021 +0100

    Cleanup the check state logic on the KafkaConnectEmbeddedService
---
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index bc6b868..52af0a5 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -64,6 +64,12 @@ public class KafkaConnectEmbedded implements 
KafkaConnectService {
         LOG.trace("Added the new connector");
     }
 
+    private boolean doCheckState(ConnectorStateInfo connectorStateInfo, 
Integer expectedTaskNumber) {
+        return connectorStateInfo.tasks().size() >= expectedTaskNumber
+                && 
connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                && connectorStateInfo.tasks().stream().allMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
+    }
+
     @Override
     public void initializeConnectorBlocking(ConnectorPropertyFactory 
propertyFactory, Integer expectedTaskNumber) throws InterruptedException {
         initializeConnector(propertyFactory);
@@ -73,9 +79,8 @@ public class KafkaConnectEmbedded implements 
KafkaConnectService {
                 connectorStateInfo = cluster.connectorStatus(connectorName);
                 Thread.sleep(20L);
             } while (connectorStateInfo == null);
-            return  connectorStateInfo.tasks().size() >= expectedTaskNumber
-                    && 
connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                    && connectorStateInfo.tasks().stream().allMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
+
+            return doCheckState(connectorStateInfo, expectedTaskNumber);
         }, 30000L, "The connector " + connectorName + " did not start within a 
reasonable time");
     }
 

Reply via email to