rkhachatryan commented on a change in pull request #15339:
URL: https://github.com/apache/flink/pull/15339#discussion_r602427936



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
##########
@@ -186,6 +189,10 @@ private void startMiniCluster() throws Exception {
         configuration.setInteger(JobManagerOptions.PORT, 0);
         configuration.setString(RestOptions.BIND_PORT, "0");
 
+        if (miniClusterResourceConfiguration.allowChangelogState()) {
+            randomize(configuration, 
CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+        }

Review comment:
       I couldn't find how to disable this (e.g. via properties, as it is done 
for Unaligned Checkpoints).
   Is it possible?

##########
File path: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
##########
@@ -74,30 +74,17 @@ public static void setAsContext(
                     TestStreamEnvironment env =
                             new TestStreamEnvironment(
                                     miniCluster, parallelism, jarFiles, 
classpaths);
-                    randomize(conf);
+                    if (RANDOMIZE_CHECKPOINTING_CONFIG) {
+                        randomize(
+                                conf, 
ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);

Review comment:
       Could you please elaborate why the randomization is done non-uniformly?
   I guess for backend you need to update the configuration earlier. Does it 
make sense to do the same for UC?

##########
File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
##########
@@ -460,6 +460,7 @@ object ScalaShellITCase {
 
   val _clusterResource = new MiniClusterResource(new 
MiniClusterResourceConfiguration.Builder()
       .setNumberSlotsPerTaskManager(parallelism)
+      .disallowChangelogState()

Review comment:
       (Optional) 
   I think it would be helpful to add (as a comment) the reason why it is 
disallowed in each case.

##########
File path: flink-connectors/flink-connector-base/pom.xml
##########
@@ -64,5 +64,12 @@
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-statebackend-changelog_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>

Review comment:
       I'm wondering wether it makes sense to add this dependency at higher 
level (like `flink-connectors`). Top-level pom.xml` will not work I guess as it 
will be a recursive dependency.
   
   Motivation:
   1. tests for components will not suddenly fail
   2. less duplication
   
   WDYT? 
   
   cc: @AHeise 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
##########
@@ -132,14 +158,20 @@ public Builder withHaLeadershipControl() {
             return this;
         }
 
+        public Builder disallowChangelogState() {
+            this.allowChangelogState = false;
+            return this;
+        }

Review comment:
       nit:
   IIUC, disallowing can also be achieved by setting a configuration option 
like this:
   ```
   new MiniClusterResourceConfiguration.Builder()
       .setConfiguration(new 
Configuration().set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, false))
   ```
   This would be more flexible IMO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to