dannycranmer commented on a change in pull request #18945:
URL: https://github.com/apache/flink/pull/18945#discussion_r816850728
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -504,6 +506,16 @@ private void yieldIfThereExistsInFlightRequests() throws
InterruptedException {
}
}
+ /**
+ * This method is called before a snapshot is requested. All inflight
requests should be
+ * completed to capture failed requests in the snapshot.
+ */
+ @Override
+ public Collection<Void> prepareCommit() throws InterruptedException {
+ yieldIfThereExistsInFlightRequests();
Review comment:
This should be done in a loop? Looks like this only yields once
https://github.com/vahmed-hamdy/flink/blob/0ad125ee07e6030ac1505cfac1ed72349e1b7d9f/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L503
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -504,6 +506,16 @@ private void yieldIfThereExistsInFlightRequests() throws
InterruptedException {
}
}
+ /**
+ * This method is called before a snapshot is requested. All inflight
requests should be
+ * completed to capture failed requests in the snapshot.
+ */
+ @Override
+ public Collection<Void> prepareCommit() throws InterruptedException {
+ yieldIfThereExistsInFlightRequests();
Review comment:
Based on this, looks like there is a gap in testing. Can you add a unit
test to address this gap
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -133,7 +133,7 @@ public void sinkToAllowBatchSizesEqualToByteWiseLimit()
}
@Test
- public void
testPreparingCommitAtSnapshotTimeEnsuresBufferedRecordsArePersistedToDestination()
+ public void
testFlushingAtEndEnsuresBufferedRecordsArePersistedToDestination()
throws IOException, InterruptedException {
AsyncSinkWriterImpl sink =
Review comment:
Looks like you have repurposed existing tests rather than adding new
ones? Are the existing tests no longer valid? Why did you modify them rather
than adding new ones?
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -51,7 +52,8 @@
*/
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends
Serializable>
- implements StatefulSink.StatefulSinkWriter<InputT,
BufferedRequestState<RequestEntryT>> {
+ implements StatefulSink.StatefulSinkWriter<InputT,
BufferedRequestState<RequestEntryT>>,
+ TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, Void> {
Review comment:
Did you verify that Flink actually calls prepareCommit for non
`TwoPhaseCommittingSink` Sinks?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]