This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 282b6f3  KAFKA-10658 ErrantRecordReporter.report always return 
completed futur… (#9525)
282b6f3 is described below

commit 282b6f31e7b63f2d5c90d7e20ce48ec097d70d3a
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Thu Jan 28 14:12:54 2021 +0800

    KAFKA-10658 ErrantRecordReporter.report always return completed futur… 
(#9525)
    
    Reviewers: Konstantine Karantasis <[email protected]>
---
 .../connect/runtime/errors/ProcessingContext.java  |  5 +-
 .../runtime/errors/ProcessingContextTest.java      | 55 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index 0ddf894..b49c93c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -25,7 +25,6 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -149,8 +148,8 @@ class ProcessingContext implements AutoCloseable {
 
         List<Future<RecordMetadata>> futures = reporters.stream()
                 .map(r -> r.report(this))
-                .filter(Future::isDone)
-                .collect(Collectors.toCollection(LinkedList::new));
+                .filter(f -> !f.isDone())
+                .collect(Collectors.toList());
         if (futures.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
new file mode 100644
index 0000000..3e5e082
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ProcessingContextTest {
+
+    @Test
+    public void testReportWithSingleReporter() {
+        testReport(1);
+    }
+
+    @Test
+    public void testReportWithMultipleReporters() {
+        testReport(2);
+    }
+
+    private void testReport(int numberOfReports) {
+        ProcessingContext context = new ProcessingContext();
+        List<CompletableFuture<RecordMetadata>> fs = IntStream.range(0, 
numberOfReports).mapToObj(i -> new 
CompletableFuture<RecordMetadata>()).collect(Collectors.toList());
+        context.reporters(IntStream.range(0, numberOfReports).mapToObj(i -> 
(ErrorReporter) c -> fs.get(i)).collect(Collectors.toList()));
+        Future<Void> result = context.report();
+        fs.forEach(f -> {
+            assertFalse(result.isDone());
+            f.complete(new RecordMetadata(new TopicPartition("t", 0), 0, 0, 0, 
0L, 0, 0));
+        });
+        assertTrue(result.isDone());
+    }
+}

Reply via email to