This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de43972516c Add integration test for Kinesis sink with schema and JSON output (#15030)
de43972516c is described below

commit de43972516c783dd3c6f79ae9352e6f299066582
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Apr 12 09:49:38 2022 +0200

    Add integration test for Kinesis sink with schema and JSON output (#15030)
---
 .../integration/io/sinks/KinesisSinkTester.java    | 102 +++++++++++++++++++--
 .../integration/io/sinks/PulsarSinksTest.java      |  45 ++++-----
 2 files changed, 112 insertions(+), 35 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index 434924e0818..fbed1fa5b42 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -18,9 +18,20 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectReader;
 import java.util.LinkedHashMap;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testcontainers.containers.localstack.LocalStackContainer;
@@ -33,6 +44,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.net.URI;
@@ -47,14 +59,31 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
     private static final String NAME = "kinesis";
     private static final int LOCALSTACK_SERVICE_PORT = 4566;
     public static final String STREAM_NAME = "my-stream-1";
+    public static final ObjectReader READER = ObjectMapperFactory.getThreadLocal().reader();
+    private final boolean withSchema;
     private KinesisAsyncClient client;
 
-    public KinesisSinkTester() {
+    public KinesisSinkTester(boolean withSchema) {
         super(NAME, SinkType.KINESIS);
+        this.withSchema = withSchema;
 
         sinkConfig.put("awsKinesisStreamName", STREAM_NAME);
         sinkConfig.put("awsRegion", "us-east-1");
         sinkConfig.put("awsCredentialPluginParam", "{\"accessKey\":\"access\",\"secretKey\":\"secret\"}");
+        if (withSchema) {
+            sinkConfig.put("messageFormat", "FULL_MESSAGE_IN_JSON_EXPAND_VALUE");
+        }
+    }
+
+    @Override
+    public Schema<?> getInputTopicSchema() {
+        if (withSchema) {
+            // we do not want to enforce a Schema
+            // at the beginning of the test
+            return Schema.AUTO_CONSUME();
+        } else {
+            return Schema.STRING;
+        }
     }
 
 
@@ -94,6 +123,46 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                 .withServices(LocalStackContainer.Service.KINESIS);
     }
 
+    @Override
+    public void produceMessage(int numMessages, PulsarClient client,
+                               String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
+        if (withSchema) {
+            Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema =
+                    Schema.KeyValue(Schema.JSON(SimplePojo.class),
+                            Schema.AVRO(SimplePojo.class), KeyValueEncodingType.SEPARATED);
+
+            @Cleanup
+            Producer<KeyValue<SimplePojo, SimplePojo>> producer = client.newProducer(kvSchema)
+                    .topic(inputTopicName)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                String key = String.valueOf(i);
+                kvs.put(key, key);
+                KeyValue<SimplePojo, SimplePojo> value = new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
+                        new SimplePojo(String.valueOf(i), "v2_" + i));
+                producer.newMessage()
+                        .value(value)
+                        .send();
+            }
+        } else {
+            @Cleanup
+            Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(inputTopicName)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                String key = "key-" + i;
+                String value = "value-" + i;
+                kvs.put(key, value);
+                producer.newMessage()
+                        .key(key)
+                        .value(value)
+                        .send();
+            }
+        }
+    }
+
     @Override
     public void validateSinkResult(Map<String, String> kvs) {
         Awaitility.await().untilAsserted(() -> internalValidateSinkResult(kvs));
@@ -118,18 +187,18 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                 .get()
                 .shardIterator();
 
-        Map<String, String> records = new LinkedHashMap<>();
+        Map<String, String> actualKvs = new LinkedHashMap<>();
 
         // millisBehindLatest equals zero when record processing is caught up,
         // and there are no new records to process at this moment.
         // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(records, iterator) == 0);
+        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
 
-        assertEquals(kvs, records);
+        assertEquals(actualKvs, kvs);
     }
 
     @SneakyThrows
-    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> records, String iterator) {
+    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> kvs, String iterator) {
         final GetRecordsResponse response = client.getRecords(
                 GetRecordsRequest
                         .builder()
@@ -137,10 +206,27 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                         .build())
                 .get();
         if(response.hasRecords()) {
-            response.records().forEach(
-                record -> records.put(record.partitionKey(), record.data().asString(StandardCharsets.UTF_8))
-            );
+            for (Record record : response.records()) {
+                String data = record.data().asString(StandardCharsets.UTF_8);
+                if (withSchema) {
+                    JsonNode payload = READER.readTree(data).at("/payload");
+                    String i = payload.at("/value/field1").asText();
+                    assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+                    assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
+                    assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+                    kvs.put(i, i);
+                } else {
+                    kvs.put(record.partitionKey(), data);
+                }
+            }
         }
         return response.millisBehindLatest();
     }
+
+    @Data
+    @AllArgsConstructor
+    public static final class SimplePojo {
+        private String field1;
+        private String field2;
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index d9d454011a5..04eda16c57b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -23,11 +23,17 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
 import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester;
 import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester;
 import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
 public class PulsarSinksTest extends PulsarIOTestBase {
 
+    @DataProvider(name = "withSchema")
+    public Object[][] withSchema() {
+        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+    }
+
     @Test(groups = "sink")
     public void testKafkaSink() throws Exception {
         final String kafkaContainerName = "kafka-" + randomName(8);
@@ -54,34 +60,19 @@ public class PulsarSinksTest extends PulsarIOTestBase {
         testSink(new JdbcPostgresSinkTester(), true);
     }
 
-    @Test(groups = "sink")
-    public void testElasticSearch7SinkRawData() throws Exception {
-        testSink(new ElasticSearch7SinkTester(false), true);
-    }
-
-    @Test(groups = "sink")
-    public void testElasticSearchSink7SchemaEnabled() throws Exception {
-        testSink(new ElasticSearch7SinkTester(true), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testElasticSearch7Sink(boolean withSchema) throws Exception {
+        testSink(new ElasticSearch7SinkTester(withSchema), true);
     }
 
-    @Test(groups = "sink")
-    public void testElasticSearch8SinkRawData() throws Exception {
-        testSink(new ElasticSearch8SinkTester(false), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testElasticSearch8Sink(boolean withSchema) throws Exception {
+        testSink(new ElasticSearch8SinkTester(withSchema), true);
     }
 
-    @Test(groups = "sink")
-    public void testElasticSearch8SinkSchemaEnabled() throws Exception {
-        testSink(new ElasticSearch8SinkTester(true), true);
-    }
-
-    @Test(groups = "sink")
-    public void testOpenSearchSinkRawData() throws Exception {
-        testSink(new OpenSearchSinkTester(false), true);
-    }
-
-    @Test(groups = "sink")
-    public void testOpenSearchSinkSchemaEnabled() throws Exception {
-        testSink(new OpenSearchSinkTester(true), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testOpenSearchSinkRawData(boolean withSchema) throws Exception {
+        testSink(new OpenSearchSinkTester(withSchema), true);
     }
 
     @Test(groups = "sink")
@@ -90,9 +81,9 @@ public class PulsarSinksTest extends PulsarIOTestBase {
         testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName));
     }
 
-    @Test(groups = "sink")
-    public void testKinesis() throws Exception {
-        testSink(new KinesisSinkTester(), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testKinesis(boolean withSchema) throws Exception {
+        testSink(new KinesisSinkTester(withSchema), true);
     }
 
 }

Reply via email to