heesung-sn commented on code in PR #15264:
URL: https://github.com/apache/pulsar/pull/15264#discussion_r861945568


##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java:
##########
@@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String, 
String> kvs) {
 
         Map<String, String> actualKvs = new LinkedHashMap<>();
 
-        // millisBehindLatest equals zero when record processing is caught up,
-        // and there are no new records to process at this moment.
-        // See 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> 
addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
+        addMoreRecords(actualKvs, iterator);
 
         assertEquals(actualKvs, kvs);
     }
 
     @SneakyThrows
-    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> 
kvs, String iterator) {
-        final GetRecordsResponse response = client.getRecords(
-                GetRecordsRequest
-                        .builder()
-                        .shardIterator(iterator)
-                        .build())
-                .get();
-        if(response.hasRecords()) {
-            for (Record record : response.records()) {
-                String data = record.data().asString(StandardCharsets.UTF_8);
-                if (withSchema) {
-                    JsonNode payload = READER.readTree(data).at("/payload");
-                    String i = payload.at("/value/field1").asText();
-                    assertEquals(payload.at("/value/field2").asText(), "v2_" + 
i);
-                    assertEquals(payload.at("/key/field1").asText(), "f1_" + 
i);
-                    assertEquals(payload.at("/key/field2").asText(), "f2_" + 
i);
-                    kvs.put(i, i);
-                } else {
-                    kvs.put(record.partitionKey(), data);
+    private void parseRecordData(Map<String, String> actualKvs, String data, 
String partitionKey) {
+        if (withSchema) {
+            JsonNode payload = READER.readTree(data).at("/payload");
+            String i = payload.at("/value/field1").asText();
+            assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+            assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
+            assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+            actualKvs.put(i, i);
+        } else {
+            actualKvs.put(partitionKey, data);
+        }
+    }
+
+    @SneakyThrows
+    private void addMoreRecords(Map<String, String> actualKvs, String 
iterator) {
+        GetRecordsResponse response;
+        List<KinesisClientRecord> aggRecords = new ArrayList<>();
+        do {
+            GetRecordsRequest request = 
GetRecordsRequest.builder().shardIterator(iterator).build();
+            response = client.getRecords(request).get();
+            if (response.hasRecords()) {
+                for (Record record : response.records()) {
+                    // KinesisSink uses KPL with aggregation enabled (by 
default).
+                    // However, due to the async state initialization of the 
KPL internal ShardMap,
+                    // the first sinked records might not be aggregated in 
Kinesis.
+                    // ref: 
https://github.com/awslabs/amazon-kinesis-producer/issues/131
+                    try {
+                        String data = 
record.data().asString(StandardCharsets.UTF_8);
+                        parseRecordData(actualKvs, data, 
record.partitionKey());
+                    } catch (UncheckedIOException e) {
+                        aggRecords.add(KinesisClientRecord.fromRecord(record));
+                    }
                 }
             }
+            iterator = response.nextShardIterator();
+            // millisBehindLatest equals zero when record processing is caught 
up,
+            // and there are no new records to process at this moment.
+            // See 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
+        } while (response.millisBehindLatest() != 0);
+
+        for (KinesisClientRecord record : new 
AggregatorUtil().deaggregate(aggRecords)) {
+            String data = new String(record.data().array(), 
StandardCharsets.UTF_8);
+            parseRecordData(actualKvs, data, record.partitionKey());

Review Comment:
   Removed. Created a separate PR: https://github.com/apache/pulsar/pull/15394



##########
pom.xml:
##########
@@ -77,8 +77,9 @@ flexible messaging model and an intuitive client 
API.</description>
   </issueManagement>
 
   <properties>
-    <maven.compiler.source>8</maven.compiler.source>
-    <maven.compiler.target>8</maven.compiler.target>
+    <maven.compiler.source>17</maven.compiler.source>
+    <maven.compiler.target>17</maven.compiler.target>
+    <pulsar.client.javac.release>8</pulsar.client.javac.release>

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to