This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 267ab143527b1969a8acc4b6204fa27b355827f5 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Mar 10 15:18:24 2021 +0100 Adjusted the code to CAMEL-16299 --- .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 2 +- .../sink/CamelSinkAzureStorageQueueITCase.java | 2 +- .../source/CamelSourceAzureStorageQueueITCase.java | 2 +- .../kafkaconnector/common/AbstractKafkaTest.java | 15 ++- .../common/services/kafka/KafkaServiceFactory.java | 45 --------- .../common/services/kafka/StrimziService.java | 103 --------------------- .../kafkaconnector/sjms2/common/SJMS2Common.java | 9 ++ .../sjms2/sink/CamelSinkIdempotentJMSITCase.java | 9 +- .../sjms2/sink/CamelSinkJMSITCase.java | 9 +- .../sjms2/source/CamelSourceJMSITCase.java | 9 +- .../source/CamelSourceJMSWithAggregation.java | 9 +- 11 files changed, 41 insertions(+), 173 deletions(-) diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java index 727d3fb..e7cb8e5 100644 --- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java +++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java @@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport { @RegisterExtension - public static AzureService service = AzureStorageBlobServiceFactory.createAzureService(); + public static AzureService service = AzureStorageBlobServiceFactory.createService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class); private BlobServiceClient client; diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java index ef12c18..78a297c 100644 --- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java +++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java @@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport { @RegisterExtension - public static AzureService service = AzureStorageQueueServiceFactory.createAzureService(); + public static AzureService service = AzureStorageQueueServiceFactory.createService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class); diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java index 26faa6e..da640da 100644 --- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java +++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java @@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceAzureStorageQueueITCase extends CamelSourceTestSupport { @RegisterExtension - public static AzureService service = AzureStorageQueueServiceFactory.createAzureService(); + public static AzureService service = AzureStorageQueueServiceFactory.createService(); private QueueServiceClient client; private QueueClient queueClient; diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java index fb24ca9..ee332a8 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java @@ -17,12 +17,16 @@ package org.apache.camel.kafkaconnector.common; -import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory; +import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService; import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory; import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService; import org.apache.camel.kafkaconnector.common.utils.PropertyUtils; import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.kafka.services.ContainerLocalKafkaService; import org.apache.camel.test.infra.kafka.services.KafkaService; +import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory; +import org.apache.camel.test.infra.kafka.services.RemoteKafkaService; +import org.apache.camel.test.infra.kafka.services.StrimziService; import org.junit.jupiter.api.extension.RegisterExtension; public abstract class AbstractKafkaTest { @@ -39,7 +43,14 @@ public abstract class AbstractKafkaTest { public AbstractKafkaTest() { PluginPathHelper.getInstance().registerConnector(getConnectorsInTest()); - kafkaService = KafkaServiceFactory.createService(); + kafkaService = KafkaServiceFactory + .builder() + .addLocalMapping(EmbeddedKafkaService::new) + .addRemoteMapping(RemoteKafkaService::new) + .addMapping("embedded", EmbeddedKafkaService::new) + .addMapping("local-strimzi-container", StrimziService::new) + .addMapping("local-cp-kafka-container", ContainerLocalKafkaService::new) + .build(); kafkaService.initialize(); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java deleted file mode 100644 index 052ba9c..0000000 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/KafkaServiceFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.common.services.kafka; - -import org.apache.camel.test.infra.kafka.services.KafkaService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class KafkaServiceFactory { - private static final Logger LOG = LoggerFactory.getLogger(KafkaServiceFactory.class); - - private KafkaServiceFactory() { - - } - - public static KafkaService createService() { - String kafkaInstanceType = System.getProperty("kafka.instance.type"); - - if (kafkaInstanceType == null || kafkaInstanceType.equals("embedded")) { - return new EmbeddedKafkaService(); - } - - if (kafkaInstanceType.equals("local-strimzi-container")) { - return new StrimziService(); - } - - return org.apache.camel.test.infra.kafka.services.KafkaServiceFactory.createService(); - - } -} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java deleted file mode 100644 index 6f3b03e..0000000 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafka/StrimziService.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.common.services.kafka; - -import org.apache.camel.test.infra.common.TestUtils; -import org.apache.camel.test.infra.common.services.ContainerService; -import org.apache.camel.test.infra.kafka.common.KafkaProperties; -import org.apache.camel.test.infra.kafka.services.KafkaService; -import org.apache.camel.test.infra.kafka.services.StrimziContainer; -import org.apache.camel.test.infra.kafka.services.ZookeeperContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Network; - -class StrimziService implements KafkaService, ContainerService<StrimziContainer> { - private static final Logger LOG = LoggerFactory.getLogger(StrimziService.class); - - private final ZookeeperContainer zookeeperContainer; - private final StrimziContainer strimziContainer; - - public StrimziService() { - Network network = Network.newNetwork(); - - String zookeeperInstanceName = "zookeeper-" + TestUtils.randomWithRange(1, 100); - zookeeperContainer = new ZookeeperContainer(network, zookeeperInstanceName); - - String strimziInstanceName = "strimzi-" + TestUtils.randomWithRange(1, 100); - strimziContainer = new StrimziContainer(network, strimziInstanceName, zookeeperInstanceName); - } - - public StrimziService(ZookeeperContainer zookeeperContainer, StrimziContainer strimziContainer) { - this.zookeeperContainer = zookeeperContainer; - this.strimziContainer = strimziContainer; - } - - protected Integer getKafkaPort() { - return strimziContainer.getKafkaPort(); - } - - @Override - public String getBootstrapServers() { - return strimziContainer.getContainerIpAddress() + ":" + getKafkaPort(); - } - - @Override - public void registerProperties() { - System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, getBootstrapServers()); - } - - @Override - public void initialize() { - zookeeperContainer.start(); - - String zookeeperConnect = zookeeperContainer.getContainerIpAddress() + ":" + zookeeperContainer.getZookeeperPort(); - LOG.info("Apache Zookeeper running at address {}", zookeeperConnect); - - strimziContainer.start(); - - registerProperties(); - LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers()); - } - - private boolean stopped() { - return !strimziContainer.isRunning() && !zookeeperContainer.isRunning(); - } - - @Override - public void shutdown() { - try { - LOG.info("Stopping Kafka container"); - strimziContainer.stop(); - } finally { - LOG.info("Stopping Zookeeper container"); - zookeeperContainer.stop(); - - TestUtils.waitFor(this::stopped); - } - } - - @Override - public StrimziContainer getContainer() { - return strimziContainer; - } - - protected ZookeeperContainer getZookeeperContainer() { - return zookeeperContainer; - } -} diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java index 24e9f9a..44e6a94 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/common/SJMS2Common.java @@ -17,6 +17,9 @@ package org.apache.camel.kafkaconnector.sjms2.common; +import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; +import org.apache.camel.test.infra.messaging.services.MessagingLocalContainerService; + public final class SJMS2Common { /** * The default JMS queue name used during the tests @@ -26,4 +29,10 @@ public final class SJMS2Common { private SJMS2Common() { } + + public static MessagingLocalContainerService<DispatchRouterContainer> createLocalService() { + DispatchRouterContainer container = new DispatchRouterContainer(); + + return new MessagingLocalContainerService<>(container, c -> container.defaultEndpoint()); + } } diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java index 0b8bb52..d0273dc 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java @@ -35,9 +35,8 @@ import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; -import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; import org.apache.camel.test.infra.messaging.services.MessagingService; -import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder; +import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -55,9 +54,9 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport { @RegisterExtension - public static MessagingService jmsService = MessagingServiceBuilder - .newBuilder(DispatchRouterContainer::new) - .withEndpointProvider(DispatchRouterContainer::defaultEndpoint) + public static MessagingService jmsService = MessagingServiceFactory + .builder() + .addLocalMapping(SJMS2Common::createLocalService) .build(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class); diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java index 50dabe6..b56c580 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java @@ -30,9 +30,8 @@ import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; -import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; import org.apache.camel.test.infra.messaging.services.MessagingService; -import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder; +import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -50,9 +49,9 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkJMSITCase extends CamelSinkTestSupport { @RegisterExtension - public static MessagingService jmsService = MessagingServiceBuilder - .newBuilder(DispatchRouterContainer::new) - .withEndpointProvider(DispatchRouterContainer::defaultEndpoint) + public static MessagingService jmsService = MessagingServiceFactory + .builder() + .addLocalMapping(SJMS2Common::createLocalService) .build(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class); diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java index 781e029..ffbee12 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java @@ -27,9 +27,8 @@ import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer; import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; -import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; import org.apache.camel.test.infra.messaging.services.MessagingService; -import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder; +import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,9 +46,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceJMSITCase extends CamelSourceTestSupport { @RegisterExtension - public static MessagingService jmsService = MessagingServiceBuilder - .newBuilder(DispatchRouterContainer::new) - .withEndpointProvider(DispatchRouterContainer::defaultEndpoint) + public static MessagingService jmsService = MessagingServiceFactory + .builder() + .addLocalMapping(SJMS2Common::createLocalService) .build(); private String topicName; diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java index 6b95304..ce25c7b 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java @@ -28,9 +28,8 @@ import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; -import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; import org.apache.camel.test.infra.messaging.services.MessagingService; -import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder; +import org.apache.camel.test.infra.messaging.services.MessagingServiceFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,9 +43,9 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport { @RegisterExtension - public static MessagingService jmsService = MessagingServiceBuilder - .newBuilder(DispatchRouterContainer::new) - .withEndpointProvider(DispatchRouterContainer::defaultEndpoint) + public static MessagingService jmsService = MessagingServiceFactory + .builder() + .addLocalMapping(SJMS2Common::createLocalService) .build(); private final int sentSize = 10;
