This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 95963aa92518fdec869dd471572b28cda1b6c17c Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Fri Feb 12 16:08:21 2021 +0100 Avoid blocking connector initialization on SSH tests as it leads to failures on GH actions --- .../common/test/CamelSinkTestSupport.java | 25 ++++++++++++++++++++++ .../ssh/sink/CamelSinkSshITCase.java | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java index b414726..ec9e9dc 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java @@ -97,6 +97,31 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest { verifyMessages(latch); } + /** + * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @throws Exception For test-specific exceptions + */ + protected void runTestNonBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + LOG.debug("Creating the consumer ..."); + ExecutorService service = Executors.newCachedThreadPool(); + + CountDownLatch latch = new CountDownLatch(1); + service.submit(() -> consumeMessages(latch)); + + producer.produceMessages(); + + LOG.debug("Waiting for the messages to be processed"); + service.shutdown(); + + LOG.debug("Waiting for the test to complete"); + verifyMessages(latch); + } + protected boolean waitForData() { try { diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java index 02f6f21..abfdccb 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java @@ -94,6 +94,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport { .withUsername("root") .withPassword("root"); - runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); + runTestNonBlocking(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); } }
