akalash commented on a change in pull request #15628:
URL: https://github.com/apache/flink/pull/15628#discussion_r616442871



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1285,7 +1285,8 @@ public void triggerCheckpointBarrier(
 
         final AbstractInvokable invokable = this.invokable;
         final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointID, checkpointTimestamp);
+                new CheckpointMetaData(
+                        checkpointID, checkpointTimestamp, System.currentTimeMillis());

Review comment:
       In fact, this could keep using the old two-parameter constructor.
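For illustration, here is a minimal sketch of the delegation this would rely on. `CheckpointMetaDataSketch` and its `receiveTimestamp` field are hypothetical stand-ins, not Flink's actual class. The two-parameter constructor defaults the third argument to `System.currentTimeMillis()`, so call sites that pass the current time explicitly could keep the old signature.

```java
// Hypothetical stand-in for CheckpointMetaData; class and field names are assumptions.
final class CheckpointMetaDataSketch {
    private final long checkpointId;
    private final long timestamp;
    // Assumed meaning: wall-clock time (ms) at which the task received the checkpoint.
    private final long receiveTimestamp;

    // Old-style two-parameter constructor: defaults the receive timestamp to "now",
    // so callers that would pass System.currentTimeMillis() need not change.
    CheckpointMetaDataSketch(long checkpointId, long timestamp) {
        this(checkpointId, timestamp, System.currentTimeMillis());
    }

    CheckpointMetaDataSketch(long checkpointId, long timestamp, long receiveTimestamp) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.receiveTimestamp = receiveTimestamp;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    long getTimestamp() {
        return timestamp;
    }

    long getReceiveTimestamp() {
        return receiveTimestamp;
    }

    public static void main(String[] args) {
        CheckpointMetaDataSketch meta = new CheckpointMetaDataSketch(42L, 1_000L);
        System.out.println(meta.getCheckpointId() + " received at " + meta.getReceiveTimestamp());
    }
}
```

With this shape the call site in the hunk above could stay `new CheckpointMetaData(checkpointID, checkpointTimestamp)`.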

##########
File path: flink-end-to-end-tests/run-nightly-tests.sh
##########
@@ -250,12 +250,12 @@ fi
 
################################################################################
 
 if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450
-       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false false" "skip_check_exceptions"
-       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false true" "skip_check_exceptions"
-       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false" "skip_check_exceptions"
-       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false" "skip_check_exceptions"
-       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true" "skip_check_exceptions"
-       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true" "skip_check_exceptions"
+       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false false 100" "skip_check_exceptions"
+       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false true 100" "skip_check_exceptions"
+       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false 100" "skip_check_exceptions"
+       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false 100" "skip_check_exceptions"
+       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true 100" "skip_check_exceptions"
+       run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true 100" "skip_check_exceptions"

Review comment:
       I don't fully get the idea here. As I understand it, adding the delay will reduce the backpressure. But won't that make the test less effective? Right now the test runs under high pressure, but after this change the pressure will be lower. Perhaps that's OK because it isn't a stress test, but that is exactly why I'm asking.
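To make the concern concrete, here is a rough upper bound on source throughput. This rests on two assumptions the diff does not confirm: that the new trailing argument `100` is a per-record sleep in milliseconds inside each source subtask, and that the first argument `4` is the job parallelism. `ThrottleSketch` is a hypothetical helper.

```java
// Hypothetical helper; assumes each of `parallelism` source subtasks sleeps
// `delayMs` milliseconds after every emitted record.
final class ThrottleSketch {
    /** Upper bound on total records per second; ignores time spent producing each record. */
    static double maxRecordsPerSecond(int parallelism, long delayMs) {
        if (delayMs <= 0) {
            // No sleep: throughput is limited only by processing speed and backpressure.
            return Double.POSITIVE_INFINITY;
        }
        return parallelism * (1000.0 / delayMs);
    }

    public static void main(String[] args) {
        // 4 subtasks * (1000 ms / 100 ms) = at most 40 records/s in total.
        System.out.println(maxRecordsPerSecond(4, 100));
        System.out.println(maxRecordsPerSecond(4, 0));
    }
}
```

If those assumptions hold, a cap of roughly 40 records/s would likely keep downstream operators from ever being backpressured. That is the loss of coverage the question is about.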

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -711,4 +715,13 @@ private static OperatorSnapshotFutures checkpointStreamOperator(
             throw ex;
         }
     }
+
+    private void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {

Review comment:
       This method can be static.
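Here is a minimal sketch of the `static` variant. It assumes the method reads only its argument and the static threshold constant. The body, the `receiveTimestamp`/`nowMillis` parameters (used in place of the `CheckpointMetaData` argument), and the returned message (used instead of a logger, to avoid an SLF4J dependency) are all hypothetical. The diff does not show the actual implementation.

```java
import java.util.Optional;

// Hypothetical sketch: mirrors the name of the method in the diff, but the body is assumed.
final class CheckpointDelayLogSketch {
    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;

    // Can be static: it depends only on its arguments and a static constant,
    // never on fields of the enclosing coordinator instance.
    static Optional<String> checkpointProcessingDelayMessage(
            long checkpointId, long receiveTimestamp, long nowMillis) {
        long delayMs = nowMillis - receiveTimestamp;
        if (delayMs > CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
            return Optional.of(
                    String.format(
                            "Checkpoint %d processing was delayed by %d ms", checkpointId, delayMs));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(checkpointProcessingDelayMessage(7L, 0L, 45_000L).orElse("no log"));
        System.out.println(checkpointProcessingDelayMessage(7L, 0L, 1_000L).orElse("no log"));
    }
}
```

Keeping it static also makes the threshold logic testable without constructing a coordinator.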

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.java
##########
@@ -108,7 +108,10 @@ public long getCheckpointStartDelayNanos() {
 
     protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
         CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+                new CheckpointMetaData(
+                        checkpointBarrier.getId(),
+                        checkpointBarrier.getTimestamp(),
+                        System.currentTimeMillis());

Review comment:
       Same as above regarding the constructor: the old two-parameter constructor would work here too.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -75,6 +75,8 @@
             LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
     private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
+    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;

Review comment:
       Should it be tied somehow to the failure timeout (or something similar; I don't know the exact name)?
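Here is one possible shape, purely as a hypothetical sketch. It derives the threshold from whichever timeout is configured, passed in as `timeoutMs` because the exact option isn't named here. The value is scaled by a fraction and capped at the current 30 s value. `DelayThresholdSketch` and `logThresholdMs` are illustrative names only.

```java
// Hypothetical: derives the delay-log threshold from a configured timeout.
final class DelayThresholdSketch {
    // The fixed value from the diff, used as both the upper cap and the fallback.
    static final long DEFAULT_LOG_THRESHOLD_MS = 30_000L;

    /**
     * Returns the delay (ms) after which a warning would be logged: a fraction of the
     * timeout, never above the 30 s default. Falls back to the default when no positive
     * timeout or fraction is given.
     */
    static long logThresholdMs(long timeoutMs, double fraction) {
        if (timeoutMs <= 0 || fraction <= 0) {
            return DEFAULT_LOG_THRESHOLD_MS;
        }
        long derived = (long) (timeoutMs * fraction);
        return Math.min(DEFAULT_LOG_THRESHOLD_MS, derived);
    }

    public static void main(String[] args) {
        System.out.println(logThresholdMs(600_000L, 0.5)); // long timeout: capped at 30 s
        System.out.println(logThresholdMs(20_000L, 0.5)); // short timeout: warns at 10 s
    }
}
```

The cap prevents a long timeout from effectively suppressing the warning. With a fraction below 1, a short timeout still produces a warning before the timeout itself fires.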



