This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 09358d1fead500169a887c81a81afc26f5aef315 Author: Fabian Paul <[email protected]> AuthorDate: Wed May 4 11:29:08 2022 +0200 [FLINK-27486][tests] Fix archunit test violations in connector-base module --- .../84abeb9c-8355-4165-96aa-dda65b04e5e7 | 2 +- .../8ab2328f-b38b-4e34-b768-0deb6b6171fb | 12 --------- .../connector/base/sink/AsyncSinkBaseITCase.java | 31 +++++++++++++++------- .../reader/CoordinatedSourceRescaleITCase.java | 14 +++++++++- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 index 202fdf191ca..7c90aa63637 100644 --- a/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 +++ b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 @@ -3,4 +3,4 @@ org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase does not satisf * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file diff --git a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb index 03ba5016523..e69de29bb2d 100644 --- a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb +++ b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb @@ -1,12 +0,0 @@ -org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: only one of the following predicates match:\ -* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ -* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule -org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: only one of the following predicates match:\ -* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ -* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java index 1236c72a2c4..d780b85df57 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java @@ -20,16 +20,29 @@ package org.apache.flink.connector.base.sink; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */ +@ExtendWith(TestLoggerExtension.class) public class AsyncSinkBaseITCase { + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @Test @@ -43,13 +56,11 @@ public class AsyncSinkBaseITCase { env.fromSequence(999_999, 1_000_100) .map(Object::toString) .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10, 1000, 10)); - Exception e = - assertThrows( - JobExecutionException.class, - () -> env.execute("Integration Test: AsyncSinkBaseITCase")); - assertEquals( - "Intentional error on persisting 1_000_000 to ArrayListDestination", - e.getCause().getCause().getMessage()); + assertThatThrownBy(() -> env.execute("Integration Test: AsyncSinkBaseITCase")) + .isInstanceOf(JobExecutionException.class) + .getRootCause() + .hasMessageContaining( + "Intentional error on persisting 1_000_000 to ArrayListDestination"); } @Test diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java index 0120fee224f..3a23ba8681c 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java @@ -26,12 +26,15 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -55,6 +58,14 @@ public class CoordinatedSourceRescaleITCase extends TestLogger { public static final String CREATED_CHECKPOINT = "successfully created checkpoint"; public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint"; + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(7) + .build()); + @Rule public final TemporaryFolder temp = new TemporaryFolder(); @Test @@ -115,7 +126,8 @@ public class CoordinatedSourceRescaleITCase extends TestLogger { conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, p); final StreamExecutionEnvironment env = - StreamExecutionEnvironment.createLocalEnvironment(p, conf); + StreamExecutionEnvironment.getExecutionEnvironment(conf); + env.setParallelism(p); env.enableCheckpointing(100); env.getCheckpointConfig() .setExternalizedCheckpointCleanup(
