This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 282b6f3 KAFKA-10658 ErrantRecordReporter.report always return completed futur… (#9525)
282b6f3 is described below
commit 282b6f31e7b63f2d5c90d7e20ce48ec097d70d3a
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Thu Jan 28 14:12:54 2021 +0800
KAFKA-10658 ErrantRecordReporter.report always return completed futur… (#9525)
Reviewers: Konstantine Karantasis <[email protected]>
---
.../connect/runtime/errors/ProcessingContext.java | 5 +-
.../runtime/errors/ProcessingContextTest.java | 55 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index 0ddf894..b49c93c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -25,7 +25,6 @@ import org.apache.kafka.connect.source.SourceRecord;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -149,8 +148,8 @@ class ProcessingContext implements AutoCloseable {
List<Future<RecordMetadata>> futures = reporters.stream()
.map(r -> r.report(this))
- .filter(Future::isDone)
- .collect(Collectors.toCollection(LinkedList::new));
+ .filter(f -> !f.isDone())
+ .collect(Collectors.toList());
if (futures.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
new file mode 100644
index 0000000..3e5e082
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ProcessingContextTest {
+
+ @Test
+ public void testReportWithSingleReporter() {
+ testReport(1);
+ }
+
+ @Test
+ public void testReportWithMultipleReporters() {
+ testReport(2);
+ }
+
+ private void testReport(int numberOfReports) {
+ ProcessingContext context = new ProcessingContext();
+ List<CompletableFuture<RecordMetadata>> fs = IntStream.range(0, numberOfReports).mapToObj(i -> new CompletableFuture<RecordMetadata>()).collect(Collectors.toList());
+ context.reporters(IntStream.range(0, numberOfReports).mapToObj(i -> (ErrorReporter) c -> fs.get(i)).collect(Collectors.toList()));
+ Future<Void> result = context.report();
+ fs.forEach(f -> {
+ assertFalse(result.isDone());
+ f.complete(new RecordMetadata(new TopicPartition("t", 0), 0, 0, 0, 0L, 0, 0));
+ });
+ assertTrue(result.isDone());
+ }
+}