This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c116cce  Added Azure Storage Queue source test case
c116cce is described below

commit c116ccefea512c81301524dfa66516074c3ced16
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jan 29 09:43:44 2021 +0100

    Added Azure Storage Queue source test case
---
 .../{sink => common}/TestQueueConfiguration.java   |  2 +-
 .../sink/CamelSinkAzureStorageQueueITCase.java     |  1 +
 .../CamelSourceAzureStorageQueueITCase.java}       | 96 +++++++---------------
 ...amelSourceAzureStorageQueuePropertyFactory.java | 52 ++++++++++++
 4 files changed, 83 insertions(+), 68 deletions(-)

diff --git 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/common/TestQueueConfiguration.java
similarity index 95%
rename from 
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
rename to 
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/common/TestQueueConfiguration.java
index d91c43e..570c633 100644
--- 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/TestQueueConfiguration.java
+++ 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/common/TestQueueConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+package org.apache.camel.kafkaconnector.azure.storage.queue.common;
 
 import com.azure.storage.queue.QueueServiceClient;
 import org.apache.camel.component.azure.storage.queue.QueueConfiguration;
diff --git 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index b0636ff..a7c16ed 100644
--- 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -26,6 +26,7 @@ import com.azure.storage.queue.QueueClient;
 import com.azure.storage.queue.QueueServiceClient;
 import com.azure.storage.queue.models.PeekedMessageItem;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
+import 
org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
 import 
org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
diff --git 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
similarity index 56%
copy from 
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
copy to 
tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index b0636ff..b862255 100644
--- 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -15,17 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
+package org.apache.camel.kafkaconnector.azure.storage.queue.source;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import com.azure.storage.queue.QueueClient;
 import com.azure.storage.queue.QueueServiceClient;
-import com.azure.storage.queue.models.PeekedMessageItem;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
+import 
org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
 import 
org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
@@ -34,6 +31,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
 import 
org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueueServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,11 +44,10 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
+public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
     @RegisterExtension
     public static AzureService service = 
AzureStorageQueueServiceFactory.createAzureService();
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkAzureStorageQueueITCase.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceAzureStorageQueueITCase.class);
 
     private QueueServiceClient client;
     private QueueClient queueClient;
@@ -79,54 +76,37 @@ public class CamelSinkAzureStorageQueueITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private void acknowledgeReceived(PeekedMessageItem peekedMessageItem) {
-        received++;
-        LOG.info("Received: {}", peekedMessageItem.getMessageText());
-    }
-
-    private boolean canConsume() {
-        return queueClient.getProperties().getApproximateMessagesCount() >= 
expect;
-    }
-
-    private void consume() {
-        LOG.debug("Created the consumer ...");
-        TestUtils.waitFor(this::canConsume);
-
-        LOG.debug("About to receive messages");
-        int count = queueClient.getProperties().getApproximateMessagesCount();
-
-        queueClient.peekMessages(count, null, 
null).forEach(this::acknowledgeReceived);
-
+    private void sendMessages() {
+        for (int i = 0; i < expect; i++) {
+            queueClient.sendMessage("Test message " + i);
+        }
     }
 
-    private void putRecords() {
-        Map<String, String> messageParameters = new HashMap<>();
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+    private boolean checkRecord(ConsumerRecord<String, String> record) {
+        LOG.debug("Received: {}", record.value());
+        received++;
 
-        for (int i = 0; i < expect; i++) {
-            try {
-                // This is for 3.4 only. From 3.5 and newer, the text is taken 
from the body
-                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"CamelAzureStorageQueueMessageText", "test " + i);
-
-                
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + 
i, messageParameters);
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
+        if (received == expect) {
+            return false;
         }
-    }
 
+        return true;
+    }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
+    public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
-        
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 
1);
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        putRecords();
+        sendMessages();
 
-        consume();
+        LOG.debug("Initialized the connector and put the data for the test 
execution");
 
-        assertEquals(expect, received, "Did not receive the same amount of 
messages that were sent");
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of 
messages");
     }
 
 
@@ -135,33 +115,15 @@ public class CamelSinkAzureStorageQueueITCase extends 
AbstractKafkaTest {
     public void testBasicSendReceive() throws InterruptedException, 
ExecutionException, IOException {
         AzureCredentialsHolder azureCredentialsHolder = 
service.azureCredentials();
 
-        ConnectorPropertyFactory connectorPropertyFactory = 
CamelSinkAzureStorageQueuePropertyFactory
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceAzureStorageQueuePropertyFactory
                 .basic()
                 .withConfiguration(TestQueueConfiguration.class.getName())
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
-                .withOperation("sendMessage")
                 .withQueueName(queueName);
 
-        runTest(connectorPropertyFactory);
+        runtTest(connectorPropertyFactory);
     }
 
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceiveUrl() throws InterruptedException, 
ExecutionException, IOException {
-        AzureCredentialsHolder azureCredentialsHolder = 
service.azureCredentials();
-
-        ConnectorPropertyFactory connectorPropertyFactory = 
CamelSinkAzureStorageQueuePropertyFactory
-                .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withConfiguration(TestQueueConfiguration.class.getName())
-                .withUrl(azureCredentialsHolder.accountName() + "/" + 
queueName)
-                .append("accessKey", azureCredentialsHolder.accountKey())
-                .append("operation", "sendMessage")
-                .buildUrl();
-
-        runTest(connectorPropertyFactory);
-    }
 }
diff --git 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueuePropertyFactory.java
 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueuePropertyFactory.java
new file mode 100644
index 0000000..13024cf
--- /dev/null
+++ 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueuePropertyFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.azure.storage.queue.source;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+public class CamelSourceAzureStorageQueuePropertyFactory extends 
SourceConnectorPropertyFactory<CamelSourceAzureStorageQueuePropertyFactory> {
+    public CamelSourceAzureStorageQueuePropertyFactory withAccountName(String 
value) {
+        return setProperty("camel.source.path.accountName", value);
+    }
+
+    public CamelSourceAzureStorageQueuePropertyFactory withQueueName(String 
value) {
+        return setProperty("camel.source.path.queueName", value);
+    }
+
+    public CamelSourceAzureStorageQueuePropertyFactory withAccessKey(String 
value) {
+        return setProperty("camel.component.azure-storage-queue.accessKey", 
value);
+    }
+
+    public CamelSourceAzureStorageQueuePropertyFactory 
withConfiguration(String configurationClass) {
+        return 
setProperty("camel.component.azure-storage-queue.configuration", 
classRef(configurationClass));
+    }
+
+    public CamelSourceAzureStorageQueuePropertyFactory withOperation(String 
value) {
+        return setProperty("camel.component.azure-storage-queue.operation", 
value);
+    }
+
+    public static CamelSourceAzureStorageQueuePropertyFactory basic() {
+        return new CamelSourceAzureStorageQueuePropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAzurequeueSourceConnector")
+                    
.withConnectorClass("org.apache.camel.kafkaconnector.azurestoragequeue.CamelAzurestoragequeueSourceConnector")
+                    
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}

Reply via email to