This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3b2a54a15f9cbab0a98ece4a5f83ff518dc693f
Author: davidliu <[email protected]>
AuthorDate: Tue Jul 26 12:48:07 2022 +0800

    [FLINK-27536][Connectors / Common] Rename method parameter in 
AsyncSinkWriter
---
 .../base/sink/writer/AsyncSinkWriter.java          | 41 +++++++++++-----------
 .../base/sink/writer/AsyncSinkWriterTest.java      | 34 +++++++++---------
 2 files changed, 36 insertions(+), 39 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 090504ab8bc..faab5889ad0 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -17,15 +17,6 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.Preconditions;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayDeque;
@@ -36,6 +27,14 @@ import java.util.Deque;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.function.Consumer;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A generic sink writer that handles the general behaviour of a sink such as 
batching and flushing,
@@ -170,14 +169,14 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
      * the valid limits of the destination). The logic then needs to create 
and execute the request
      * asynchronously against the destination (ideally by batching together 
multiple request entries
      * to increase efficiency). The logic also needs to identify individual 
request entries that
-     * were not persisted successfully and resubmit them using the {@code 
requestResult} callback.
+     * were not persisted successfully and resubmit them using the {@code 
requestToRetry} callback.
      *
      * <p>From a threading perspective, the mailbox thread will call this 
method and initiate the
      * asynchronous request to persist the {@code requestEntries}. NOTE: The 
client must support
      * asynchronous requests and the method called to persist the records must 
asynchronously
      * execute and return a future with the results of that request. A thread 
from the destination
      * client thread pool should complete the request and submit the failed 
entries that should be
-     * retried. The {@code requestResult} will then trigger the mailbox thread 
to requeue the
+     * retried. The {@code requestToRetry} will then trigger the mailbox 
thread to requeue the
      * unsuccessful elements.
      *
      * <p>An example implementation of this method is included:
@@ -185,15 +184,15 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
      * <pre>{@code
      * @Override
      * protected void submitRequestEntries
-     *   (List<RequestEntryT> records, Consumer<Collection<RequestEntryT>> 
requestResult) {
+     *   (List<RequestEntryT> records, Consumer<Collection<RequestEntryT>> 
requestToRetry) {
      *     Future<Response> response = destinationClient.putRecords(records);
      *     response.whenComplete(
      *         (response, error) -> {
      *             if(error){
      *                 List<RequestEntryT> retryableFailedRecords = 
getRetryableFailed(response);
-     *                 requestResult.accept(retryableFailedRecords);
+     *                 requestToRetry.accept(retryableFailedRecords);
      *             }else{
-     *                 requestResult.accept(Collections.emptyList());
+     *                 requestToRetry.accept(Collections.emptyList());
      *             }
      *         }
      *     );
@@ -205,14 +204,14 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
      * requests.
      *
      * @param requestEntries a set of request entries that should be sent to 
the destination
-     * @param requestResult the {@code accept} method should be called on this 
Consumer once the
+     * @param requestToRetry the {@code accept} method should be called on 
this Consumer once the
      *     processing of the {@code requestEntries} are complete. Any entries 
that encountered
-     *     difficulties in persisting should be re-queued through {@code 
requestResult} by including
-     *     that element in the collection of {@code RequestEntryT}s passed to 
the {@code accept}
-     *     method. All other elements are assumed to have been successfully 
persisted.
+     *     difficulties in persisting should be re-queued through {@code 
requestToRetry} by
+     *     including that element in the collection of {@code RequestEntryT}s 
passed to the {@code
+     *     accept} method. All other elements are assumed to have been 
successfully persisted.
      */
     protected abstract void submitRequestEntries(
-            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> 
requestResult);
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> 
requestToRetry);
 
     /**
      * This method allows the getting of the size of a {@code RequestEntryT} 
in bytes. The size in
@@ -381,7 +380,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT 
extends Serializable
         }
 
         long timestampOfRequest = System.currentTimeMillis();
-        Consumer<List<RequestEntryT>> requestResult =
+        Consumer<List<RequestEntryT>> requestToRetry =
                 failedRequestEntries ->
                         mailboxExecutor.execute(
                                 () ->
@@ -394,7 +393,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT 
extends Serializable
 
         inFlightRequestsCount++;
         inFlightMessages += batchSize;
-        submitRequestEntries(batch, requestResult);
+        submitRequestEntries(batch, requestToRetry);
     }
 
     /**
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index c8535bfcef1..52f38242953 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,12 +17,11 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -37,12 +36,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
-import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.fail;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 /**
  * Unit Tests the functionality of AsyncSinkWriter without any assumptions of 
what a concrete
@@ -1059,12 +1057,12 @@ public class AsyncSinkWriterTest {
          * <p>A limitation of this basic implementation is that each element 
written must be unique.
          *
          * @param requestEntries a set of request entries that should be 
persisted to {@code res}
-         * @param requestResult a Consumer that needs to accept a collection 
of failure elements
+         * @param requestToRetry a Consumer that needs to accept a collection 
of failure elements
          *     once all request entries have been persisted
          */
         @Override
         protected void submitRequestEntries(
-                List<Integer> requestEntries, Consumer<List<Integer>> 
requestResult) {
+                List<Integer> requestEntries, Consumer<List<Integer>> 
requestToRetry) {
             maybeDelay();
 
             if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 
200)) {
@@ -1087,10 +1085,10 @@ public class AsyncSinkWriterTest {
 
                 requestEntries.removeAll(firstTimeFailed);
                 res.addAll(requestEntries);
-                requestResult.accept(firstTimeFailed);
+                requestToRetry.accept(firstTimeFailed);
             } else {
                 res.addAll(requestEntries);
-                requestResult.accept(new ArrayList<>());
+                requestToRetry.accept(new ArrayList<>());
             }
         }
 
@@ -1239,7 +1237,7 @@ public class AsyncSinkWriterTest {
 
         @Override
         protected void submitRequestEntries(
-                List<Integer> requestEntries, Consumer<List<Integer>> 
requestResult) {
+                List<Integer> requestEntries, Consumer<List<Integer>> 
requestToRetry) {
             if (requestEntries.size() == 3) {
                 try {
                     delayedStartLatch.countDown();
@@ -1258,7 +1256,7 @@ public class AsyncSinkWriterTest {
             }
 
             res.addAll(requestEntries);
-            requestResult.accept(new ArrayList<>());
+            requestToRetry.accept(new ArrayList<>());
         }
     }
 }

Reply via email to