This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 05b168552cb Fix Kinesis integration test (#14948)
05b168552cb is described below

commit 05b168552cb2e46d89bdd5b114430951c0383d65
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Apr 5 12:00:07 2022 +0200

    Fix Kinesis integration test (#14948)
---
 .../integration/io/sinks/KinesisSinkTester.java    | 54 ++++++++++++++--------
 1 file changed, 34 insertions(+), 20 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index 968e19faaf7..434924e0818 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.LinkedHashMap;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -25,8 +26,6 @@ import org.awaitility.Awaitility;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.utility.DockerImageName;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
@@ -34,19 +33,19 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 
 @Slf4j
 public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
 
     private static final String NAME = "kinesis";
+    private static final int LOCALSTACK_SERVICE_PORT = 4566;
     public static final String STREAM_NAME = "my-stream-1";
     private KinesisAsyncClient client;
 
@@ -64,16 +63,11 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
         final LocalStackContainer localStackContainer = getServiceContainer();
         final URI endpointOverride = localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
         sinkConfig.put("awsEndpoint", NAME);
-        sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+        sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);
         sinkConfig.put("skipCertificateValidation", true);
-        client = KinesisAsyncClient.builder().credentialsProvider(new AwsCredentialsProvider() {
-                    @Override
-                    public AwsCredentials resolveCredentials() {
-                        return AwsBasicCredentials.create(
-                                "access",
-                                "secret");
-                    }
-                })
+        client = KinesisAsyncClient.builder().credentialsProvider(() -> AwsBasicCredentials.create(
+                "access",
+                "secret"))
                 .region(Region.US_EAST_1)
                 .endpointOverride(endpointOverride)
                 .build();
@@ -84,8 +78,14 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                 .build())
                 .get();
         log.info("prepareSink for kinesis: created stream {}", STREAM_NAME);
+    }
 
-
+    @Override
+    public void stopServiceContainer(PulsarCluster cluster) {
+        if (client != null) {
+            client.close();
+        }
+        super.stopServiceContainer(cluster);
     }
 
     @Override
@@ -95,13 +95,12 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
     }
 
     @Override
-    @SneakyThrows
     public void validateSinkResult(Map<String, String> kvs) {
-        Awaitility.await().untilAsserted(() -> validateSinkResult());
+        Awaitility.await().untilAsserted(() -> internalValidateSinkResult(kvs));
     }
 
     @SneakyThrows
-    private void validateSinkResult() {
+    private void internalValidateSinkResult(Map<String, String> kvs) {
         final String shardId = client.listShards(
                 ListShardsRequest.builder()
                         .streamName(STREAM_NAME)
@@ -118,15 +117,30 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
                 .build())
                 .get()
                 .shardIterator();
+
+        Map<String, String> records = new LinkedHashMap<>();
+
+        // millisBehindLatest equals zero when record processing is caught up,
+        // and there are no new records to process at this moment.
+        // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
+        Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(records, iterator) == 0);
+
+        assertEquals(kvs, records);
+    }
+
+    @SneakyThrows
+    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> records, String iterator) {
         final GetRecordsResponse response = client.getRecords(
                 GetRecordsRequest
                         .builder()
                         .shardIterator(iterator)
                         .build())
                 .get();
-        assertTrue(response.hasRecords());
-        for (Record record : response.records()) {
-            assertTrue(record.data().asString(StandardCharsets.UTF_8).startsWith("value-"));
+        if(response.hasRecords()) {
+            response.records().forEach(
+                record -> records.put(record.partitionKey(), record.data().asString(StandardCharsets.UTF_8))
+            );
         }
+        return response.millisBehindLatest();
     }
 }

Reply via email to