This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 39ba942cda4bd7752d3706dc70385a88182d8a3b
Author: Joao Boto <[email protected]>
AuthorDate: Wed Jul 10 19:11:04 2024 +0200

    [FLINK-35363] Modify tests to clean up unused docker images
---
 flink-connector-jdbc-core/pom.xml                  |  8 +++
 .../jdbc/derby/testutils/DerbyDatabase.java        | 57 +++++++++--------
 .../connector/jdbc/h2/testutils/H2XaDatabase.java  | 43 +++++++------
 .../jdbc/testutils/DatabaseExtension.java          | 66 ++++++++------------
 .../connector/jdbc/testutils/DatabaseResource.java | 15 +++++
 .../jdbc/testutils/resources/DockerResource.java   | 61 +++++++++++++++++++
 .../jdbc/testutils/resources/MemoryResource.java   |  6 ++
 .../jdbc/cratedb/testutils/CrateDBDatabase.java    | 10 +--
 .../connector/jdbc/db2/testutils/Db2Database.java  | 10 +--
 .../jdbc/mysql/testutils/MySqlDatabase.java        | 10 +--
 .../oceanbase/testutils/OceanBaseDatabase.java     | 26 +++++---
 .../jdbc/oracle/testutils/OracleDatabase.java      | 10 +--
 .../jdbc/postgres/testutils/PostgresDatabase.java  | 11 ++--
 .../jdbc/postgres/testutils/PostgresMetadata.java  |  3 +-
 .../sqlserver/testutils/SqlServerDatabase.java     | 10 +--
 .../jdbc/trino/testutils/TrinoDatabase.java        | 71 ++++++++++++++--------
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java |  7 ++-
 pom.xml                                            |  1 -
 18 files changed, 270 insertions(+), 155 deletions(-)

diff --git a/flink-connector-jdbc-core/pom.xml 
b/flink-connector-jdbc-core/pom.xml
index a7579845..851c1e07 100644
--- a/flink-connector-jdbc-core/pom.xml
+++ b/flink-connector-jdbc-core/pom.xml
@@ -158,6 +158,14 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- TestContainer dependencies -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+
     </dependencies>
 
     <build>
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
index b8a7439d..5c9e75ce 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.jdbc.derby.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.OutputStream;
@@ -36,39 +38,44 @@ public class DerbyDatabase extends DatabaseExtension {
                 public void write(int b) {}
             };
 
-    private static DerbyMetadata metadata;
+    private static final DerbyMetadata metadata = new DerbyMetadata("test");
 
     public static DerbyMetadata getMetadata() {
-        if (metadata == null) {
-            metadata = new DerbyMetadata("test");
-        }
         return metadata;
     }
 
     @Override
-    public DatabaseMetadata startDatabase() throws Exception {
-        DatabaseMetadata metadata = getMetadata();
-        try {
-            System.setProperty(
-                    "derby.stream.error.field",
-                    DerbyDatabase.class.getCanonicalName() + ".DEV_NULL");
-            Class.forName(metadata.getDriverClass());
-            DriverManager.getConnection(String.format("%s;create=true", 
metadata.getJdbcUrl()))
-                    .close();
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(e);
-        }
-        return metadata;
+    protected DatabaseMetadata getMetadataDB() {
+        return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        try {
-            DriverManager.getConnection(String.format("%s;shutdown=true", 
metadata.getJdbcUrl()))
-                    .close();
-        } catch (SQLException ignored) {
-        } finally {
-            metadata = null;
-        }
+    protected DatabaseResource getResource() {
+        return new MemoryResource() {
+            @Override
+            public void start() {
+                try {
+                    System.setProperty(
+                            "derby.stream.error.field",
+                            DerbyDatabase.class.getCanonicalName() + 
".DEV_NULL");
+                    Class.forName(metadata.getDriverClass());
+                    DriverManager.getConnection(
+                                    String.format("%s;create=true", 
metadata.getJdbcUrl()))
+                            .close();
+                } catch (Exception e) {
+                    throw new FlinkRuntimeException(e);
+                }
+            }
+
+            @Override
+            public void stop() {
+                try {
+                    DriverManager.getConnection(
+                                    String.format("%s;shutdown=true", 
metadata.getJdbcUrl()))
+                            .close();
+                } catch (SQLException ignored) {
+                }
+            }
+        };
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
index edca8ad0..6c37f69e 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.h2.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.sql.DriverManager;
@@ -26,31 +28,36 @@ import java.sql.DriverManager;
 /** H2 database for testing. */
 public class H2XaDatabase extends DatabaseExtension {
 
-    private static H2Metadata metadata;
+    private static final H2Metadata metadata = new H2Metadata("test");
 
     public static H2Metadata getMetadata() {
-        if (metadata == null) {
-            metadata = new H2Metadata("test");
-        }
         return metadata;
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        DatabaseMetadata metadata = getMetadata();
-        try {
-            Class.forName(metadata.getDriverClass());
-            DriverManager.getConnection(
-                            String.format(
-                                    "%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA 
IF NOT EXISTS %s\\;SET SCHEMA %s",
-                                    metadata.getJdbcUrl(), "test", "test"))
-                    .close();
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(e);
-        }
-        return metadata;
+    protected DatabaseMetadata getMetadataDB() {
+        return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {}
+    protected DatabaseResource getResource() {
+        return new MemoryResource() {
+            @Override
+            public void start() {
+                try {
+                    Class.forName(metadata.getDriverClass());
+                    DriverManager.getConnection(
+                                    String.format(
+                                            "%s;DB_CLOSE_DELAY=-1;INIT=CREATE 
SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s",
+                                            metadata.getJdbcUrl(), "test", 
"test"))
+                            .close();
+                } catch (Exception e) {
+                    throw new FlinkRuntimeException(e);
+                }
+            }
+
+            @Override
+            public void stop() {}
+        };
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
index 3cc73559..718b4f4f 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
@@ -35,42 +35,21 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.junit.platform.commons.support.AnnotationSupport.findRepeatableAnnotations;
 
 /** Database extension for testing. */
 public abstract class DatabaseExtension
-        implements BeforeAllCallback,
-                AfterAllCallback,
-                BeforeEachCallback,
-                AfterEachCallback,
-                ExtensionContext.Store.CloseableResource {
-
-    /**
-     * Database Lifecycle for testing. The goal it's that all database 
containers are create only
-     * one time.
-     */
-    public enum Lifecycle {
-        /** Database will be instantiated only one time. */
-        PER_EXECUTION,
-        /** Database will be instantiated by class. */
-        PER_CLASS
-    }
+        implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, 
AfterEachCallback {
 
-    protected abstract DatabaseMetadata startDatabase() throws Exception;
+    protected abstract DatabaseMetadata getMetadataDB();
 
-    protected abstract void stopDatabase() throws Exception;
+    protected abstract DatabaseResource getResource();
 
     private final String uniqueKey = this.getClass().getSimpleName();
-
-    protected Lifecycle getLifecycle() {
-        return Lifecycle.PER_EXECUTION;
-    }
-
-    private ExtensionContext.Store getStore(ExtensionContext context) {
-        return context.getRoot().getStore(Namespace.GLOBAL);
-    }
+    private final String uniqueResource = String.format("%sResource", 
uniqueKey);
 
     private DatabaseTest getDatabaseBaseTest(Class<?> clazz) throws Exception {
         DatabaseTest dbClazz = null;
@@ -95,8 +74,7 @@ public abstract class DatabaseExtension
                 .filter(DatabaseTest.class::isAssignableFrom)
                 .ifPresent(
                         clazz -> {
-                            DatabaseMetadata metadata =
-                                    getStore(context).get(uniqueKey, 
DatabaseMetadata.class);
+                            DatabaseMetadata metadata = getMetadataDB();
                             if (metadata != null) {
                                 try (Connection conn = 
metadata.getConnection()) {
                                     for (TableManaged table :
@@ -132,15 +110,19 @@ public abstract class DatabaseExtension
         return false;
     }
 
+    private DatabaseResource getResource(ExtensionContext context) {
+        return context.getRoot()
+                .getStore(Namespace.GLOBAL)
+                .getOrComputeIfAbsent(uniqueResource, startResource(), 
DatabaseResource.class);
+    }
+
     @Override
     public final void beforeAll(ExtensionContext context) throws Exception {
         if (ignoreTestDatabase(context)) {
             return;
         }
 
-        if (getStore(context).get(uniqueKey) == null) {
-            getStore(context).put(uniqueKey, startDatabase());
-        }
+        getResource(context);
 
         getManagedTables(context, TableManaged::createTable);
     }
@@ -153,6 +135,7 @@ public abstract class DatabaseExtension
         if (ignoreTestDatabase(context)) {
             return;
         }
+
         getManagedTables(context, TableManaged::deleteTable);
     }
 
@@ -161,18 +144,9 @@ public abstract class DatabaseExtension
         if (ignoreTestDatabase(context)) {
             return;
         }
-        getManagedTables(context, TableManaged::dropTable);
-        if (Lifecycle.PER_CLASS == getLifecycle()) {
-            stopDatabase();
-            getStore(context).remove(uniqueKey, DatabaseMetadata.class);
-        }
-    }
 
-    @Override
-    public final void close() throws Throwable {
-        if (Lifecycle.PER_EXECUTION == getLifecycle()) {
-            stopDatabase();
-        }
+        getManagedTables(context, TableManaged::dropTable);
+        getResource(context);
     }
 
     private Set<String> retrieveDatabaseExtensions(final ExtensionContext 
context) {
@@ -199,4 +173,12 @@ public abstract class DatabaseExtension
 
         return retrieveExtensions.apply(context, new HashSet<>());
     }
+
+    private Function<String, DatabaseResource> startResource() {
+        return s -> {
+            DatabaseResource resource = getResource();
+            resource.start();
+            return resource;
+        };
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java
new file mode 100644
index 00000000..07e95617
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java
@@ -0,0 +1,15 @@
+package org.apache.flink.connector.jdbc.testutils;
+
+import 
org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;
+
+/** Database resource for testing. */
+public interface DatabaseResource extends CloseableResource {
+
+    void start();
+
+    void stop();
+
+    default void close() throws Throwable {
+        stop();
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
new file mode 100644
index 00000000..8b721b12
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
@@ -0,0 +1,61 @@
+package org.apache.flink.connector.jdbc.testutils.resources;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+
+import com.github.dockerjava.api.DockerClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+
+import java.util.Arrays;
+
+/** Docker based database resource. */
+public class DockerResource implements DatabaseResource {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(DockerResource.class);
+
+    private final JdbcDatabaseContainer<?> container;
+
+    public DockerResource(JdbcDatabaseContainer<?> container) {
+        this.container = container;
+    }
+
+    @Override
+    public void start() {
+        this.container.start();
+    }
+
+    @Override
+    public void stop() {
+        this.container.stop();
+    }
+
+    @Override
+    public void close() throws Throwable {
+        stop();
+        cleanContainers(container);
+    }
+
+    public static void cleanContainers(GenericContainer<?> container) {
+        try {
+            DockerClient client = DockerClientFactory.instance().client();
+            //            
client.removeImageCmd(container.getDockerImageName()).exec();
+            client.listImagesCmd().exec().stream()
+                    .filter(
+                            image ->
+                                    Arrays.stream(image.getRepoTags())
+                                            .anyMatch(
+                                                    tag ->
+                                                            
!tag.contains("testcontainers/ryuk")
+                                                                    && 
!tag.contains(
+                                                                            
container
+                                                                               
     .getDockerImageName())))
+                    .forEach(image -> 
client.removeImageCmd(image.getId()).exec());
+
+        } catch (Exception ignore) {
+            LOG.warn("Error deleting image.");
+        }
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java
new file mode 100644
index 00000000..2404bf44
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java
@@ -0,0 +1,6 @@
+package org.apache.flink.connector.jdbc.testutils.resources;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+
+/** Memory based database resource. */
+public interface MemoryResource extends DatabaseResource {}
diff --git 
a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
 
b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
index 50d3ae4c..08bf113f 100644
--- 
a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
+++ 
b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.cratedb.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.testcontainers.containers.JdbcDatabaseContainer;
@@ -66,15 +68,13 @@ public class CrateDBDatabase extends DatabaseExtension 
implements CrateDBImages
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
     }
 
     /**
diff --git 
a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
 
b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
index 24e75858..930dbdb1 100644
--- 
a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
+++ 
b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.jdbc.db2.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
@@ -56,14 +58,12 @@ public class Db2Database extends DatabaseExtension 
implements Db2Images {
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
     }
 }
diff --git 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
index 99195063..0b5e7908 100644
--- 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
+++ 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.mysql.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.testcontainers.containers.MySQLContainer;
@@ -46,14 +48,12 @@ public class MySqlDatabase extends DatabaseExtension 
implements MySqlImages {
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
     }
 }
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
index 62279748..c07ad5c2 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
@@ -20,6 +20,8 @@ package org.apache.flink.connector.jdbc.oceanbase.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
@@ -28,6 +30,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.oceanbase.OceanBaseCEContainer;
 
 import java.sql.Connection;
+import java.sql.SQLException;
 import java.sql.Statement;
 
 /** OceanBase database for testing. */
@@ -56,18 +59,23 @@ public class OceanBaseDatabase extends DatabaseExtension 
implements OceanBaseIma
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
-        try (Connection connection = getMetadata().getConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("SET GLOBAL time_zone = '+00:00'");
-        }
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER) {
+            @Override
+            public void start() {
+                super.start();
+                try (Connection connection = getMetadata().getConnection();
+                        Statement statement = connection.createStatement()) {
+                    statement.execute("SET GLOBAL time_zone = '+00:00'");
+                } catch (SQLException e) {
+                    throw new FlinkRuntimeException(e);
+                }
+            }
+        };
     }
 }
diff --git 
a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
 
b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
index 37197292..c1a4e4f9 100644
--- 
a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
+++ 
b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.oracle.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.testcontainers.containers.OracleContainer;
@@ -45,14 +47,12 @@ public class OracleDatabase extends DatabaseExtension 
implements OracleImages {
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
     }
 }
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
index 5b1700bd..0b871ec4 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.postgres.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -44,16 +46,13 @@ public class PostgresDatabase extends DatabaseExtension 
implements PostgresImage
         return metadata;
     }
 
-    @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
     }
 
     /** {@link PostgreSQLContainer} with XA enabled (by setting 
max_prepared_transactions). */
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
index 219f4961..2d083e05 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.postgres.testutils;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
 
 import org.postgresql.xa.PGXADataSource;
+import org.testcontainers.containers.JdbcDatabaseContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 
 import javax.sql.XADataSource;
@@ -38,7 +39,7 @@ public class PostgresMetadata implements DatabaseMetadata {
         this(container, false);
     }
 
-    public PostgresMetadata(PostgreSQLContainer<?> container, boolean 
hasXaEnabled) {
+    public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean 
hasXaEnabled) {
         this.username = container.getUsername();
         this.password = container.getPassword();
         this.url = container.getJdbcUrl();
diff --git 
a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
 
b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
index 7629fba5..ddb9fbd1 100644
--- 
a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
+++ 
b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
@@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.sqlserver.testutils;
 
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -48,15 +50,13 @@ public class SqlServerDatabase extends DatabaseExtension 
implements SqlServerIma
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
     }
 
     /** {@link MSSQLServerContainer} with Xa. */
diff --git 
a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
 
b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
index ba65a189..f56d3d48 100644
--- 
a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
+++ 
b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.connector.jdbc.postgres.testutils.PostgresImages;
 import org.apache.flink.connector.jdbc.postgres.testutils.PostgresMetadata;
 import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.testcontainers.containers.BindMode;
@@ -33,6 +35,7 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 
 /** A Trino database for testing. */
 public class TrinoDatabase extends DatabaseExtension implements TrinoImages, 
PostgresImages {
@@ -75,34 +78,52 @@ public class TrinoDatabase extends DatabaseExtension 
implements TrinoImages, Pos
     }
 
     @Override
-    protected DatabaseMetadata startDatabase() throws Exception {
-        CONTAINER_DB.start();
-
-        Path tempFile = Files.createTempFile(null, null);
-        String postgresContent =
-                "connector.name=postgresql\n"
-                        + String.format(
-                                
"connection-url=jdbc:postgresql://%s:%s/test\n",
-                                CONTAINER_DB_ALIAS, CONTAINER_DB_PORT)
-                        + String.format("connection-user=%s\n", 
CONTAINER_DB.getUsername())
-                        + String.format("connection-password=%s\n", 
CONTAINER_DB.getPassword());
-        Files.write(tempFile, 
postgresContent.getBytes(StandardCharsets.UTF_8));
-
-        CONTAINER
-                .withDatabaseName("postgres/public")
-                .withFileSystemBind(
-                        tempFile.toFile().getAbsolutePath(),
-                        "/etc/trino/catalog/postgres.properties",
-                        BindMode.READ_WRITE)
-                .waitingFor(Wait.forHttp("/ui/login.html").forStatusCode(200));
-        CONTAINER.start();
+    protected DatabaseMetadata getMetadataDB() {
         return getMetadata();
     }
 
     @Override
-    protected void stopDatabase() throws Exception {
-        CONTAINER.stop();
-        CONTAINER_DB.stop();
-        metadata = null;
+    protected DatabaseResource getResource() {
+        return new DatabaseResource() {
+            @Override
+            public void start() {
+                try {
+                    CONTAINER_DB.start();
+
+                    Path tempFile = Files.createTempFile(null, null);
+                    String postgresContent =
+                            "connector.name=postgresql\n"
+                                    + String.format(
+                                            
"connection-url=jdbc:postgresql://%s:%s/test\n",
+                                            CONTAINER_DB_ALIAS, 
CONTAINER_DB_PORT)
+                                    + String.format(
+                                            "connection-user=%s\n", 
CONTAINER_DB.getUsername())
+                                    + String.format(
+                                            "connection-password=%s\n", 
CONTAINER_DB.getPassword());
+                    Files.write(tempFile, 
postgresContent.getBytes(StandardCharsets.UTF_8));
+
+                    CONTAINER
+                            .withDatabaseName("postgres/public")
+                            .withFileSystemBind(
+                                    tempFile.toFile().getAbsolutePath(),
+                                    "/etc/trino/catalog/postgres.properties",
+                                    BindMode.READ_WRITE)
+                            
.waitingFor(Wait.forHttp("/ui/login.html").forStatusCode(200));
+                    CONTAINER.start();
+                } catch (Exception e) {
+                    throw new FlinkRuntimeException(e);
+                }
+            }
+
+            @Override
+            public void stop() {
+                Arrays.asList(CONTAINER, CONTAINER_DB)
+                        .forEach(
+                                container -> {
+                                    container.stop();
+                                    DockerResource.cleanContainers(container);
+                                });
+            }
+        };
     }
 }
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index 3a4d4df4..5b4ac294 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
-import org.apache.flink.connector.jdbc.derby.testutils.DerbyDatabase;
+import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -52,7 +52,8 @@ import static 
org.apache.flink.streaming.util.OperatorSnapshotUtil.readStateHand
 import static 
org.apache.flink.streaming.util.OperatorSnapshotUtil.writeStateHandle;
 
 /** Tests state migration for {@link JdbcXaSinkFunction}. */
-class JdbcXaSinkMigrationTest extends JdbcTestBase {
+@Deprecated
+class JdbcXaSinkMigrationTest extends JdbcTestBase implements DerbyTestBase {
 
     // write a snapshot:
     // java <CLASS_NAME> <VERSION>
@@ -60,7 +61,7 @@ class JdbcXaSinkMigrationTest extends JdbcTestBase {
     // mvn exec:java -Dexec.mainClass="<CLASS_NAME>" -Dexec.args='<VERSION>'
     // -Dexec.classpathScope=test -Dexec.cleanupDaemonThreads=false
     public static void main(String[] args) throws Exception {
-        new DerbyDatabase().startDatabase();
+        //        new DerbyDatabase();
         JdbcXaSinkMigrationTest test = new JdbcXaSinkMigrationTest();
         test.writeSnapshot(parseVersionArg(args));
     }
diff --git a/pom.xml b/pom.xml
index fda0c842..fa8d4837 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,6 @@ under the License.
         <slf4j.version>1.7.36</slf4j.version>
         <log4j.version>2.17.2</log4j.version>
 
-
         
<flink.parent.artifactId>flink-connector-jdbc-parent</flink.parent.artifactId>
         <!-- These 2 properties should be removed together with upgrade of 
flink-connector-parent to 1.1.x -->
         <flink.surefire.baseArgLine>-XX:+UseG1GC -Xms256m 
-XX:+IgnoreUnrecognizedVMOptions 
${surefire.module.config}</flink.surefire.baseArgLine>


Reply via email to