This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 249b2818d5e2c6c59b57e722706c5f29ba169712
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Aug 12 17:58:43 2021 +0800

    [FLINK-19554][connector/testing-framework] Connector testing framework 
utilities based on JUnit 5
---
 .../DefaultContainerizedExternalSystem.java        | 108 +++++++++++
 .../junit/annotations/ExternalContextFactory.java  |  43 +++++
 .../common/junit/annotations/ExternalSystem.java   |  39 ++++
 .../test/common/junit/annotations/TestEnv.java     |  40 ++++
 .../extensions/ConnectorTestingExtension.java      | 144 +++++++++++++++
 .../TestCaseInvocationContextProvider.java         | 203 +++++++++++++++++++++
 .../junit/extensions/TestLoggerExtension.java      |  83 +++++++++
 7 files changed, 660 insertions(+)

diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/DefaultContainerizedExternalSystem.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/DefaultContainerizedExternalSystem.java
new file mode 100644
index 0000000..ef71c8e
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/DefaultContainerizedExternalSystem.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.external;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connectors.test.common.TestResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * Default implementation of external system based on container.
+ *
+ * @param <C> Type of underlying container
+ */
+@Experimental
+public class DefaultContainerizedExternalSystem<C extends GenericContainer<C>>
+        implements TestResource {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultContainerizedExternalSystem.class);
+
+    private final C container;
+
+    /**
+     * Get a builder for {@link DefaultContainerizedExternalSystem}.
+     *
+     * @param <C> Type of underlying container
+     * @return An instance of builder
+     */
+    public static <C extends GenericContainer<C>> Builder<C> builder() {
+        return new Builder<>();
+    }
+
+    private DefaultContainerizedExternalSystem(C container) {
+        this.container = container;
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        if (container.isRunning()) {
+            return;
+        }
+        container.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (!container.isRunning()) {
+            return;
+        }
+        container.stop();
+    }
+
+    public C getContainer() {
+        return this.container;
+    }
+
+    /**
+     * Builder for {@link DefaultContainerizedExternalSystem}.
+     *
+     * @param <C> Type of underlying container
+     */
+    public static class Builder<C extends GenericContainer<C>> {
+
+        private C container;
+        private GenericContainer<?> flinkContainer;
+
+        public <T extends GenericContainer<T>> Builder<T> fromContainer(T 
container) {
+            @SuppressWarnings("unchecked")
+            Builder<T> self = (Builder<T>) this;
+            self.container = container;
+            return self;
+        }
+
+        public Builder<C> bindWithFlinkContainer(GenericContainer<?> 
flinkContainer) {
+            this.flinkContainer = flinkContainer;
+            
container.dependsOn(flinkContainer).withNetwork(flinkContainer.getNetwork());
+            return this;
+        }
+
+        public DefaultContainerizedExternalSystem<C> build() {
+            if (flinkContainer == null) {
+                LOG.warn(
+                        "External system container is not bound with Flink 
container. "
+                                + "This might lead to network isolation 
between external system and Flink");
+            }
+            return new DefaultContainerizedExternalSystem<>(container);
+        }
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/ExternalContextFactory.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/ExternalContextFactory.java
new file mode 100644
index 0000000..0b34555
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/ExternalContextFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.annotations;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in test class defining a {@link ExternalContext.Factory} 
for constructing {@link
+ * ExternalContext} before invocation of each test case.
+ *
+ * <p>Multiple fields can be annotated as external context factory, and these 
external contexts will
+ * be provided as different parameters of test cases.
+ *
+ * <p>The lifecycle of a {@link ExternalContext} will be PER-CASE, which means 
an instance of {@link
+ * ExternalContext} will be constructed before invocation of each test case, 
and closed right after
+ * the execution of the case for isolation between test cases.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface ExternalContextFactory {}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/ExternalSystem.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/ExternalSystem.java
new file mode 100644
index 0000000..d6f9569
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/ExternalSystem.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.annotations;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in test class defining external system.
+ *
+ * <p>Only one field can be annotated as external system in test class.
+ *
+ * <p>The lifecycle of external system will be PER-CLASS for performance, 
because launching and
+ * tearing down external system could be relatively a heavy operation.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface ExternalSystem {}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/TestEnv.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/TestEnv.java
new file mode 100644
index 0000000..574c87a
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/annotations/TestEnv.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.annotations;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in test class defining {@link TestEnvironment}.
+ *
+ * <p>Only one field can be annotated as test environment in a test class.
+ *
+ * <p>The lifecycle of {@link TestEnvironment} will be PER-CLASS for 
performance, because launching
+ * and tearing down Flink cluster could be relatively a heavy operation.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface TestEnv {}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/ConnectorTestingExtension.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/ConnectorTestingExtension.java
new file mode 100644
index 0000000..6652de7
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/ConnectorTestingExtension.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.extensions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.test.common.TestResource;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.support.AnnotationSupport;
+
+import java.lang.annotation.Annotation;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A JUnit 5 {@link Extension} for supporting running of connector testing 
framework.
+ *
+ * <p>This extension is responsible for searching test resources annotated by 
{@link TestEnv},
+ * {@link ExternalSystem} and {@link ExternalContextFactory}, storing them 
into storage provided by
+ * JUnit, and manage lifecycle of these resources.
+ *
+ * <p>The extension uses {@link ExtensionContext.Store} for handing over test 
resources to {@link
+ * TestCaseInvocationContextProvider}, which will inject these resources into 
test cases as
+ * parameters.
+ *
+ * <p>The order of initialization is promised to be:
+ *
+ * <ol>
+ *   <li>Test environment annotated by {@link TestEnv}, before all test cases 
in this extension
+ *   <li>External system annotated by {@link ExternalSystem}, before all test 
cases in this
+ *       extension
+ *   <li>External contexts annotated by {@link ExternalContextFactory}, before 
each test case in
+ *       {@link TestCaseInvocationContextProvider}
+ * </ol>
+ */
+@Internal
+public class ConnectorTestingExtension implements BeforeAllCallback, 
AfterAllCallback {
+
+    public static final ExtensionContext.Namespace TEST_RESOURCE_NAMESPACE =
+            ExtensionContext.Namespace.create("testResourceNamespace");
+    public static final String TEST_ENV_STORE_KEY = "testEnvironment";
+    public static final String EXTERNAL_SYSTEM_STORE_KEY = "externalSystem";
+    public static final String EXTERNAL_CONTEXT_FACTORIES_STORE_KEY = 
"externalContext";
+
+    private TestEnvironment testEnvironment;
+    private TestResource externalSystem;
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+
+        // Setup test environment and store
+        final List<TestEnvironment> testEnvironments =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(), TestEnv.class, 
TestEnvironment.class);
+        checkExactlyOneAnnotatedField(testEnvironments, TestEnv.class);
+        testEnvironment = testEnvironments.get(0);
+        testEnvironment.startUp();
+        context.getStore(TEST_RESOURCE_NAMESPACE).put(TEST_ENV_STORE_KEY, 
testEnvironment);
+
+        // Setup external system and store
+        final List<TestResource> externalSystems =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(),
+                        ExternalSystem.class,
+                        TestResource.class);
+        checkExactlyOneAnnotatedField(externalSystems, ExternalSystem.class);
+        externalSystem = externalSystems.get(0);
+        externalSystem.startUp();
+        
context.getStore(TEST_RESOURCE_NAMESPACE).put(EXTERNAL_SYSTEM_STORE_KEY, 
externalSystem);
+
+        // Search external context factories
+        final List<ExternalContext.Factory> externalContextFactories =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(),
+                        ExternalContextFactory.class,
+                        ExternalContext.Factory.class);
+        checkAtLeastOneAnnotationField(externalContextFactories, 
ExternalContextFactory.class);
+        context.getStore(TEST_RESOURCE_NAMESPACE)
+                .put(EXTERNAL_CONTEXT_FACTORIES_STORE_KEY, 
externalContextFactories);
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        // Tear down test environment
+        testEnvironment.tearDown();
+
+        // Tear down external system
+        externalSystem.tearDown();
+
+        // Clear store
+        context.getStore(TEST_RESOURCE_NAMESPACE).remove(TEST_ENV_STORE_KEY);
+        
context.getStore(TEST_RESOURCE_NAMESPACE).remove(EXTERNAL_SYSTEM_STORE_KEY);
+        
context.getStore(TEST_RESOURCE_NAMESPACE).remove(EXTERNAL_CONTEXT_FACTORIES_STORE_KEY);
+    }
+
+    private void checkExactlyOneAnnotatedField(
+            Collection<?> fields, Class<? extends Annotation> annotation) {
+        if (fields.size() > 1) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Multiple fields are annotated with '@%s'",
+                            annotation.getSimpleName()));
+        }
+        if (fields.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "No fields are annotated with '@%s'", 
annotation.getSimpleName()));
+        }
+    }
+
+    private void checkAtLeastOneAnnotationField(
+            Collection<?> fields, Class<? extends Annotation> annotation) {
+        if (fields.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "No fields are annotated with '@%s'", 
annotation.getSimpleName()));
+        }
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestCaseInvocationContextProvider.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestCaseInvocationContextProvider.java
new file mode 100644
index 0000000..b28dc94
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestCaseInvocationContextProvider.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.extensions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension.EXTERNAL_CONTEXT_FACTORIES_STORE_KEY;
+import static 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension.TEST_ENV_STORE_KEY;
+import static 
org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension.TEST_RESOURCE_NAMESPACE;
+
+/**
+ * A helper class for injecting test resources into test case as parameters.
+ *
+ * <p>This provider will resolve {@link TestEnvironment} and {@link 
ExternalContext.Factory} from
+ * the storage in JUnit's {@link ExtensionContext}, inject them into test 
method, and register a
+ * {@link AfterTestExecutionCallback} for closing the external context after 
the execution of test
+ * case.
+ */
+@Internal
+public class TestCaseInvocationContextProvider implements 
TestTemplateInvocationContextProvider {
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        // Only support test cases with TestEnvironment and ExternalContext as 
parameter
+        return 
Arrays.stream(context.getRequiredTestMethod().getParameterTypes())
+                .anyMatch(
+                        (type) ->
+                                TestEnvironment.class.isAssignableFrom(type)
+                                        || 
ExternalContext.class.isAssignableFrom(type));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+
+        // Fetch test environment from store
+        TestEnvironment testEnv =
+                context.getStore(TEST_RESOURCE_NAMESPACE)
+                        .get(TEST_ENV_STORE_KEY, TestEnvironment.class);
+
+        // Fetch external context factories from store
+        List<ExternalContext.Factory<?>> externalContextFactories =
+                (List<ExternalContext.Factory<?>>)
+                        context.getStore(TEST_RESOURCE_NAMESPACE)
+                                .get(EXTERNAL_CONTEXT_FACTORIES_STORE_KEY);
+
+        // Create a invocation context for each external context factory
+        return externalContextFactories.stream()
+                .map(
+                        factory ->
+                                new TestResourceProvidingInvocationContext(
+                                        testEnv, 
factory.createExternalContext()));
+    }
+
+    /**
+     * Invocation context for injecting {@link TestEnvironment} and {@link 
ExtensionContext} into
+     * test cases as method parameters.
+     */
+    static class TestResourceProvidingInvocationContext implements 
TestTemplateInvocationContext {
+
+        private final TestEnvironment testEnvironment;
+        private final ExternalContext<?> externalContext;
+
+        public TestResourceProvidingInvocationContext(
+                TestEnvironment testEnvironment, ExternalContext<?> 
externalContext) {
+            this.testEnvironment = testEnvironment;
+            this.externalContext = externalContext;
+        }
+
+        @Override
+        public String getDisplayName(int invocationIndex) {
+            return String.format(
+                    "TestEnvironment: [%s], ExternalContext: [%s]",
+                    testEnvironment, externalContext);
+        }
+
+        @Override
+        public List<Extension> getAdditionalExtensions() {
+            return Arrays.asList(
+                    // Extension for injecting parameters
+                    new TestEnvironmentResolver(testEnvironment),
+                    new ExternalContextProvider(externalContext),
+                    new ClusterControllableProvider(testEnvironment),
+                    // Extension for closing external context
+                    (AfterTestExecutionCallback) ignore -> 
externalContext.close());
+        }
+    }
+
+    private static class TestEnvironmentResolver implements ParameterResolver {
+
+        private final TestEnvironment testEnvironment;
+
+        private TestEnvironmentResolver(TestEnvironment testEnvironment) {
+            this.testEnvironment = testEnvironment;
+        }
+
+        @Override
+        public boolean supportsParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return isAssignableFromParameterType(
+                    TestEnvironment.class, 
parameterContext.getParameter().getType());
+        }
+
+        @Override
+        public Object resolveParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return this.testEnvironment;
+        }
+    }
+
+    private static class ExternalContextProvider implements ParameterResolver {
+
+        private final ExternalContext<?> externalContext;
+
+        private ExternalContextProvider(ExternalContext<?> externalContext) {
+            this.externalContext = externalContext;
+        }
+
+        @Override
+        public boolean supportsParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return isAssignableFromParameterType(
+                    ExternalContext.class, 
parameterContext.getParameter().getType());
+        }
+
+        @Override
+        public Object resolveParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return this.externalContext;
+        }
+    }
+
+    private static class ClusterControllableProvider implements 
ParameterResolver {
+
+        private final TestEnvironment testEnvironment;
+
+        private ClusterControllableProvider(TestEnvironment testEnvironment) {
+            this.testEnvironment = testEnvironment;
+        }
+
+        @Override
+        public boolean supportsParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return isAssignableFromParameterType(
+                            ClusterControllable.class, 
parameterContext.getParameter().getType())
+                    && isTestEnvironmentControllable(this.testEnvironment);
+        }
+
+        @Override
+        public Object resolveParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return testEnvironment;
+        }
+
+        private boolean isTestEnvironmentControllable(TestEnvironment 
testEnvironment) {
+            return 
ClusterControllable.class.isAssignableFrom(testEnvironment.getClass());
+        }
+    }
+
+    private static boolean isAssignableFromParameterType(
+            Class<?> requiredType, Class<?> parameterType) {
+        return requiredType.isAssignableFrom(parameterType);
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestLoggerExtension.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestLoggerExtension.java
new file mode 100644
index 0000000..aaac2de
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/junit/extensions/TestLoggerExtension.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.junit.extensions;
+
+import org.apache.flink.annotation.Internal;
+
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/** A JUnit-5-style test logger. */
+@Internal
+public class TestLoggerExtension implements TestWatcher, BeforeEachCallback {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestLoggerExtension.class);
+
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        LOG.info(
+                
"\n================================================================================"
+                        + "\n🧪 Test {}.{} is running."
+                        + 
"\n--------------------------------------------------------------------------------",
+                context.getRequiredTestClass().getCanonicalName(),
+                context.getRequiredTestMethod().getName());
+    }
+
+    @Override
+    public void testSuccessful(ExtensionContext context) {
+        LOG.info(
+                
"\n--------------------------------------------------------------------------------"
+                        + "\nTest {}.{} successfully run."
+                        + 
"\n================================================================================",
+                context.getRequiredTestClass().getCanonicalName(),
+                context.getRequiredTestMethod().getName());
+    }
+
+    @Override
+    public void testFailed(ExtensionContext context, Throwable cause) {
+        LOG.error(
+                
"\n--------------------------------------------------------------------------------"
+                        + "\nTest {}.{} failed with:\n{}"
+                        + 
"\n================================================================================",
+                context.getRequiredTestClass().getCanonicalName(),
+                context.getRequiredTestMethod().getName(),
+                exceptionToString(cause));
+    }
+
+    private static String exceptionToString(Throwable t) {
+        if (t == null) {
+            return "(null)";
+        }
+
+        try {
+            StringWriter stm = new StringWriter();
+            PrintWriter wrt = new PrintWriter(stm);
+            t.printStackTrace(wrt);
+            wrt.close();
+            return stm.toString();
+        } catch (Throwable ignored) {
+            return t.getClass().getName() + " (error while printing stack 
trace)";
+        }
+    }
+}

Reply via email to