lhotari commented on code in PR #15264:
URL: https://github.com/apache/pulsar/pull/15264#discussion_r861616726


##########
README.md:
##########
@@ -83,10 +83,27 @@ components in the Pulsar ecosystem, including connectors, 
adapters, and other la
 
 - [Pulsar CI](https://github.com/apache/pulsar-test-infra)
 
+## Pulsar Runtime Java Version Recommendation
+
+pulsar ver 2.11  >=
+
+| Pulsar Components | Java Version  |
+| ----------------- |:-------------:|
+| Broker            | 17            |
+| CLI               | 17            |
+| Java Client       | 8 or 11 or 17 |
+
+pulsar ver 2.11 <
+
+| Pulsar Components | Java Version  |
+| ----------------- |:-------------:|
+| All               | 8 or 11       |

Review Comment:
   For Broker, the recommended runtime environment has been Java 11 since 
Pulsar 2.8.0 release.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java:
##########
@@ -34,7 +34,7 @@
  * ManagedLedgerInfo compression configuration test.
  */
 
-@Test(groups = {"broker", "broker-jdk8"})
+@Test(groups = {"broker", "broker-jdk17"})

Review Comment:
   just remove it. The reason why this specific test was marked for broker-jdk8 
was that there's a change between Java 8 and Java 11 that showed up in this 
test. It's now obsolete, so remove the broker-jdk8 group completely.



##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java:
##########
@@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String, 
String> kvs) {
 
         Map<String, String> actualKvs = new LinkedHashMap<>();
 
-        // millisBehindLatest equals zero when record processing is caught up,
-        // and there are no new records to process at this moment.
-        // See 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> 
addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
+        addMoreRecords(actualKvs, iterator);
 
         assertEquals(actualKvs, kvs);
     }
 
     @SneakyThrows
-    private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> 
kvs, String iterator) {
-        final GetRecordsResponse response = client.getRecords(
-                GetRecordsRequest
-                        .builder()
-                        .shardIterator(iterator)
-                        .build())
-                .get();
-        if(response.hasRecords()) {
-            for (Record record : response.records()) {
-                String data = record.data().asString(StandardCharsets.UTF_8);
-                if (withSchema) {
-                    JsonNode payload = READER.readTree(data).at("/payload");
-                    String i = payload.at("/value/field1").asText();
-                    assertEquals(payload.at("/value/field2").asText(), "v2_" + 
i);
-                    assertEquals(payload.at("/key/field1").asText(), "f1_" + 
i);
-                    assertEquals(payload.at("/key/field2").asText(), "f2_" + 
i);
-                    kvs.put(i, i);
-                } else {
-                    kvs.put(record.partitionKey(), data);
+    private void parseRecordData(Map<String, String> actualKvs, String data, 
String partitionKey) {
+        if (withSchema) {
+            JsonNode payload = READER.readTree(data).at("/payload");
+            String i = payload.at("/value/field1").asText();
+            assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+            assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
+            assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+            actualKvs.put(i, i);
+        } else {
+            actualKvs.put(partitionKey, data);
+        }
+    }
+
+    @SneakyThrows
+    private void addMoreRecords(Map<String, String> actualKvs, String 
iterator) {
+        GetRecordsResponse response;
+        List<KinesisClientRecord> aggRecords = new ArrayList<>();
+        do {
+            GetRecordsRequest request = 
GetRecordsRequest.builder().shardIterator(iterator).build();
+            response = client.getRecords(request).get();
+            if (response.hasRecords()) {
+                for (Record record : response.records()) {
+                    // KinesisSink uses KPL with aggregation enabled (by 
default).
+                    // However, due to the async state initialization of the 
KPL internal ShardMap,
+                    // the first sinked records might not be aggregated in 
Kinesis.
+                    // ref: 
https://github.com/awslabs/amazon-kinesis-producer/issues/131
+                    try {
+                        String data = 
record.data().asString(StandardCharsets.UTF_8);
+                        parseRecordData(actualKvs, data, 
record.partitionKey());
+                    } catch (UncheckedIOException e) {
+                        aggRecords.add(KinesisClientRecord.fromRecord(record));
+                    }
                 }
             }
+            iterator = response.nextShardIterator();
+            // millisBehindLatest equals zero when record processing is caught 
up,
+            // and there are no new records to process at this moment.
+            // See 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
+        } while (response.millisBehindLatest() != 0);
+
+        for (KinesisClientRecord record : new 
AggregatorUtil().deaggregate(aggRecords)) {
+            String data = new String(record.data().array(), 
StandardCharsets.UTF_8);
+            parseRecordData(actualKvs, data, record.partitionKey());

Review Comment:
   Please remove this from this PR. Submit as a separate PR.



##########
build/run_unit_group.sh:
##########
@@ -78,8 +78,8 @@ function test_group_broker_client_impl() {
   mvn_test -pl pulsar-broker -Dgroups='broker-impl'
 }
 
-function test_group_broker_jdk8() {
-  mvn_test -pl pulsar-broker -Dgroups='broker-jdk8' 
-Dpulsar.allocator.pooled=true
+function test_group_broker_jdk17() {

Review Comment:
   > but it might be useful in the future when we start supporting jdk18+
   
   YAGNI. I think it's better to not go so far ahead. We can add that solution 
whenever that happens. 



##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java:
##########
@@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String, 
String> kvs) {
 
         Map<String, String> actualKvs = new LinkedHashMap<>();
 
-        // millisBehindLatest equals zero when record processing is caught up,
-        // and there are no new records to process at this moment.
-        // See 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
-        Awaitility.await().until(() -> 
addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
+        addMoreRecords(actualKvs, iterator);

Review Comment:
   Please remove it from this change. Submit it as a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to