akalash commented on a change in pull request #15628:
URL: https://github.com/apache/flink/pull/15628#discussion_r616442871
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1285,7 +1285,8 @@ public void triggerCheckpointBarrier(
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointID, checkpointTimestamp);
+ new CheckpointMetaData(
+ checkpointID, checkpointTimestamp, System.currentTimeMillis());
Review comment:
In fact, the old two-parameter constructor could be used here.
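If the two-argument constructor fills in the receive time itself, the call site can stay `new CheckpointMetaData(checkpointID, checkpointTimestamp)`. A minimal sketch, using a hypothetical `CheckpointMetaDataSketch` class rather than Flink's real `CheckpointMetaData` (whose fields and accessors may differ):

```java
/**
 * Hypothetical, simplified stand-in for CheckpointMetaData (not Flink's actual class).
 * The two-argument constructor delegates to the three-argument one and stamps the
 * receive time with the current wall-clock time, so callers need not pass it.
 */
final class CheckpointMetaDataSketch {
    private final long checkpointId;
    private final long timestamp;
    // Wall-clock time (ms) at which the task received the checkpoint trigger.
    private final long receiveTimestamp;

    /** Convenience constructor: receive time defaults to System.currentTimeMillis(). */
    CheckpointMetaDataSketch(long checkpointId, long timestamp) {
        this(checkpointId, timestamp, System.currentTimeMillis());
    }

    /** Full constructor, e.g. for tests that need a deterministic receive time. */
    CheckpointMetaDataSketch(long checkpointId, long timestamp, long receiveTimestamp) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.receiveTimestamp = receiveTimestamp;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    long getTimestamp() {
        return timestamp;
    }

    long getReceiveTimestamp() {
        return receiveTimestamp;
    }
}
```

Keeping the defaulting inside the class means every caller that only knows the ID and trigger timestamp gets the same receive-time semantics without repeating `System.currentTimeMillis()`.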
##########
File path: flink-end-to-end-tests/run-nightly-tests.sh
##########
@@ -250,12 +250,12 @@ fi
################################################################################
if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450
- run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false false" "skip_check_exceptions"
- run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false true" "skip_check_exceptions"
- run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false" "skip_check_exceptions"
- run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false" "skip_check_exceptions"
- run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true" "skip_check_exceptions"
- run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true" "skip_check_exceptions"
+ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false false 100" "skip_check_exceptions"
+ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false true 100" "skip_check_exceptions"
+ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false 100" "skip_check_exceptions"
+ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false 100" "skip_check_exceptions"
+ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true 100" "skip_check_exceptions"
+ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true 100" "skip_check_exceptions"
Review comment:
I don't fully understand the idea here. As I understand it, adding the delay will reduce the backpressure. But won't that also reduce the test's effectiveness? Right now the test runs under high pressure; after this change, the pressure will be lower. Perhaps that is fine, since this is not a stress test, but that is exactly why I am asking.
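To put a number on the concern, here is a back-of-the-envelope sketch. It assumes (the diff does not say either) that the first script argument `4` is the job parallelism and that the new `100` argument is a sleep of 100 ms after each record in every source subtask. Under those assumptions the sources can emit at most about 1000 / 100 = 10 records/s each, i.e. roughly 40 records/s in total, which is unlikely to backpressure downstream operators. The class and method names are hypothetical:

```java
/**
 * Hypothetical helper: upper bound on source emission rate when each source
 * subtask sleeps a fixed number of milliseconds after emitting every record.
 * Processing time is ignored, so real throughput can only be lower.
 */
final class DelayThroughputSketch {
    private DelayThroughputSketch() {}

    /** Max records per second for a single source subtask that sleeps delayMs per record. */
    static double maxRecordsPerSecondPerSource(long delayMs) {
        if (delayMs <= 0) {
            throw new IllegalArgumentException("delayMs must be positive");
        }
        return 1000.0 / delayMs;
    }

    /** Aggregate upper bound across all parallel source subtasks. */
    static double maxRecordsPerSecond(int parallelism, long delayMs) {
        return parallelism * maxRecordsPerSecondPerSource(delayMs);
    }
}
```

For example, `DelayThroughputSketch.maxRecordsPerSecond(4, 100)` evaluates to 40.0, so if the assumptions hold, the new runs exercise recovery at a much lower load than the old ones.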
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -711,4 +715,13 @@ private static OperatorSnapshotFutures checkpointStreamOperator(
throw ex;
}
}
+
+ private void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
Review comment:
This method can be static.
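A minimal sketch of why `static` works here, assuming `logCheckpointProcessingDelay` only compares the checkpoint's receive timestamp against the current time and reads no instance fields (the diff does not show the method body). The class and method names are hypothetical, the `>` comparison against the threshold is an assumption, and the sketch returns the message instead of logging it so it needs no SLF4J dependency:

```java
import java.util.Optional;

/** Hypothetical sketch; names and signature are illustrative, not Flink's code. */
final class CheckpointDelayLogging {
    // Mirrors the 30_000 ms threshold constant proposed in this PR.
    static final long CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;

    private CheckpointDelayLogging() {}

    /**
     * Depends only on its arguments, never on instance state, so it can be static.
     * Returns a message when the time between receiving the checkpoint trigger and
     * now exceeds the threshold; otherwise returns an empty Optional.
     */
    static Optional<String> checkpointProcessingDelayMessage(
            long checkpointId, long receiveTimestampMs, long nowMs) {
        long delayMs = nowMs - receiveTimestampMs;
        if (delayMs <= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
            return Optional.empty();
        }
        return Optional.of(
                String.format(
                        "Checkpoint %d processing was delayed by %d ms after it was received.",
                        checkpointId, delayMs));
    }
}
```

Passing `nowMs` in explicitly, rather than reading the clock inside, also makes the threshold logic trivially testable.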
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
##########
@@ -108,7 +108,10 @@ public long getCheckpointStartDelayNanos() {
protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+ new CheckpointMetaData(
+ checkpointBarrier.getId(),
+ checkpointBarrier.getTimestamp(),
+ System.currentTimeMillis());
Review comment:
Same as above: the old two-parameter constructor could be used here as well.
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -75,6 +75,8 @@
LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
+ private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;
Review comment:
Should this threshold somehow be tied to the failure timeout (or something similar; I don't know the exact name)?
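One way the two could be connected, sketched under the assumption that the task has access to the job's configured checkpoint timeout (the hypothetical `checkpointTimeoutMs` parameter below). The "half the timeout, capped at 30 s" rule is purely an illustrative choice, not something this PR proposes:

```java
/**
 * Hypothetical policy sketch: derive the delay-logging threshold from the
 * configured checkpoint timeout instead of using a fixed constant alone.
 */
final class DelayThresholdSketch {
    // Same fixed value as the constant proposed in this PR.
    static final long DEFAULT_THRESHOLD_MS = 30_000;

    private DelayThresholdSketch() {}

    /**
     * Returns half of the checkpoint timeout, but never more than the fixed default,
     * so a delay is reported well before the checkpoint would time out.
     * A non-positive timeout is treated as unknown and falls back to the default.
     */
    static long delayLogThresholdMs(long checkpointTimeoutMs) {
        if (checkpointTimeoutMs <= 0) {
            return DEFAULT_THRESHOLD_MS;
        }
        return Math.min(DEFAULT_THRESHOLD_MS, checkpointTimeoutMs / 2);
    }
}
```

With a 10-minute timeout this keeps the 30 s threshold, while a short 20 s timeout lowers it to 10 s, so the log line would still appear before the checkpoint is declared failed.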
--
This is an automated message from the Apache Git Service.