This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dfb827a38bc [FLINK-35060][Test/Connector] Provide compatibility of old
CheckpointMode for connector testing framework
dfb827a38bc is described below
commit dfb827a38bc81fe4610cd0c88c66b8d5da1c0147
Author: Zakelly <[email protected]>
AuthorDate: Thu Apr 11 15:30:07 2024 +0800
[FLINK-35060][Test/Connector] Provide compatibility of old CheckpointMode
for connector testing framework
---
.../flink/streaming/api/CheckpointingMode.java | 12 +++++++++++
.../external/sink/TestingSinkSettings.java | 25 +++++++++++++++++++---
.../external/source/TestingSourceSettings.java | 25 +++++++++++++++++++---
.../testframe/junit/annotations/TestSemantics.java | 3 ++-
.../extensions/ConnectorTestingExtension.java | 24 ++++++++++++++++++++-
.../utils/UnorderedCollectIteratorAssert.java | 13 +++++++++++
6 files changed, 94 insertions(+), 8 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
index 17e20dbe037..8c311b6bc71 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -90,4 +90,16 @@ public enum CheckpointingMode {
throw new IllegalArgumentException("Unsupported semantic: " +
semantic);
}
}
+
+ public static org.apache.flink.streaming.api.CheckpointingMode
convertFromCheckpointingMode(
+ org.apache.flink.core.execution.CheckpointingMode semantic) {
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ return
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+ case AT_LEAST_ONCE:
+ return
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+ default:
+ throw new IllegalArgumentException("Unsupported semantic: " +
semantic);
+ }
+ }
}
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
index 2494b814aa2..d392576e252 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.testframe.external.sink;
import org.apache.flink.core.execution.CheckpointingMode;
+import static
org.apache.flink.streaming.api.CheckpointingMode.convertFromCheckpointingMode;
+import static
org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Settings for configuring the sink under testing. */
@@ -34,9 +36,14 @@ public class TestingSinkSettings {
this.checkpointingMode = checkpointingMode;
}
- /** Checkpointing mode required for the sink. */
- public CheckpointingMode getCheckpointingMode() {
- return checkpointingMode;
+ /**
+ * Checkpointing mode required for the sink. This method is required by downstream projects
+ * (e.g. Flink connectors extending this test) that must still support Flink versions
+ * below 1.20. It can be removed once support for Flink 1.19 is dropped.
+ */
+ @Deprecated
+ public org.apache.flink.streaming.api.CheckpointingMode
getCheckpointingMode() {
+ return convertFromCheckpointingMode(checkpointingMode);
}
/** Builder class for {@link TestingSinkSettings}. */
@@ -48,6 +55,18 @@ public class TestingSinkSettings {
return this;
}
+ /**
+ * This method is required by downstream projects (e.g. Flink connectors extending this
+ * test) that must still support Flink versions below 1.20. It can be removed once
+ * support for Flink 1.19 is dropped.
+ */
+ @Deprecated
+ public Builder setCheckpointingMode(
+ org.apache.flink.streaming.api.CheckpointingMode
checkpointingMode) {
+ this.checkpointingMode =
convertToCheckpointingMode(checkpointingMode);
+ return this;
+ }
+
public TestingSinkSettings build() {
sanityCheck();
return new TestingSinkSettings(checkpointingMode);
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
index 2d1626bc619..433ab27f8ad 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.testframe.external.source;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.core.execution.CheckpointingMode;
+import static
org.apache.flink.streaming.api.CheckpointingMode.convertFromCheckpointingMode;
+import static
org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Settings for configuring the source under testing. */
@@ -37,9 +39,14 @@ public class TestingSourceSettings {
return boundedness;
}
- /** Checkpointing mode required for the source. */
- public CheckpointingMode getCheckpointingMode() {
- return checkpointingMode;
+ /**
+ * Checkpointing mode required for the source. This method is required by downstream projects
+ * (e.g. Flink connectors extending this test) that must still support Flink versions
+ * below 1.20. It can be removed once support for Flink 1.19 is dropped.
+ */
+ @Deprecated
+ public org.apache.flink.streaming.api.CheckpointingMode
getCheckpointingMode() {
+ return convertFromCheckpointingMode(checkpointingMode);
}
private TestingSourceSettings(Boundedness boundedness, CheckpointingMode
checkpointingMode) {
@@ -62,6 +69,18 @@ public class TestingSourceSettings {
return this;
}
+ /**
+ * This method is required by downstream projects (e.g. Flink connectors extending this
+ * test) that must still support Flink versions below 1.20. It can be removed once
+ * support for Flink 1.19 is dropped.
+ */
+ @Deprecated
+ public Builder setCheckpointingMode(
+ org.apache.flink.streaming.api.CheckpointingMode
checkpointingMode) {
+ this.checkpointingMode =
convertToCheckpointingMode(checkpointingMode);
+ return this;
+ }
+
public TestingSourceSettings build() {
sanityCheck();
return new TestingSourceSettings(boundedness, checkpointingMode);
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
index 291db078403..ece506ac199 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
@@ -27,7 +27,8 @@ import java.lang.annotation.Target;
/**
* Marks the field in test class defining supported semantic: {@link
- * org.apache.flink.core.execution.CheckpointingMode}.
+ * org.apache.flink.core.execution.CheckpointingMode} and {@link
+ * org.apache.flink.streaming.api.CheckpointingMode} (deprecated).
*
* <p>Only one field can be annotated in test class.
*/
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
index 7e1e1a8c06c..ebe9f077d3a 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
@@ -36,6 +36,7 @@ import org.junit.platform.commons.support.AnnotationSupport;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -97,11 +98,32 @@ public class ConnectorTestingExtension implements
BeforeAllCallback, AfterAllCal
}
// Store supported semantic
- final List<CheckpointingMode[]> semantics =
+ List<CheckpointingMode[]> semantics =
AnnotationSupport.findAnnotatedFieldValues(
context.getRequiredTestInstance(),
TestSemantics.class,
CheckpointingMode[].class);
+ // Fallback part starts.
+ // This is for compatibility with org.apache.flink.streaming.api.CheckpointingMode; it can
+ // be removed once we drop support for Flink 1.19 and the old CheckpointingMode.
+ final List<org.apache.flink.streaming.api.CheckpointingMode[]>
fallbackSemantics =
+ AnnotationSupport.findAnnotatedFieldValues(
+ context.getRequiredTestInstance(),
+ TestSemantics.class,
+
org.apache.flink.streaming.api.CheckpointingMode[].class);
+ if (!fallbackSemantics.isEmpty()) {
+ semantics = new ArrayList<>(semantics);
+ }
+ for (org.apache.flink.streaming.api.CheckpointingMode[] oldModes :
fallbackSemantics) {
+ semantics.add(
+ Arrays.stream(oldModes)
+ .sequential()
+ .map(
+
org.apache.flink.streaming.api.CheckpointingMode
+ ::convertToCheckpointingMode)
+ .toArray(CheckpointingMode[]::new));
+ }
+ // Fallback part ends.
checkExactlyOneAnnotatedField(semantics, TestSemantics.class);
context.getStore(TEST_RESOURCE_NAMESPACE)
.put(SUPPORTED_SEMANTIC_STORE_KEY, semantics.get(0));
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
index 8fb4a3804a8..3d4945b3719 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
@@ -29,6 +29,7 @@ import java.util.Set;
import static java.util.stream.Collectors.toSet;
import static
org.apache.flink.shaded.guava31.com.google.common.base.Predicates.not;
+import static
org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -58,6 +59,18 @@ public class UnorderedCollectIteratorAssert<T>
return this;
}
+ /**
+ * This method is required by downstream projects (e.g. Flink connectors extending this
+ * test) that must still support Flink versions below 1.20. It can be removed once
+ * support for Flink 1.19 is dropped.
+ */
+ @Deprecated
+ public void matchesRecordsFromSource(
+ List<List<T>> recordsBySplitsFromSource,
+ org.apache.flink.streaming.api.CheckpointingMode semantic) {
+ matchesRecordsFromSource(recordsBySplitsFromSource,
convertToCheckpointingMode(semantic));
+ }
+
public void matchesRecordsFromSource(
List<List<T>> recordsBySplitsFromSource, CheckpointingMode
semantic) {
for (List<T> list : recordsBySplitsFromSource) {