kristoffSC commented on code in PR #21052:
URL: https://github.com/apache/flink/pull/21052#discussion_r997534738
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java:
##########
@@ -397,6 +400,54 @@ public void endOfInput() {
}
}
+ /**
+ * {@link GlobalCommitter} implementation that can throw an exception when processing
+ * a checkpoint. The exception is thrown once.
+ */
+ public static class FailOnCommitGlobalCommitter extends
DefaultGlobalCommitter {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FailOnCommitGlobalCommitter.class);
+
+ // local counter used to decide if exception should be thrown. This
field has to be static,
+ // since we want to keep its value after Flink's local cluster
recovery.
+ private static int globalCheckpointCounter = 0;
+
+ private final int commitNumberToFailOn;
+
+ /**
+ * Creates instance of FailOnCommitGlobalCommitter.
+ *
+ * @param checkpointNumberToFailOn number of checkpoints after which
exception should be
+ * thrown.
+ * @param queueSupplier queueSupplier.
+ */
+ public FailOnCommitGlobalCommitter(
+ int checkpointNumberToFailOn, Supplier<Queue<String>>
queueSupplier) {
+ super(queueSupplier);
+ this.commitNumberToFailOn = checkpointNumberToFailOn;
+ }
+
+ @Override
+ public List<String> commit(List<String> committables) {
+ LOG.info(
+ "Commit number "
+ + globalCheckpointCounter
+ + ", committables size "
+ + committables.size());
+ try {
+ if (globalCheckpointCounter == commitNumberToFailOn) {
+ throw new RuntimeException(
+ "GlobalCommitter Desired Exception on checkpoint "
+ + globalCheckpointCounter);
+ }
+ } finally {
+ globalCheckpointCounter++;
Review Comment:
Thanks,
it's changed now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]