rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r649138609
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -98,75 +123,122 @@ public void testIgnoreInFlightDataDuringRecovery() throws
Exception {
// then: Actual result should be less than the ideal result because
some of data was
// ignored.
- assertThat(SumFailSink.result, lessThan(resultWithoutIgnoringData));
+ assertThat(result.get().longValue(),
lessThan(resultWithoutIgnoringData));
// and: Actual result should be equal to sum of result before fail +
source value after
// recovery.
- long expectedResult = SumFailSink.resultBeforeFail +
sourceValueAfterRestore;
- assertEquals(expectedResult, SumFailSink.result);
+ long expectedResult = resultBeforeFail.get().longValue() +
sourceValueAfterRestore;
+ assertEquals(expectedResult, result.get().longValue());
}
private static class SumFailSink implements SinkFunction<Integer>,
CheckpointedFunction {
- public static long result;
- public static long resultBeforeFail;
+ private final SharedReference<OneShotLatch> checkpointReachSinkLatch;
+ private final SharedReference<AtomicLong> resultBeforeFail;
+ private final SharedReference<AtomicLong> result;
+
+ public SumFailSink(
+ SharedReference<OneShotLatch> checkpointReachSinkLatch,
+ SharedReference<AtomicLong> resultBeforeFail,
+ SharedReference<AtomicLong> result) {
+ this.checkpointReachSinkLatch = checkpointReachSinkLatch;
+ this.resultBeforeFail = resultBeforeFail;
+ this.result = result;
+ }
@Override
- public void invoke(Integer value) throws Exception {
- result += value;
+ public void invoke(Integer value, Context context) throws Exception {
+ result.get().addAndGet(value);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
- resultBeforeFail = result;
+ if (resultBeforeFail.get().longValue() == 0) {
+ resultBeforeFail.get().set(result.get().longValue());
+ sinkCheckpointStarted();
+ }
}
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
- result = resultBeforeFail;
+ result.get().set(resultBeforeFail.get().longValue());
+ }
+
+ /**
+ * Allow to send data from the awaited map in this case if number of
waiters more than 0, we
+ * can be sure that in-flight data exists(at least the data which is
processing by waiters
+ * during the waiting will be sent to the sink before the checkpoint
barrier would be
+ * handled).
+ */
+ public void sinkCheckpointStarted() {
+ checkpointReachSinkLatch.get().trigger();
}
}
private static class NumberSource implements SourceFunction<Integer>,
CheckpointedFunction {
private static final long serialVersionUID = 1L;
+ private final SharedReference<AtomicInteger> lastCheckpointValue;
private ListState<Integer> valueState;
- public static int lastCheckpointedValue;
+ private boolean isRunning = true;
+
+ public NumberSource(SharedReference<AtomicInteger>
lastCheckpointValue) {
+ this.lastCheckpointValue = lastCheckpointValue;
+ }
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
Iterator<Integer> stateIt = valueState.get().iterator();
boolean isRecovered = stateIt.hasNext();
if (isRecovered) {
- Integer lastValue = stateIt.next();
+ synchronized (ctx.getCheckpointLock()) {
+ Integer lastValue = stateIt.next();
- // Checking that ListState is recovered correctly.
- assertEquals(lastCheckpointedValue, lastValue.intValue());
+ // Checking that ListState is recovered correctly.
+ assertEquals(lastCheckpointValue.get().intValue(),
lastValue.intValue());
- // if it is started after recovery, just send one more value
and finish.
- ctx.collect(lastValue + 1);
+ // if it is started after recovery, just send one more
value and finish.
+ ctx.collect(lastValue + 1);
+ }
} else {
int next = 0;
- while (true) {
+ while (isRunning) {
synchronized (ctx.getCheckpointLock()) {
- next++;
- valueState.update(singletonList(next));
- ctx.collect(next);
+ // Emit data by batches to reduce the probability that
before the first
+ // checkpoint will be generated not enough data.
+ do {
+ next++;
+ valueState.update(singletonList(next));
+ ctx.collect(next);
+ } while (next % 50 != 0 && isRunning);
}
+
+ // Avoid the huge backpressure.
+ LockSupport.parkNanos(100000);
}
}
}
@Override
- public void cancel() {}
+ public void cancel() {
+ isRunning = false;
+ }
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
- if (lastCheckpointedValue > 0) {
+ if (lastCheckpointValue.get().get() > 0) {
throw new RuntimeException("Error during snapshot");
}
- lastCheckpointedValue = valueState.get().iterator().next();
+ Integer state = valueState.get().iterator().next();
+
+ if (state < PARALLELISM) {
+ // Try to restart task.
+ throw new RuntimeException(
+ "Not enough data to guarantee the in-flight data were
generated before the first checkpoint");
Review comment:
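If no value has been emitted yet, this check is never reached: the preceding
`valueState.get().iterator().next()` call fails first with a
NoSuchElementException (the effect is the same, but the cause is less obvious).
I also think the allowed number of restarts should be increased (maybe to
unlimited), because whether this check passes depends on how soon the source
starts producing records relative to when the checkpoint is triggered.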
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]