This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 05b168552cb Fix Kinesis integration test (#14948)
05b168552cb is described below
commit 05b168552cb2e46d89bdd5b114430951c0383d65
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Apr 5 12:00:07 2022 +0200
Fix Kinesis integration test (#14948)
---
.../integration/io/sinks/KinesisSinkTester.java | 54 ++++++++++++++--------
1 file changed, 34 insertions(+), 20 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index 968e19faaf7..434924e0818 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;
+import java.util.LinkedHashMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -25,8 +26,6 @@ import org.awaitility.Awaitility;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
@@ -34,19 +33,19 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
@Slf4j
public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
private static final String NAME = "kinesis";
+ private static final int LOCALSTACK_SERVICE_PORT = 4566;
public static final String STREAM_NAME = "my-stream-1";
private KinesisAsyncClient client;
@@ -64,16 +63,11 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
final LocalStackContainer localStackContainer = getServiceContainer();
final URI endpointOverride =
localStackContainer.getEndpointOverride(LocalStackContainer.Service.KINESIS);
sinkConfig.put("awsEndpoint", NAME);
- sinkConfig.put("awsEndpointPort", endpointOverride.getPort());
+ sinkConfig.put("awsEndpointPort", LOCALSTACK_SERVICE_PORT);
sinkConfig.put("skipCertificateValidation", true);
- client = KinesisAsyncClient.builder().credentialsProvider(new
AwsCredentialsProvider() {
- @Override
- public AwsCredentials resolveCredentials() {
- return AwsBasicCredentials.create(
- "access",
- "secret");
- }
- })
+ client = KinesisAsyncClient.builder().credentialsProvider(() ->
AwsBasicCredentials.create(
+ "access",
+ "secret"))
.region(Region.US_EAST_1)
.endpointOverride(endpointOverride)
.build();
@@ -84,8 +78,14 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
.build())
.get();
log.info("prepareSink for kinesis: created stream {}", STREAM_NAME);
+ }
-
+ @Override
+ public void stopServiceContainer(PulsarCluster cluster) {
+ if (client != null) {
+ client.close();
+ }
+ super.stopServiceContainer(cluster);
}
@Override
@@ -95,13 +95,12 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
}
@Override
- @SneakyThrows
public void validateSinkResult(Map<String, String> kvs) {
- Awaitility.await().untilAsserted(() -> validateSinkResult());
+ Awaitility.await().untilAsserted(() ->
internalValidateSinkResult(kvs));
}
@SneakyThrows
- private void validateSinkResult() {
+ private void internalValidateSinkResult(Map<String, String> kvs) {
final String shardId = client.listShards(
ListShardsRequest.builder()
.streamName(STREAM_NAME)
@@ -118,15 +117,30 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
.build())
.get()
.shardIterator();
+
+ Map<String, String> records = new LinkedHashMap<>();
+
+ // millisBehindLatest equals zero when record processing is caught up,
+ // and there are no new records to process at this moment.
+ // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
+ Awaitility.await().until(() ->
addMoreRecordsAndGetMillisBehindLatest(records, iterator) == 0);
+
+ assertEquals(kvs, records);
+ }
+
+ @SneakyThrows
+ private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String>
records, String iterator) {
final GetRecordsResponse response = client.getRecords(
GetRecordsRequest
.builder()
.shardIterator(iterator)
.build())
.get();
- assertTrue(response.hasRecords());
- for (Record record : response.records()) {
-
assertTrue(record.data().asString(StandardCharsets.UTF_8).startsWith("value-"));
+ if(response.hasRecords()) {
+ response.records().forEach(
+ record -> records.put(record.partitionKey(),
record.data().asString(StandardCharsets.UTF_8))
+ );
}
+ return response.millisBehindLatest();
}
}