This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit cdbeea8718e462cef5cb90148571fda3fa5d21fe Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Mon Feb 8 13:50:17 2021 +0100 Added a reusable base class for the source tests --- .../common/test/AbstractTestMessageConsumer.java | 66 +++++++++++++++++++ .../common/test/CamelSourceTestSupport.java | 75 ++++++++++++++++++++++ .../common/test/StringMessageConsumer.java | 29 +++++++++ .../common/test/TestMessageConsumer.java | 27 ++++++++ 4 files changed, 197 insertions(+) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java new file mode 100644 index 0000000..2fcf42f --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.test; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsumer<T> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageConsumer.class); + + private final KafkaClient<String, T> kafkaClient; + private final String topicName; + private final int count; + private final List<ConsumerRecord<String, T>> receivedMessages; + private volatile int received; + + public AbstractTestMessageConsumer(KafkaClient<String, T> kafkaClient, String topicName, int count) { + this.kafkaClient = kafkaClient; + this.topicName = topicName; + this.count = count; + + receivedMessages = new ArrayList<>(count); + } + + private boolean checkRecord(ConsumerRecord<String, T> record) { + LOG.debug("Received: {}", record.value()); + received++; + receivedMessages.add(record); + + if (received == count) { + return false; + } + + return true; + } + + @Override + public void consumeMessages() { + kafkaClient.consume(topicName, this::checkRecord); + } + + @Override + public List<ConsumerRecord<String, T>> consumedMessages() { + return receivedMessages; + } +} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java new file mode 100644 index 0000000..7c9ee9b --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.test; + +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class CamelSourceTestSupport extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTestSupport.class); + + protected abstract void produceTestData(); + + protected abstract void verifyMessages(TestMessageConsumer<?> consumer); + + /** + * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @param topic the topic to send the messages to + * @param count the number of messages to send + * @throws Exception For test-specific exceptions + */ + public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count); + + runTest(connectorPropertyFactory, consumer); + } + + + /** + * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @param consumer A Kafka consumer consumer for the test messages + * @throws Exception For test-specific exceptions + */ + public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + LOG.debug("Initialized the connector and put the data for the test execution"); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + + LOG.debug("Producing test data to be collected by the connector and sent to Kafka"); + produceTestData(); + + LOG.debug("Creating the Kafka consumer ..."); + consumer.consumeMessages(); + LOG.debug("Ran the Kafka consumer ..."); + + LOG.debug("Verifying messages"); + verifyMessages(consumer); + LOG.debug("Verified messages"); + } + +} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageConsumer.java new file mode 100644 index 0000000..f2105b0 --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageConsumer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.test; + +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; + +/** + * A consumer that receives the 'count' amount of text messages from the Kafka broker + */ +public class StringMessageConsumer extends AbstractTestMessageConsumer<String> { + public StringMessageConsumer(KafkaClient<String, String> kafkaClient, String topicName, int count) { + super(kafkaClient, topicName, count); + } +} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java new file mode 100644 index 0000000..2034539 --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.test; + +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public interface TestMessageConsumer<T> { + void consumeMessages(); + List<ConsumerRecord<String, T>> consumedMessages(); +}
