This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09358d1fead500169a887c81a81afc26f5aef315
Author: Fabian Paul <[email protected]>
AuthorDate: Wed May 4 11:29:08 2022 +0200

    [FLINK-27486][tests] Fix archunit test violations in connector-base module
---
 .../84abeb9c-8355-4165-96aa-dda65b04e5e7           |  2 +-
 .../8ab2328f-b38b-4e34-b768-0deb6b6171fb           | 12 ---------
 .../connector/base/sink/AsyncSinkBaseITCase.java   | 31 +++++++++++++++-------
 .../reader/CoordinatedSourceRescaleITCase.java     | 14 +++++++++-
 4 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
index 202fdf191ca..7c90aa63637 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
@@ -3,4 +3,4 @@ org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase does not satisf
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
index 03ba5016523..e69de29bb2d 100644
--- a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
+++ b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
@@ -1,12 +0,0 @@
-org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
index 1236c72a2c4..d780b85df57 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
@@ -20,16 +20,29 @@ package org.apache.flink.connector.base.sink;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */
+@ExtendWith(TestLoggerExtension.class)
 public class AsyncSinkBaseITCase {
 
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
     @Test
@@ -43,13 +56,11 @@ public class AsyncSinkBaseITCase {
         env.fromSequence(999_999, 1_000_100)
                 .map(Object::toString)
                 .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10, 1000, 10));
-        Exception e =
-                assertThrows(
-                        JobExecutionException.class,
-                        () -> env.execute("Integration Test: AsyncSinkBaseITCase"));
-        assertEquals(
-                "Intentional error on persisting 1_000_000 to ArrayListDestination",
-                e.getCause().getCause().getMessage());
+        assertThatThrownBy(() -> env.execute("Integration Test: AsyncSinkBaseITCase"))
+                .isInstanceOf(JobExecutionException.class)
+                .getRootCause()
+                .hasMessageContaining(
+                        "Intentional error on persisting 1_000_000 to ArrayListDestination");
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
index 0120fee224f..3a23ba8681c 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -26,12 +26,15 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -55,6 +58,14 @@ public class CoordinatedSourceRescaleITCase extends TestLogger {
     public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
     public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";
 
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(7)
+                            .build());
+
     @Rule public final TemporaryFolder temp = new TemporaryFolder();
 
     @Test
@@ -115,7 +126,8 @@ public class CoordinatedSourceRescaleITCase extends TestLogger {
         conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, p);
 
         final StreamExecutionEnvironment env =
-                StreamExecutionEnvironment.createLocalEnvironment(p, conf);
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        env.setParallelism(p);
         env.enableCheckpointing(100);
         env.getCheckpointConfig()
                 .setExternalizedCheckpointCleanup(

Reply via email to