dannycranmer commented on a change in pull request #17244:
URL: https://github.com/apache/flink/pull/17244#discussion_r707277820



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -118,12 +137,25 @@
     protected abstract void submitRequestEntries(
             List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult);
 
+    /**
+     * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
+     * this case is measured as the total bytes that is written to the destination as a result of
+     * persisting this particular {@code RequestEntryT} rather than the serialized length (which may
+     * be the same).
+     *
+     * @param requestEntry the requestEntry for which we want to know the size
+     * @return the size of the requestEntry, as defined previously

Review comment:
       Would be worth converting the return type to `long` in case `int` overflows.
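
   For illustration, a minimal sketch of that change, assuming the abstract method is the `getSizeInBytes` referenced later in this diff:

   ```
   /**
    * Returns the size of the given request entry in bytes, i.e. the total bytes written to the
    * destination when this entry is persisted. Returning long avoids int overflow for very
    * large entries.
    */
   protected abstract long getSizeInBytes(RequestEntryT requestEntry);
   ```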

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -139,21 +171,40 @@ public AsyncSinkWriter(
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
+        this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;

Review comment:
       Can you add sanity validations for all the numeric inputs? Check for min/max, etc.
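
   For example (just a sketch; the exact bounds are for the implementer to decide), the constructor could guard the parameters with Flink's `Preconditions`:

   ```
   // Using org.apache.flink.util.Preconditions inside the AsyncSinkWriter constructor (sketch):
   Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be positive.");
   Preconditions.checkArgument(maxInFlightRequests > 0, "maxInFlightRequests must be positive.");
   Preconditions.checkArgument(
           maxBufferedRequests >= maxBatchSize, "maxBufferedRequests must be at least maxBatchSize.");
   Preconditions.checkArgument(flushOnBufferSizeMB > 0, "flushOnBufferSizeMB must be positive.");
   Preconditions.checkArgument(maxTimeInBufferMS > 0, "maxTimeInBufferMS must be positive.");
   ```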

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -95,6 +99,21 @@
      */
     private int inFlightRequestsCount;
 
+    /**
+     * Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
+     * the criterion for flushing after {@code flushOnBufferSizeMB} is reached.
+     */
+    private double bufferedRequestEntriesTotalSizeMB;
+
+    /**
+     * Tracks the size of each element in {@code bufferedRequestEntries}. The sizes are stored in MB
+     * and the position in the deque reflects the position of the corresponding element in {@code
+     * bufferedRequestEntries}.
+     */
+    private final Deque<Double> bufferedRequestEntriesSizeMB = new ArrayDeque<>();

Review comment:
       Same as above, why `Double`?

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -95,6 +99,21 @@
      */
     private int inFlightRequestsCount;
 
+    /**
+     * Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
+     * the criterion for flushing after {@code flushOnBufferSizeMB} is reached.
+     */
+    private double bufferedRequestEntriesTotalSizeMB;

Review comment:
       Why `double`? Are we expecting fractional values? Is it worth using 
`long` and bytes instead of MB for more flexibility?
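
   A sketch of that alternative, with hypothetical byte-based field names:

   ```
   /** Byte threshold at which the buffer is flushed; a long avoids fractional values. */
   private final long flushOnBufferSizeInBytes;

   /** Cumulative size, in bytes, of all elements currently in bufferedRequestEntries. */
   private long bufferedRequestEntriesTotalSizeInBytes;
   ```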

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -199,7 +252,32 @@ private void flush() throws InterruptedException {
      */
     private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
         inFlightRequestsCount--;
-        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
+        failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
+    }
+
+    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
+        if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
+            registerCallback();
+        }
+        if (insertAtHead) {
+            bufferedRequestEntries.addFirst(entry);
+        } else {
+            bufferedRequestEntries.add(entry);
+        }
+
+        double requestEntrySizeMB = getSizeInMB(entry);
+
+        if (insertAtHead) {
+            bufferedRequestEntriesSizeMB.addFirst(requestEntrySizeMB);
+        } else {
+            bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
+        }
+
+        bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
+    }
+
+    private double getSizeInMB(RequestEntryT requestEntry) {
+        return getSizeInBytes(requestEntry) / (double) BYTES_IN_MB;

Review comment:
       Yeah, I think we should just deal with `bytes`; then we do not need to do any scaling. It also removes the ambiguity around MB vs. MiB.
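
   For instance, the `getSizeInMB` helper could then go away entirely; a sketch of the byte-based accounting (the byte-named fields here are hypothetical):

   ```
   // With byte-based accounting there is no scaling and no BYTES_IN_MB constant (sketch;
   // bufferedRequestEntriesTotalSizeInBytes and flushOnBufferSizeInBytes are hypothetical names):
   long requestEntrySizeInBytes = getSizeInBytes(entry);
   bufferedRequestEntriesTotalSizeInBytes += requestEntrySizeInBytes;

   // Flushing on a byte threshold also sidesteps the MB vs. MiB question.
   if (bufferedRequestEntriesTotalSizeInBytes >= flushOnBufferSizeInBytes) {
       flush();
   }
   ```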

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -199,7 +252,32 @@ private void flush() throws InterruptedException {
      */
     private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
         inFlightRequestsCount--;
-        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
+        failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
+    }
+
+    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
+        if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
+            registerCallback();
+        }
+        if (insertAtHead) {
+            bufferedRequestEntries.addFirst(entry);
+        } else {
+            bufferedRequestEntries.add(entry);
+        }
+
+        double requestEntrySizeMB = getSizeInMB(entry);
+
+        if (insertAtHead) {
+            bufferedRequestEntriesSizeMB.addFirst(requestEntrySizeMB);
+        } else {
+            bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
+        }
+
+        bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
+    }

Review comment:
       You are maintaining 2 queues and keeping them in sync. This is a complex model, prone to error with items becoming unsynchronised while the implementation evolves. Instead of having 2x `Deque`, I suggest you implement a wrapper that contains the record and size, then use a single queue, for example:
   
   ```
   public class RequestEntryWrapper<T> {
       private final T requestEntry;
       private final long size;
   
       ...
   }
   ```
   
   Then just have:
   
   ```
   private Deque<RequestEntryWrapper<RequestEntryT>> theSingleQueue;
   ```
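
   For illustration, `addEntryToBuffer` from the diff above might then collapse into something like this (a sketch assuming the wrapper exposes a `getSize()` accessor and the running total is a byte-based `long`):

   ```
   private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
       if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
           registerCallback();
       }

       // Record and size travel together, so the two can never drift apart.
       RequestEntryWrapper<RequestEntryT> wrappedEntry =
               new RequestEntryWrapper<>(entry, getSizeInBytes(entry));

       if (insertAtHead) {
           bufferedRequestEntries.addFirst(wrappedEntry);
       } else {
           bufferedRequestEntries.add(wrappedEntry);
       }

       bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
   }
   ```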

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -139,21 +171,40 @@ public AsyncSinkWriter(
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
+        this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
+
+        this.inFlightRequestsCount = 0;
+        this.bufferedRequestEntriesTotalSizeMB = 0;
+    }
+
+    private void registerCallback() {
+        Sink.ProcessingTimeService.ProcessingTimeCallback ptc =
+                instant -> {

Review comment:
       Which thread is this callback executed in? Is there a chance of race 
conditions with `existsActiveTimerCallback`? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

