This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 09df84fe919a3e87ffad97b2be55fcbda5875150
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Feb 12 16:05:44 2021 +0100

    Cleanup the check state logic on the KafkaConnectEmbeddedService
---
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index bc6b868..52af0a5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -64,6 +64,12 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
         LOG.trace("Added the new connector");
     }
 
+    private boolean doCheckState(ConnectorStateInfo connectorStateInfo, Integer expectedTaskNumber) {
+        return connectorStateInfo.tasks().size() >= expectedTaskNumber
+                && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+    }
+
     @Override
     public void initializeConnectorBlocking(ConnectorPropertyFactory propertyFactory, Integer expectedTaskNumber) throws InterruptedException {
         initializeConnector(propertyFactory);
@@ -73,9 +79,8 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
                 connectorStateInfo = cluster.connectorStatus(connectorName);
                 Thread.sleep(20L);
             } while (connectorStateInfo == null);
-            return connectorStateInfo.tasks().size() >= expectedTaskNumber
-                    && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                    && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+
+            return doCheckState(connectorStateInfo, expectedTaskNumber);
         }, 30000L, "The connector " + connectorName + " did not start within a reasonable time");
     }
