rkhachatryan commented on a change in pull request #15339:
URL: https://github.com/apache/flink/pull/15339#discussion_r602427936
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
##########
@@ -186,6 +189,10 @@ private void startMiniCluster() throws Exception {
configuration.setInteger(JobManagerOptions.PORT, 0);
configuration.setString(RestOptions.BIND_PORT, "0");
+ if (miniClusterResourceConfiguration.allowChangelogState()) {
+ randomize(configuration,
CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true, false);
+ }
Review comment:
I couldn't find how to disable this (e.g. via properties, as it is done
for Unaligned Checkpoints).
Is it possible?
##########
File path:
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
##########
@@ -74,30 +74,17 @@ public static void setAsContext(
TestStreamEnvironment env =
new TestStreamEnvironment(
miniCluster, parallelism, jarFiles,
classpaths);
- randomize(conf);
+ if (RANDOMIZE_CHECKPOINTING_CONFIG) {
+ randomize(
+ conf,
ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
Review comment:
Could you please elaborate why the randomization is done non-uniformly?
I guess for backend you need to update the configuration earlier. Does it
make sense to do the same for UC?
##########
File path:
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
##########
@@ -460,6 +460,7 @@ object ScalaShellITCase {
val _clusterResource = new MiniClusterResource(new
MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(parallelism)
+ .disallowChangelogState()
Review comment:
(Optional)
I think it would be helpful to add (as a comment) the reason why it is
disallowed in each case.
##########
File path: flink-connectors/flink-connector-base/pom.xml
##########
@@ -64,5 +64,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-statebackend-changelog_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
Review comment:
I'm wondering wether it makes sense to add this dependency at higher
level (like `flink-connectors`). Top-level pom.xml` will not work I guess as it
will be a recursive dependency.
Motivation:
1. tests for components will not suddenly fail
2. less duplication
WDYT?
cc: @AHeise
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
##########
@@ -132,14 +158,20 @@ public Builder withHaLeadershipControl() {
return this;
}
+ public Builder disallowChangelogState() {
+ this.allowChangelogState = false;
+ return this;
+ }
Review comment:
nit:
IIUC, disallowing can also be achieved by setting a configuration option
like this:
```
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(new
Configuration().set(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, false))
```
This would be more flexible IMO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]