pnowojski commented on a change in pull request #11868:
URL: https://github.com/apache/flink/pull/11868#discussion_r442099032



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
##########
@@ -40,6 +40,11 @@ import scala.collection.mutable
 @RunWith(classOf[Parameterized])
 class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
+  @Before
+  def setupEnv(): Unit = {
+    env.getCheckpointConfig.disableUnalignedCheckpoints()

Review comment:
       Could you add a comment explaining why this is needed, with a link to the Jira ticket?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -958,10 +960,10 @@ private void configureCheckpointing() {
                                .setMinPauseBetweenCheckpoints(cfg.getMinPauseBetweenCheckpoints())
                                .setMaxConcurrentCheckpoints(cfg.getMaxConcurrentCheckpoints())
                                .setCheckpointRetentionPolicy(retentionAfterTermination)
-                               .setExactlyOnce(getCheckpointingMode(cfg) == CheckpointingMode.EXACTLY_ONCE)
+                               .setExactlyOnce(exactlyOnce)
                                .setPreferCheckpointForRecovery(cfg.isPreferCheckpointForRecovery())
                                .setTolerableCheckpointFailureNumber(cfg.getTolerableCheckpointFailureNumber())
-                               .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
+                               .setUnalignedCheckpointsEnabled(exactlyOnce && cfg.isUnalignedCheckpointsEnabled())

Review comment:
       The check `exactlyOnce && cfg.isUnalignedCheckpointsEnabled()` seems to be duplicated in two places. Maybe hide the `exactlyOnce &&` check inside `isUnalignedCheckpointsEnabled`?
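
   Roughly what I have in mind, as a minimal sketch. `CheckpointConfigSketch` and its `Mode` enum are hypothetical, simplified stand-ins and not the real `CheckpointConfig`; I am also assuming the config object knows its own checkpointing mode, which may not match how the mode is resolved in `StreamingJobGraphGenerator`.

```java
// Hypothetical, simplified stand-in for CheckpointConfig (not the actual Flink class).
final class CheckpointConfigSketch {

    // Stand-in for CheckpointingMode.
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    private Mode mode = Mode.EXACTLY_ONCE;
    private boolean unalignedCheckpointsEnabled;

    void setCheckpointingMode(Mode mode) {
        this.mode = mode;
    }

    void enableUnalignedCheckpoints(boolean enabled) {
        this.unalignedCheckpointsEnabled = enabled;
    }

    // The exactly-once check lives in the getter, so callers do not need
    // to repeat "exactlyOnce &&" at every call site.
    boolean isUnalignedCheckpointsEnabled() {
        return mode == Mode.EXACTLY_ONCE && unalignedCheckpointsEnabled;
    }
}
```

   Both call sites could then pass `cfg.isUnalignedCheckpointsEnabled()` directly.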

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -500,8 +500,9 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
 
                config.setStateBackend(streamGraph.getStateBackend());
                config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
-               config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
                config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
+               config.setUnalignedCheckpointsEnabled(config.getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE &&

Review comment:
       Isn't this an independent bug fix?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
##########
@@ -414,9 +417,19 @@ public void enableUnalignedCheckpoints() {
        }
 
        /**
-        * Returns whether checkpoints should be persisted externally.
+        * Disables unaligned checkpoints.
         *
-        * @return <code>true</code> if checkpoints should be externalized.
+        * @see #enableUnalignedCheckpoints()
+        */
+       @PublicEvolving
+       public void disableUnalignedCheckpoints() {

Review comment:
       Do we really need this method? Isn't `enableUnalignedCheckpoints(false);` enough? `enableUnalignedCheckpoints()` is also kind of redundant, but it is probably the more common call.
   
   For another `boolean` property we have just a single setter, `setPreferCheckpointForRecovery`.
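
   To illustrate, a minimal sketch of the smaller API surface. `UnalignedSetterSketch` is a hypothetical stand-in class, not the real `CheckpointConfig`; only the method names mirror the ones discussed above.

```java
// Hypothetical stand-in showing one boolean setter plus a convenience overload,
// without a separate disableUnalignedCheckpoints() method.
final class UnalignedSetterSketch {

    private boolean unalignedCheckpointsEnabled;

    // Single setter that covers both enabling and disabling.
    void enableUnalignedCheckpoints(boolean enabled) {
        this.unalignedCheckpointsEnabled = enabled;
    }

    // Convenience overload for the common "turn it on" case.
    void enableUnalignedCheckpoints() {
        enableUnalignedCheckpoints(true);
    }

    boolean isUnalignedCheckpointsEnabled() {
        return unalignedCheckpointsEnabled;
    }
}
```

   The test setup would then call `enableUnalignedCheckpoints(false)` instead of a dedicated disable method.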




