This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 39ba942cda4bd7752d3706dc70385a88182d8a3b Author: Joao Boto <[email protected]> AuthorDate: Wed Jul 10 19:11:04 2024 +0200 [FLINK-35363] Modify tests to clean up unused docker images --- flink-connector-jdbc-core/pom.xml | 8 +++ .../jdbc/derby/testutils/DerbyDatabase.java | 57 +++++++++-------- .../connector/jdbc/h2/testutils/H2XaDatabase.java | 43 +++++++------ .../jdbc/testutils/DatabaseExtension.java | 66 ++++++++------------ .../connector/jdbc/testutils/DatabaseResource.java | 15 +++++ .../jdbc/testutils/resources/DockerResource.java | 61 +++++++++++++++++++ .../jdbc/testutils/resources/MemoryResource.java | 6 ++ .../jdbc/cratedb/testutils/CrateDBDatabase.java | 10 +-- .../connector/jdbc/db2/testutils/Db2Database.java | 10 +-- .../jdbc/mysql/testutils/MySqlDatabase.java | 10 +-- .../oceanbase/testutils/OceanBaseDatabase.java | 26 +++++--- .../jdbc/oracle/testutils/OracleDatabase.java | 10 +-- .../jdbc/postgres/testutils/PostgresDatabase.java | 11 ++-- .../jdbc/postgres/testutils/PostgresMetadata.java | 3 +- .../sqlserver/testutils/SqlServerDatabase.java | 10 +-- .../jdbc/trino/testutils/TrinoDatabase.java | 71 ++++++++++++++-------- .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 7 ++- pom.xml | 1 - 18 files changed, 270 insertions(+), 155 deletions(-) diff --git a/flink-connector-jdbc-core/pom.xml b/flink-connector-jdbc-core/pom.xml index a7579845..851c1e07 100644 --- a/flink-connector-jdbc-core/pom.xml +++ b/flink-connector-jdbc-core/pom.xml @@ -158,6 +158,14 @@ under the License. <scope>test</scope> </dependency> + <!-- TestContainer dependencies --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>jdbc</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> <build> diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java index b8a7439d..5c9e75ce 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java @@ -20,6 +20,8 @@ package org.apache.flink.connector.jdbc.derby.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource; import org.apache.flink.util.FlinkRuntimeException; import java.io.OutputStream; @@ -36,39 +38,44 @@ public class DerbyDatabase extends DatabaseExtension { public void write(int b) {} }; - private static DerbyMetadata metadata; + private static final DerbyMetadata metadata = new DerbyMetadata("test"); public static DerbyMetadata getMetadata() { - if (metadata == null) { - metadata = new DerbyMetadata("test"); - } return metadata; } @Override - public DatabaseMetadata startDatabase() throws Exception { - DatabaseMetadata metadata = getMetadata(); - try { - System.setProperty( - "derby.stream.error.field", - DerbyDatabase.class.getCanonicalName() + ".DEV_NULL"); - Class.forName(metadata.getDriverClass()); - DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl())) - .close(); - } catch (Exception e) { - throw new FlinkRuntimeException(e); - } - return metadata; + protected DatabaseMetadata getMetadataDB() { + return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - try { - DriverManager.getConnection(String.format("%s;shutdown=true", metadata.getJdbcUrl())) - .close(); - } catch (SQLException ignored) { - } finally { - metadata = null; - } + protected DatabaseResource getResource() { + return new MemoryResource() { + @Override + public void start() { + try { + System.setProperty( + "derby.stream.error.field", + DerbyDatabase.class.getCanonicalName() + ".DEV_NULL"); + Class.forName(metadata.getDriverClass()); + DriverManager.getConnection( + String.format("%s;create=true", metadata.getJdbcUrl())) + .close(); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + } + + @Override + public void stop() { + try { + DriverManager.getConnection( + String.format("%s;shutdown=true", metadata.getJdbcUrl())) + .close(); + } catch (SQLException ignored) { + } + } + }; } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java index edca8ad0..6c37f69e 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.h2.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource; import org.apache.flink.util.FlinkRuntimeException; import java.sql.DriverManager; @@ -26,31 +28,36 @@ import java.sql.DriverManager; /** H2 database for testing. */ public class H2XaDatabase extends DatabaseExtension { - private static H2Metadata metadata; + private static final H2Metadata metadata = new H2Metadata("test"); public static H2Metadata getMetadata() { - if (metadata == null) { - metadata = new H2Metadata("test"); - } return metadata; } @Override - protected DatabaseMetadata startDatabase() throws Exception { - DatabaseMetadata metadata = getMetadata(); - try { - Class.forName(metadata.getDriverClass()); - DriverManager.getConnection( - String.format( - "%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s", - metadata.getJdbcUrl(), "test", "test")) - .close(); - } catch (Exception e) { - throw new FlinkRuntimeException(e); - } - return metadata; + protected DatabaseMetadata getMetadataDB() { + return getMetadata(); } @Override - protected void stopDatabase() throws Exception {} + protected DatabaseResource getResource() { + return new MemoryResource() { + @Override + public void start() { + try { + Class.forName(metadata.getDriverClass()); + DriverManager.getConnection( + String.format( + "%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s", + metadata.getJdbcUrl(), "test", "test")) + .close(); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + } + + @Override + public void stop() {} + }; + } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java index 3cc73559..718b4f4f 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java @@ -35,42 +35,21 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import static org.junit.platform.commons.support.AnnotationSupport.findRepeatableAnnotations; /** Database extension for testing. */ public abstract class DatabaseExtension - implements BeforeAllCallback, - AfterAllCallback, - BeforeEachCallback, - AfterEachCallback, - ExtensionContext.Store.CloseableResource { - - /** - * Database Lifecycle for testing. The goal it's that all database containers are create only - * one time. - */ - public enum Lifecycle { - /** Database will be instantiated only one time. */ - PER_EXECUTION, - /** Database will be instantiated by class. */ - PER_CLASS - } + implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { - protected abstract DatabaseMetadata startDatabase() throws Exception; + protected abstract DatabaseMetadata getMetadataDB(); - protected abstract void stopDatabase() throws Exception; + protected abstract DatabaseResource getResource(); private final String uniqueKey = this.getClass().getSimpleName(); - - protected Lifecycle getLifecycle() { - return Lifecycle.PER_EXECUTION; - } - - private ExtensionContext.Store getStore(ExtensionContext context) { - return context.getRoot().getStore(Namespace.GLOBAL); - } + private final String uniqueResource = String.format("%sResource", uniqueKey); private DatabaseTest getDatabaseBaseTest(Class<?> clazz) throws Exception { DatabaseTest dbClazz = null; @@ -95,8 +74,7 @@ public abstract class DatabaseExtension .filter(DatabaseTest.class::isAssignableFrom) .ifPresent( clazz -> { - DatabaseMetadata metadata = - getStore(context).get(uniqueKey, DatabaseMetadata.class); + DatabaseMetadata metadata = getMetadataDB(); if (metadata != null) { try (Connection conn = metadata.getConnection()) { for (TableManaged table : @@ -132,15 +110,19 @@ public abstract class DatabaseExtension return false; } + private DatabaseResource getResource(ExtensionContext context) { + return context.getRoot() + .getStore(Namespace.GLOBAL) + .getOrComputeIfAbsent(uniqueResource, startResource(), DatabaseResource.class); + } + @Override public final void beforeAll(ExtensionContext context) throws Exception { if (ignoreTestDatabase(context)) { return; } - if (getStore(context).get(uniqueKey) == null) { - getStore(context).put(uniqueKey, startDatabase()); - } + getResource(context); getManagedTables(context, TableManaged::createTable); } @@ -153,6 +135,7 @@ public abstract class DatabaseExtension if (ignoreTestDatabase(context)) { return; } + getManagedTables(context, TableManaged::deleteTable); } @@ -161,18 +144,9 @@ public abstract class DatabaseExtension if (ignoreTestDatabase(context)) { return; } - getManagedTables(context, TableManaged::dropTable); - if (Lifecycle.PER_CLASS == getLifecycle()) { - stopDatabase(); - getStore(context).remove(uniqueKey, DatabaseMetadata.class); - } - } - @Override - public final void close() throws Throwable { - if (Lifecycle.PER_EXECUTION == getLifecycle()) { - stopDatabase(); - } + getManagedTables(context, TableManaged::dropTable); + getResource(context); } private Set<String> retrieveDatabaseExtensions(final ExtensionContext context) { @@ -199,4 +173,12 @@ public abstract class DatabaseExtension return retrieveExtensions.apply(context, new HashSet<>()); } + + private Function<String, DatabaseResource> startResource() { + return s -> { + DatabaseResource resource = getResource(); + resource.start(); + return resource; + }; + } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java new file mode 100644 index 00000000..07e95617 --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java @@ -0,0 +1,15 @@ +package org.apache.flink.connector.jdbc.testutils; + +import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource; + +/** Database resource for testing. */ +public interface DatabaseResource extends CloseableResource { + + void start(); + + void stop(); + + default void close() throws Throwable { + stop(); + } +} diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java new file mode 100644 index 00000000..8b721b12 --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java @@ -0,0 +1,61 @@ +package org.apache.flink.connector.jdbc.testutils.resources; + +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; + +import com.github.dockerjava.api.DockerClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.JdbcDatabaseContainer; + +import java.util.Arrays; + +/** Docker based database resource. */ +public class DockerResource implements DatabaseResource { + + protected static final Logger LOG = LoggerFactory.getLogger(DockerResource.class); + + private final JdbcDatabaseContainer<?> container; + + public DockerResource(JdbcDatabaseContainer<?> container) { + this.container = container; + } + + @Override + public void start() { + this.container.start(); + } + + @Override + public void stop() { + this.container.stop(); + } + + @Override + public void close() throws Throwable { + stop(); + cleanContainers(container); + } + + public static void cleanContainers(GenericContainer<?> container) { + try { + DockerClient client = DockerClientFactory.instance().client(); + // client.removeImageCmd(container.getDockerImageName()).exec(); + client.listImagesCmd().exec().stream() + .filter( + image -> + Arrays.stream(image.getRepoTags()) + .anyMatch( + tag -> + !tag.contains("testcontainers/ryuk") + && !tag.contains( + container + .getDockerImageName()))) + .forEach(image -> client.removeImageCmd(image.getId()).exec()); + + } catch (Exception ignore) { + LOG.warn("Error deleting image."); + } + } +} diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java new file mode 100644 index 00000000..2404bf44 --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java @@ -0,0 +1,6 @@ +package org.apache.flink.connector.jdbc.testutils.resources; + +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; + +/** Memory based database resource. */ +public interface MemoryResource extends DatabaseResource {} diff --git a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java index 50d3ae4c..08bf113f 100644 --- a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java +++ b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.cratedb.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.testcontainers.containers.JdbcDatabaseContainer; @@ -66,15 +68,13 @@ public class CrateDBDatabase extends DatabaseExtension implements CrateDBImages } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); } /** diff --git a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java index 24e75858..930dbdb1 100644 --- a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java +++ b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java @@ -20,6 +20,8 @@ package org.apache.flink.connector.jdbc.db2.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; @@ -56,14 +58,12 @@ public class Db2Database extends DatabaseExtension implements Db2Images { } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); } } diff --git a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java index 99195063..0b5e7908 100644 --- a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java +++ b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.mysql.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.testcontainers.containers.MySQLContainer; @@ -46,14 +48,12 @@ public class MySqlDatabase extends DatabaseExtension implements MySqlImages { } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); } } diff --git a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java index 62279748..c07ad5c2 100644 --- a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java +++ b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java @@ -20,6 +20,8 @@ package org.apache.flink.connector.jdbc.oceanbase.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; @@ -28,6 +30,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.oceanbase.OceanBaseCEContainer; import java.sql.Connection; +import java.sql.SQLException; import java.sql.Statement; /** OceanBase database for testing. */ @@ -56,18 +59,23 @@ public class OceanBaseDatabase extends DatabaseExtension implements OceanBaseIma } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); - try (Connection connection = getMetadata().getConnection(); - Statement statement = connection.createStatement()) { - statement.execute("SET GLOBAL time_zone = '+00:00'"); - } + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER) { + @Override + public void start() { + super.start(); + try (Connection connection = getMetadata().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("SET GLOBAL time_zone = '+00:00'"); + } catch (SQLException e) { + throw new FlinkRuntimeException(e); + } + } + }; } } diff --git a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java index 37197292..c1a4e4f9 100644 --- a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java +++ b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.oracle.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.testcontainers.containers.OracleContainer; @@ -45,14 +47,12 @@ public class OracleDatabase extends DatabaseExtension implements OracleImages { } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); } } diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java index 5b1700bd..0b871ec4 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.postgres.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.testcontainers.containers.PostgreSQLContainer; @@ -44,16 +46,13 @@ public class PostgresDatabase extends DatabaseExtension implements PostgresImage return metadata; } - @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); } /** {@link PostgreSQLContainer} with XA enabled (by setting max_prepared_transactions). */ diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java index 219f4961..2d083e05 100644 --- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java +++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.postgres.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; import org.postgresql.xa.PGXADataSource; +import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.PostgreSQLContainer; import javax.sql.XADataSource; @@ -38,7 +39,7 @@ public class PostgresMetadata implements DatabaseMetadata { this(container, false); } - public PostgresMetadata(PostgreSQLContainer<?> container, boolean hasXaEnabled) { + public PostgresMetadata(JdbcDatabaseContainer<?> container, boolean hasXaEnabled) { this.username = container.getUsername(); this.password = container.getPassword(); this.url = container.getJdbcUrl(); diff --git a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java index 7629fba5..ddb9fbd1 100644 --- a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java +++ b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.jdbc.sqlserver.testutils; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -48,15 +50,13 @@ public class SqlServerDatabase extends DatabaseExtension implements SqlServerIma } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); } /** {@link MSSQLServerContainer} with Xa. */ diff --git a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java index ba65a189..f56d3d48 100644 --- a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java +++ b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java @@ -22,6 +22,8 @@ import org.apache.flink.connector.jdbc.postgres.testutils.PostgresImages; import org.apache.flink.connector.jdbc.postgres.testutils.PostgresMetadata; import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; import org.apache.flink.util.FlinkRuntimeException; import org.testcontainers.containers.BindMode; @@ -33,6 +35,7 @@ import org.testcontainers.containers.wait.strategy.Wait; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; /** A Trino database for testing. */ public class TrinoDatabase extends DatabaseExtension implements TrinoImages, PostgresImages { @@ -75,34 +78,52 @@ public class TrinoDatabase extends DatabaseExtension implements TrinoImages, Pos } @Override - protected DatabaseMetadata startDatabase() throws Exception { - CONTAINER_DB.start(); - - Path tempFile = Files.createTempFile(null, null); - String postgresContent = - "connector.name=postgresql\n" - + String.format( - "connection-url=jdbc:postgresql://%s:%s/test\n", - CONTAINER_DB_ALIAS, CONTAINER_DB_PORT) - + String.format("connection-user=%s\n", CONTAINER_DB.getUsername()) - + String.format("connection-password=%s\n", CONTAINER_DB.getPassword()); - Files.write(tempFile, postgresContent.getBytes(StandardCharsets.UTF_8)); - - CONTAINER - .withDatabaseName("postgres/public") - .withFileSystemBind( - tempFile.toFile().getAbsolutePath(), - "/etc/trino/catalog/postgres.properties", - BindMode.READ_WRITE) - .waitingFor(Wait.forHttp("/ui/login.html").forStatusCode(200)); - CONTAINER.start(); + protected DatabaseMetadata getMetadataDB() { return getMetadata(); } @Override - protected void stopDatabase() throws Exception { - CONTAINER.stop(); - CONTAINER_DB.stop(); - metadata = null; + protected DatabaseResource getResource() { + return new DatabaseResource() { + @Override + public void start() { + try { + CONTAINER_DB.start(); + + Path tempFile = Files.createTempFile(null, null); + String postgresContent = + "connector.name=postgresql\n" + + String.format( + "connection-url=jdbc:postgresql://%s:%s/test\n", + CONTAINER_DB_ALIAS, CONTAINER_DB_PORT) + + String.format( + "connection-user=%s\n", CONTAINER_DB.getUsername()) + + String.format( + "connection-password=%s\n", CONTAINER_DB.getPassword()); + Files.write(tempFile, postgresContent.getBytes(StandardCharsets.UTF_8)); + + CONTAINER + .withDatabaseName("postgres/public") + .withFileSystemBind( + tempFile.toFile().getAbsolutePath(), + "/etc/trino/catalog/postgres.properties", + BindMode.READ_WRITE) + .waitingFor(Wait.forHttp("/ui/login.html").forStatusCode(200)); + CONTAINER.start(); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + } + + @Override + public void stop() { + Arrays.asList(CONTAINER, CONTAINER_DB) + .forEach( + container -> { + container.stop(); + DockerResource.cleanContainers(container); + }); + } + }; } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java index 3a4d4df4..5b4ac294 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java @@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcTestBase; import org.apache.flink.connector.jdbc.JdbcTestFixture; import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; -import org.apache.flink.connector.jdbc.derby.testutils.DerbyDatabase; +import org.apache.flink.connector.jdbc.derby.DerbyTestBase; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -52,7 +52,8 @@ import static org.apache.flink.streaming.util.OperatorSnapshotUtil.readStateHand import static org.apache.flink.streaming.util.OperatorSnapshotUtil.writeStateHandle; /** Tests state migration for {@link JdbcXaSinkFunction}. */ -class JdbcXaSinkMigrationTest extends JdbcTestBase { +@Deprecated +class JdbcXaSinkMigrationTest extends JdbcTestBase implements DerbyTestBase { // write a snapshot: // java <CLASS_NAME> <VERSION> @@ -60,7 +61,7 @@ class JdbcXaSinkMigrationTest extends JdbcTestBase { // mvn exec:java -Dexec.mainClass="<CLASS_NAME>" -Dexec.args='<VERSION>' // -Dexec.classpathScope=test -Dexec.cleanupDaemonThreads=false public static void main(String[] args) throws Exception { - new DerbyDatabase().startDatabase(); + // new DerbyDatabase(); JdbcXaSinkMigrationTest test = new JdbcXaSinkMigrationTest(); test.writeSnapshot(parseVersionArg(args)); } diff --git a/pom.xml b/pom.xml index fda0c842..fa8d4837 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,6 @@ under the License. <slf4j.version>1.7.36</slf4j.version> <log4j.version>2.17.2</log4j.version> - <flink.parent.artifactId>flink-connector-jdbc-parent</flink.parent.artifactId> <!-- These 2 properties should be removed together with upgrade of flink-connector-parent to 1.1.x --> <flink.surefire.baseArgLine>-XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${surefire.module.config}</flink.surefire.baseArgLine>
