This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de43972516c Add integration test for Kinesis sink with schema and JSON output (#15030)
de43972516c is described below
commit de43972516c783dd3c6f79ae9352e6f299066582
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Apr 12 09:49:38 2022 +0200
Add integration test for Kinesis sink with schema and JSON output (#15030)
---
.../integration/io/sinks/KinesisSinkTester.java | 102 +++++++++++++++++++--
.../integration/io/sinks/PulsarSinksTest.java | 45 ++++-----
2 files changed, 112 insertions(+), 35 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index 434924e0818..fbed1fa5b42 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -18,9 +18,20 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectReader;
 import java.util.LinkedHashMap;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testcontainers.containers.localstack.LocalStackContainer;
@@ -33,6 +44,7 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.net.URI;
@@ -47,14 +59,31 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
     private static final String NAME = "kinesis";
     private static final int LOCALSTACK_SERVICE_PORT = 4566;
     public static final String STREAM_NAME = "my-stream-1";
+    public static final ObjectReader READER = ObjectMapperFactory.getThreadLocal().reader();
+    private final boolean withSchema;
     private KinesisAsyncClient client;
 
-    public KinesisSinkTester() {
+    public KinesisSinkTester(boolean withSchema) {
         super(NAME, SinkType.KINESIS);
+        this.withSchema = withSchema;
 
         sinkConfig.put("awsKinesisStreamName", STREAM_NAME);
         sinkConfig.put("awsRegion", "us-east-1");
         sinkConfig.put("awsCredentialPluginParam", "{\"accessKey\":\"access\",\"secretKey\":\"secret\"}");
+        if (withSchema) {
+            sinkConfig.put("messageFormat", "FULL_MESSAGE_IN_JSON_EXPAND_VALUE");
+        }
+    }
+
+    @Override
+    public Schema<?> getInputTopicSchema() {
+        if (withSchema) {
+            // we do not want to enforce a Schema
+            // at the beginning of the test
+            return Schema.AUTO_CONSUME();
+        } else {
+            return Schema.STRING;
+        }
     }
 
 
@@ -94,6 +123,46 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                 .withServices(LocalStackContainer.Service.KINESIS);
     }
 
+    @Override
+    public void produceMessage(int numMessages, PulsarClient client,
+                               String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
+        if (withSchema) {
+            Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema =
+                    Schema.KeyValue(Schema.JSON(SimplePojo.class),
+                            Schema.AVRO(SimplePojo.class), KeyValueEncodingType.SEPARATED);
+
+            @Cleanup
+            Producer<KeyValue<SimplePojo, SimplePojo>> producer = client.newProducer(kvSchema)
+                    .topic(inputTopicName)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                String key = String.valueOf(i);
+                kvs.put(key, key);
+                KeyValue<SimplePojo, SimplePojo> value = new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
+                        new SimplePojo(String.valueOf(i), "v2_" + i));
+                producer.newMessage()
+                        .value(value)
+                        .send();
+            }
+        } else {
+            @Cleanup
+            Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(inputTopicName)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                String key = "key-" + i;
+                String value = "value-" + i;
+                kvs.put(key, value);
+                producer.newMessage()
+                        .key(key)
+                        .value(value)
+                        .send();
+            }
+        }
+    }
+
     @Override
     public void validateSinkResult(Map<String, String> kvs) {
         Awaitility.await().untilAsserted(() -> internalValidateSinkResult(kvs));
@@ -118,18 +187,18 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                 .get()
                 .shardIterator();
 
-        Map<String, String> records = new LinkedHashMap<>();
+        Map<String, String> actualKvs = new LinkedHashMap<>();
 
         // millisBehindLatest equals zero when record processing is caught up,
         // and there are no new records to process at this moment.
         // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(records, iterator) == 0);
+        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
 
-        assertEquals(kvs, records);
+        assertEquals(actualKvs, kvs);
     }
 
     @SneakyThrows
-    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> records, String iterator) {
+    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> kvs, String iterator) {
         final GetRecordsResponse response = client.getRecords(
                 GetRecordsRequest
                         .builder()
@@ -137,10 +206,27 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                         .build())
                 .get();
         if(response.hasRecords()) {
-            response.records().forEach(
-                    record -> records.put(record.partitionKey(), record.data().asString(StandardCharsets.UTF_8))
-            );
+            for (Record record : response.records()) {
+                String data = record.data().asString(StandardCharsets.UTF_8);
+                if (withSchema) {
+                    JsonNode payload = READER.readTree(data).at("/payload");
+                    String i = payload.at("/value/field1").asText();
+                    assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+                    assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
+                    assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+                    kvs.put(i, i);
+                } else {
+                    kvs.put(record.partitionKey(), data);
+                }
+            }
         }
         return response.millisBehindLatest();
     }
+
+    @Data
+    @AllArgsConstructor
+    public static final class SimplePojo {
+        private String field1;
+        private String field2;
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index d9d454011a5..04eda16c57b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -23,11 +23,17 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
 import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester;
 import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester;
 import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
 public class PulsarSinksTest extends PulsarIOTestBase {
 
+    @DataProvider(name = "withSchema")
+    public Object[][] withSchema() {
+        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+    }
+
     @Test(groups = "sink")
     public void testKafkaSink() throws Exception {
         final String kafkaContainerName = "kafka-" + randomName(8);
@@ -54,34 +60,19 @@ public class PulsarSinksTest extends PulsarIOTestBase {
         testSink(new JdbcPostgresSinkTester(), true);
     }
 
-    @Test(groups = "sink")
-    public void testElasticSearch7SinkRawData() throws Exception {
-        testSink(new ElasticSearch7SinkTester(false), true);
-    }
-
-    @Test(groups = "sink")
-    public void testElasticSearchSink7SchemaEnabled() throws Exception {
-        testSink(new ElasticSearch7SinkTester(true), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testElasticSearch7Sink(boolean withSchema) throws Exception {
+        testSink(new ElasticSearch7SinkTester(withSchema), true);
     }
 
-    @Test(groups = "sink")
-    public void testElasticSearch8SinkRawData() throws Exception {
-        testSink(new ElasticSearch8SinkTester(false), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testElasticSearch8Sink(boolean withSchema) throws Exception {
+        testSink(new ElasticSearch8SinkTester(withSchema), true);
     }
 
-    @Test(groups = "sink")
-    public void testElasticSearch8SinkSchemaEnabled() throws Exception {
-        testSink(new ElasticSearch8SinkTester(true), true);
-    }
-
-    @Test(groups = "sink")
-    public void testOpenSearchSinkRawData() throws Exception {
-        testSink(new OpenSearchSinkTester(false), true);
-    }
-
-    @Test(groups = "sink")
-    public void testOpenSearchSinkSchemaEnabled() throws Exception {
-        testSink(new OpenSearchSinkTester(true), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testOpenSearchSinkRawData(boolean withSchema) throws Exception {
+        testSink(new OpenSearchSinkTester(withSchema), true);
     }
 
     @Test(groups = "sink")
@@ -90,9 +81,9 @@ public class PulsarSinksTest extends PulsarIOTestBase {
         testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName));
     }
 
-    @Test(groups = "sink")
-    public void testKinesis() throws Exception {
-        testSink(new KinesisSinkTester(), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testKinesis(boolean withSchema) throws Exception {
+        testSink(new KinesisSinkTester(withSchema), true);
     }
 
 }