heesung-sn commented on code in PR #15264:
URL: https://github.com/apache/pulsar/pull/15264#discussion_r861945568
##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java:
##########
@@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String,
String> kvs) {
Map<String, String> actualKvs = new LinkedHashMap<>();
- // millisBehindLatest equals zero when record processing is caught up,
- // and there are no new records to process at this moment.
- // See
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
- Awaitility.await().until(() ->
addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
+ addMoreRecords(actualKvs, iterator);
assertEquals(actualKvs, kvs);
}
@SneakyThrows
- private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String>
kvs, String iterator) {
- final GetRecordsResponse response = client.getRecords(
- GetRecordsRequest
- .builder()
- .shardIterator(iterator)
- .build())
- .get();
- if(response.hasRecords()) {
- for (Record record : response.records()) {
- String data = record.data().asString(StandardCharsets.UTF_8);
- if (withSchema) {
- JsonNode payload = READER.readTree(data).at("/payload");
- String i = payload.at("/value/field1").asText();
- assertEquals(payload.at("/value/field2").asText(), "v2_" +
i);
- assertEquals(payload.at("/key/field1").asText(), "f1_" +
i);
- assertEquals(payload.at("/key/field2").asText(), "f2_" +
i);
- kvs.put(i, i);
- } else {
- kvs.put(record.partitionKey(), data);
+ private void parseRecordData(Map<String, String> actualKvs, String data,
String partitionKey) {
+ if (withSchema) {
+ JsonNode payload = READER.readTree(data).at("/payload");
+ String i = payload.at("/value/field1").asText();
+ assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+ assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
+ assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+ actualKvs.put(i, i);
+ } else {
+ actualKvs.put(partitionKey, data);
+ }
+ }
+
+ @SneakyThrows
+ private void addMoreRecords(Map<String, String> actualKvs, String
iterator) {
+ GetRecordsResponse response;
+ List<KinesisClientRecord> aggRecords = new ArrayList<>();
+ do {
+ GetRecordsRequest request =
GetRecordsRequest.builder().shardIterator(iterator).build();
+ response = client.getRecords(request).get();
+ if (response.hasRecords()) {
+ for (Record record : response.records()) {
+ // KinesisSink uses KPL with aggregation enabled (by
default).
+ // However, due to the async state initialization of the
KPL internal ShardMap,
+ // the first sinked records might not be aggregated in
Kinesis.
+ // ref:
https://github.com/awslabs/amazon-kinesis-producer/issues/131
+ try {
+ String data =
record.data().asString(StandardCharsets.UTF_8);
+ parseRecordData(actualKvs, data,
record.partitionKey());
+ } catch (UncheckedIOException e) {
+ aggRecords.add(KinesisClientRecord.fromRecord(record));
+ }
}
}
+ iterator = response.nextShardIterator();
+ // millisBehindLatest equals zero when record processing is caught
up,
+ // and there are no new records to process at this moment.
+ // See
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
+ } while (response.millisBehindLatest() != 0);
+
+ for (KinesisClientRecord record : new
AggregatorUtil().deaggregate(aggRecords)) {
+ String data = new String(record.data().array(),
StandardCharsets.UTF_8);
+ parseRecordData(actualKvs, data, record.partitionKey());
Review Comment:
Removed. Created a separate pr, https://github.com/apache/pulsar/pull/15394
##########
pom.xml:
##########
@@ -77,8 +77,9 @@ flexible messaging model and an intuitive client
API.</description>
</issueManagement>
<properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ <pulsar.client.javac.release>8</pulsar.client.javac.release>
Review Comment:
updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]