This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c116cce Added Azure Storage Queue source test case
c116cce is described below
commit c116ccefea512c81301524dfa66516074c3ced16
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jan 29 09:43:44 2021 +0100
Added Azure Storage Queue source test case
---
.../{sink => common}/TestQueueConfiguration.java | 2 +-
.../sink/CamelSinkAzureStorageQueueITCase.java | 1 +
.../CamelSourceAzureStorageQueueITCase.java} | 96 +++++++---------------
...amelSourceAzureStorageQueuePropertyFactory.java | 52 ++++++++++++
4 files changed, 83 insertions(+), 68 deletions(-)
diff --git
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/common/TestQueueConfiguration.java
similarity index 95%
rename from
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
rename to
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/common/TestQueueConfiguration.java
index d91c43e..570c633 100644
---
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
+++
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/common/TestQueueConfiguration.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+package org.apache.camel.kafkaconnector.azure.storage.queue.common;
import com.azure.storage.queue.QueueServiceClient;
import org.apache.camel.component.azure.storage.queue.QueueConfiguration;
diff --git
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index b0636ff..a7c16ed 100644
---
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -26,6 +26,7 @@ import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueServiceClient;
import com.azure.storage.queue.models.PeekedMessageItem;
import org.apache.camel.kafkaconnector.CamelSinkTask;
+import
org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
import
org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
diff --git
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
similarity index 56%
copy from
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
copy to
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index b0636ff..b862255 100644
---
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -15,17 +15,14 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+package org.apache.camel.kafkaconnector.azure.storage.queue.source;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.QueueServiceClient;
-import com.azure.storage.queue.models.PeekedMessageItem;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
+import
org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
import
org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
@@ -34,6 +31,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
import org.apache.camel.test.infra.azure.common.services.AzureService;
import
org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueueServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -46,11 +44,10 @@ import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
+public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
@RegisterExtension
public static AzureService service =
AzureStorageQueueServiceFactory.createAzureService();
-
- private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(CamelSourceAzureStorageQueueITCase.class);
private QueueServiceClient client;
private QueueClient queueClient;
@@ -79,54 +76,37 @@ public class CamelSinkAzureStorageQueueITCase extends
AbstractKafkaTest {
}
}
- private void acknowledgeReceived(PeekedMessageItem peekedMessageItem) {
- received++;
- LOG.info("Received: {}", peekedMessageItem.getMessageText());
- }
-
- private boolean canConsume() {
- return queueClient.getProperties().getApproximateMessagesCount() >=
expect;
- }
-
- private void consume() {
- LOG.debug("Created the consumer ...");
- TestUtils.waitFor(this::canConsume);
-
- LOG.debug("About to receive messages");
- int count = queueClient.getProperties().getApproximateMessagesCount();
-
- queueClient.peekMessages(count, null,
null).forEach(this::acknowledgeReceived);
-
+ private void sendMessages() {
+ for (int i = 0; i < expect; i++) {
+ queueClient.sendMessage("Test message " + i);
+ }
}
- private void putRecords() {
- Map<String, String> messageParameters = new HashMap<>();
- KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
+ private boolean checkRecord(ConsumerRecord<String, String> record) {
+ LOG.debug("Received: {}", record.value());
+ received++;
- for (int i = 0; i < expect; i++) {
- try {
- // This is for 3.4 only. From 3.5 and newer, the text is taken
from the body
- messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"CamelAzureStorageQueueMessageText", "test " + i);
-
-
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " +
i, messageParameters);
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
- }
+ if (received == expect) {
+ return false;
}
- }
+ return true;
+ }
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory)
throws ExecutionException, InterruptedException {
+ public void runtTest(ConnectorPropertyFactory connectorPropertyFactory)
throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
-
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory,
1);
+ getKafkaConnectService().initializeConnector(connectorPropertyFactory);
- putRecords();
+ sendMessages();
- consume();
+ LOG.debug("Initialized the connector and put the data for the test
execution");
- assertEquals(expect, received, "Did not receive the same amount of
messages that were sent");
+ LOG.debug("Creating the consumer ...");
+ KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
+ kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()),
this::checkRecord);
+ LOG.debug("Created the consumer ...");
+
+ assertEquals(received, expect, "Didn't process the expected amount of
messages");
}
@@ -135,33 +115,15 @@ public class CamelSinkAzureStorageQueueITCase extends
AbstractKafkaTest {
public void testBasicSendReceive() throws InterruptedException,
ExecutionException, IOException {
AzureCredentialsHolder azureCredentialsHolder =
service.azureCredentials();
- ConnectorPropertyFactory connectorPropertyFactory =
CamelSinkAzureStorageQueuePropertyFactory
+ ConnectorPropertyFactory connectorPropertyFactory =
CamelSourceAzureStorageQueuePropertyFactory
.basic()
.withConfiguration(TestQueueConfiguration.class.getName())
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAccessKey(azureCredentialsHolder.accountKey())
.withAccountName(azureCredentialsHolder.accountName())
- .withOperation("sendMessage")
.withQueueName(queueName);
- runTest(connectorPropertyFactory);
+ runtTest(connectorPropertyFactory);
}
-
- @Test
- @Timeout(90)
- public void testBasicSendReceiveUrl() throws InterruptedException,
ExecutionException, IOException {
- AzureCredentialsHolder azureCredentialsHolder =
service.azureCredentials();
-
- ConnectorPropertyFactory connectorPropertyFactory =
CamelSinkAzureStorageQueuePropertyFactory
- .basic()
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withConfiguration(TestQueueConfiguration.class.getName())
- .withUrl(azureCredentialsHolder.accountName() + "/" +
queueName)
- .append("accessKey", azureCredentialsHolder.accountKey())
- .append("operation", "sendMessage")
- .buildUrl();
-
- runTest(connectorPropertyFactory);
- }
}
diff --git
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueuePropertyFactory.java
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueuePropertyFactory.java
new file mode 100644
index 0000000..13024cf
--- /dev/null
+++
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueuePropertyFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+public class CamelSourceAzureStorageQueuePropertyFactory extends
SourceConnectorPropertyFactory<CamelSourceAzureStorageQueuePropertyFactory> {
+ public CamelSourceAzureStorageQueuePropertyFactory withAccountName(String
value) {
+ return setProperty("camel.source.path.accountName", value);
+ }
+
+ public CamelSourceAzureStorageQueuePropertyFactory withQueueName(String
value) {
+ return setProperty("camel.source.path.queueName", value);
+ }
+
+ public CamelSourceAzureStorageQueuePropertyFactory withAccessKey(String
value) {
+ return setProperty("camel.component.azure-storage-queue.accessKey",
value);
+ }
+
+ public CamelSourceAzureStorageQueuePropertyFactory
withConfiguration(String configurationClass) {
+ return
setProperty("camel.component.azure-storage-queue.configuration",
classRef(configurationClass));
+ }
+
+ public CamelSourceAzureStorageQueuePropertyFactory withOperation(String
value) {
+ return setProperty("camel.component.azure-storage-queue.operation",
value);
+ }
+
+ public static CamelSourceAzureStorageQueuePropertyFactory basic() {
+ return new CamelSourceAzureStorageQueuePropertyFactory()
+ .withTasksMax(1)
+ .withName("CamelAzurequeueSourceConnector")
+
.withConnectorClass("org.apache.camel.kafkaconnector.azurestoragequeue.CamelAzurestoragequeueSourceConnector")
+
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+ }
+}