This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new bb2fce23c [FLINK-37696][tests][postgres] Replace postgres image to
official pg 14 and adjust test to use pgoutput plugin
bb2fce23c is described below
commit bb2fce23c763bde1cf54533eb63c373154f5cfdb
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Apr 21 11:02:42 2025 +0800
[FLINK-37696][tests][postgres] Replace postgres image to official pg 14 and
adjust test to use pgoutput plugin
This closes #3992
---
.../connectors/postgres/PostgreSQLSourceTest.java | 98 ++++++++--------------
.../cdc/connectors/postgres/PostgresTestBase.java | 12 +--
.../postgres/source/NewlyAddedTableITCase.java | 1 +
.../postgres/source/PostgresSourceExampleTest.java | 2 +-
.../postgres/source/PostgresSourceITCase.java | 5 ++
.../source/reader/PostgresSourceReaderTest.java | 1 +
.../postgres/table/PostgreSQLConnectorITCase.java | 63 ++++++++++++--
.../postgres/table/PostgreSQLSavepointITCase.java | 1 +
.../src/test/resources/ddl/column_type_test.sql | 9 +-
9 files changed, 112 insertions(+), 80 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
index ca9edecfb..d44d7c935 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
@@ -39,16 +39,11 @@ import org.apache.flink.util.Preconditions;
import com.jayway.jsonpath.JsonPath;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
@@ -67,7 +62,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertDelete;
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertInsert;
@@ -78,35 +72,22 @@ import static
org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
/** Tests for {@link PostgreSQLSource} which also heavily tests {@link
DebeziumSourceFunction}. */
class PostgreSQLSourceTest extends PostgresTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(PostgreSQLSourceTest.class);
- private static final String SLOT_NAME = "flink";
-
- // These tests only passes at the docker postgres:9.6
- private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =
- new PostgreSQLContainer<>(
- DockerImageName.parse("debezium/postgres:9.6")
- .asCompatibleSubstituteFor("postgres"))
- .withDatabaseName(DEFAULT_DB)
- .withUsername("postgres")
- .withPassword("postgres")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- @BeforeAll
- public static void startAll() {
- LOG.info("Starting containers...");
- Startables.deepStart(Stream.of(POSTGRES_CONTAINER_OLD)).join();
- LOG.info("Containers are started.");
- }
-
- @AfterAll
- public static void stopAll() {
- LOG.info("Stopping containers...");
- POSTGRES_CONTAINER_OLD.stop();
- LOG.info("Containers are stopped.");
- }
+ private String slotName;
@BeforeEach
public void before() {
- initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory");
+ initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+ slotName = getSlotName();
+ }
+
+ @AfterEach
+ public void after() throws SQLException {
+ String sql = String.format("SELECT pg_drop_replication_slot('%s')",
slotName);
+ try (Connection connection =
+ PostgresTestBase.getJdbcConnection(POSTGRES_CONTAINER,
"postgres");
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ }
}
@Test
@@ -116,7 +97,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
setupSource(source);
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// start the source
final CheckedThread runThread =
@@ -237,7 +218,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat((JsonPath.read(state,
"$.sourcePartition.server").toString()))
.isEqualTo("postgres_cdc_source");
Assertions.assertThat(JsonPath.<Integer>read(state,
"$.sourceOffset.txId"))
- .isEqualTo(557);
+ .isEqualTo(740);
Assertions.assertThat(
JsonPath.<Boolean>read(state,
"$.sourceOffset.last_snapshot_record"))
.isTrue();
@@ -273,7 +254,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat(waitForAvailableRecords(Duration.ofSeconds(5),
sourceContext2))
.isFalse();
- try (Connection connection =
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@@ -295,7 +276,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat(JsonPath.<String>read(state,
"$.sourcePartition.server"))
.isEqualTo("postgres_cdc_source");
Assertions.assertThat(JsonPath.<Integer>read(state,
"$.sourceOffset.txId"))
- .isEqualTo(558);
+ .isEqualTo(741);
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
Assertions.assertThat(lsn).isGreaterThan(prevLsn);
@@ -341,7 +322,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
.isFalse();
// can continue to receive new events
- try (Connection connection =
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM inventory.products WHERE
id=1001");
}
@@ -360,7 +341,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat(JsonPath.<String>read(state,
"$.sourcePartition.server"))
.isEqualTo("postgres_cdc_source");
Assertions.assertThat(JsonPath.<Integer>read(state,
"$.sourceOffset.txId"))
- .isEqualTo(561);
+ .isEqualTo(744);
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
Assertions.assertThat(lsn).isGreaterThan(prevLsn);
@@ -404,7 +385,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat(JsonPath.<String>read(state,
"$.sourcePartition.server"))
.isEqualTo("postgres_cdc_source");
Assertions.assertThat(JsonPath.<Integer>read(state,
"$.sourceOffset.txId"))
- .isEqualTo(561);
+ .isEqualTo(744);
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
Assertions.assertThat(lsn).isGreaterThan(prevLsn);
@@ -433,7 +414,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
};
runThread5.start();
- try (Connection connection =
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@@ -456,7 +437,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat(JsonPath.<String>read(state,
"$.sourcePartition.server"))
.isEqualTo("postgres_cdc_source");
Assertions.assertThat(JsonPath.<Integer>read(state,
"$.sourceOffset.txId"))
- .isEqualTo(562);
+ .isEqualTo(745);
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
int pos = JsonPath.read(state, "$.sourceOffset.lsn");
Assertions.assertThat(pos).isGreaterThan(prevLsn);
@@ -483,7 +464,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
}
};
runThread6.start();
- try (Connection connection =
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@@ -582,7 +563,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Assertions.assertThat(flushLsn.add(getConfirmedFlushLsn())).isTrue();
// verify LSN is advanced even if there is no changes on the table
- try (Connection connection =
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// we have to do some transactions which is not related to the
monitored table
statement.execute("CREATE TABLE dummy (a int)");
@@ -617,7 +598,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
TestSourceContext<SourceRecord> sourceContext,
long checkpointId)
throws Exception {
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
for (int i = 0; i < num; i++) {
statement.execute(
@@ -648,29 +629,28 @@ class PostgreSQLSourceTest extends PostgresTestBase {
Properties properties = new Properties();
properties.setProperty("heartbeat.interval.ms",
String.valueOf(heartbeatInterval));
return PostgreSQLSource.<SourceRecord>builder()
- .hostname(POSTGRES_CONTAINER_OLD.getHost())
- .port(POSTGRES_CONTAINER_OLD.getMappedPort(POSTGRESQL_PORT))
- .database(POSTGRES_CONTAINER_OLD.getDatabaseName())
- .username(POSTGRES_CONTAINER_OLD.getUsername())
- .password(POSTGRES_CONTAINER_OLD.getPassword())
+ .hostname(POSTGRES_CONTAINER.getHost())
+ .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .database(POSTGRES_CONTAINER.getDatabaseName())
+ .username(POSTGRES_CONTAINER.getUsername())
+ .password(POSTGRES_CONTAINER.getPassword())
.schemaList("inventory")
.tableList("inventory.products")
.deserializer(new ForwardDeserializeSchema())
- .slotName(SLOT_NAME)
+ .decodingPluginName("pgoutput")
+ .slotName(slotName)
.debeziumProperties(properties)
.build();
}
private String getConfirmedFlushLsn() throws SQLException {
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
ResultSet rs =
statement.executeQuery(
String.format(
"select * from pg_replication_slots where
slot_name = '%s' and database = '%s' and plugin = '%s'",
- SLOT_NAME,
- POSTGRES_CONTAINER_OLD.getDatabaseName(),
- "decoderbufs"));
+ slotName,
POSTGRES_CONTAINER.getDatabaseName(), "pgoutput"));
if (rs.next()) {
return rs.getString("confirmed_flush_lsn");
} else {
@@ -905,14 +885,6 @@ class PostgreSQLSourceTest extends PostgresTestBase {
list.add(value);
}
- public List<T> getList() {
- return list;
- }
-
- boolean isClearCalled() {
- return clearCalled;
- }
-
@Override
public void update(List<T> values) throws Exception {
clear();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index 069bddd4c..04df0e7f6 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -60,14 +60,13 @@ import java.util.stream.Stream;
* log.
*/
public abstract class PostgresTestBase extends AbstractTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(PostgresTestBase.class);
+ protected static final Logger LOG =
LoggerFactory.getLogger(PostgresTestBase.class);
public static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)--.*$");
public static final String DEFAULT_DB = "postgres";
- // use newer version of postgresql image to support pgoutput plugin
- // when testing postgres 13, only 13-alpine supports both amd64 and arm64
+ // use official postgresql image to support pgoutput plugin
protected static final DockerImageName PG_IMAGE =
-
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
+
DockerImageName.parse("postgres:14").asCompatibleSubstituteFor("postgres");
public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
new PostgreSQLContainer<>(PG_IMAGE)
@@ -81,7 +80,9 @@ public abstract class PostgresTestBase extends
AbstractTestBase {
// default
"fsync=off",
"-c",
- "max_replication_slots=20");
+ "max_replication_slots=20",
+ "-c",
+ "wal_level=logical");
@BeforeAll
static void startContainers() throws Exception {
@@ -217,6 +218,7 @@ public abstract class PostgresTestBase extends
AbstractTestBase {
postgresSourceConfigFactory.splitSize(splitSize);
postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
+ postgresSourceConfigFactory.decodingPluginName("pgoutput");
return postgresSourceConfigFactory;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
index e1f4298e1..f9016cd27 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
@@ -942,6 +942,7 @@ class NewlyAddedTableITCase extends PostgresTestBase {
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'slot.name' = '%s', "
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
index 3445114d0..7c1c4fb8c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
@@ -68,7 +68,7 @@ class PostgresSourceExampleTest extends PostgresTestBase {
private static final String TABLE_ID = SCHEMA_NAME + ".products";
private static final String SLOT_NAME = "flink";
- private static final String PLUGIN_NAME = "decoderbufs";
+ private static final String PLUGIN_NAME = "pgoutput";
private static final long CHECKPOINT_INTERVAL_MS = 3000;
private static final int DEFAULT_PARALLELISM = 2;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 0cee09744..5d212c342 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -340,6 +340,7 @@ class PostgresSourceITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' =
'100',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s', "
+ " 'debezium.slot.drop.on.stop' = 'true'"
+ ")",
@@ -644,6 +645,7 @@ class PostgresSourceITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' =
'100',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip'
= '%s'"
+ ")",
@@ -760,6 +762,7 @@ class PostgresSourceITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' =
'100',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip'
= '%s',"
+ " 'scan.newly-added-table.enabled' = 'true',"
@@ -823,6 +826,7 @@ class PostgresSourceITCase extends PostgresTestBase {
.username(customDatabase.getUsername())
.password(customDatabase.getPassword())
.database(customDatabase.getDatabaseName())
+ .decodingPluginName("pgoutput")
.slotName(slotName)
.tableList(tableId.toString())
.startupOptions(startupOptions)
@@ -962,6 +966,7 @@ class PostgresSourceITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' =
'100',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s',"
+ " 'scan.lsn-commit.checkpoints-num-delay' =
'1'"
+ " %s"
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 5f7ab8dbc..905d47131 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -77,6 +77,7 @@ class PostgresSourceReaderTest {
configFactory.database("pgdb");
configFactory.username("username");
configFactory.password("password");
+ configFactory.decodingPluginName("pgoutput");
configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
final TestTable customerTable =
new TestTable(ResolvedSchema.of(Column.physical("id",
BIGINT())));
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index e837f4cc6..a163df921 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -31,9 +31,15 @@ import org.apache.flink.types.RowUtils;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
import java.sql.Connection;
import java.sql.SQLException;
@@ -42,6 +48,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
import static
org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -54,10 +61,45 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().inStreamingMode().build());
+ /** Use postgis plugin to test the GIS type. */
+ protected static final DockerImageName POSTGIS_IMAGE =
+
DockerImageName.parse("postgis/postgis:14-3.5").asCompatibleSubstituteFor("postgres");
+
+ public static final PostgreSQLContainer<?> POSTGIS_CONTAINER =
+ new PostgreSQLContainer<>(POSTGIS_IMAGE)
+ .withDatabaseName(DEFAULT_DB)
+ .withUsername("postgres")
+ .withPassword("postgres")
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .withCommand(
+ "postgres",
+ "-c",
+ // default
+ "fsync=off",
+ "-c",
+ "max_replication_slots=20",
+ "-c",
+ "wal_level=logical");
+
@RegisterExtension
public static StaticExternalResourceProxy<LegacyRowResource>
usesLegacyRows =
new StaticExternalResourceProxy<>(LegacyRowResource.INSTANCE);
+ @BeforeAll
+ static void startContainers() throws Exception {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(POSTGRES_CONTAINER,
POSTGIS_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @AfterAll
+ static void stopContainers() {
+ LOG.info("Stopping containers...");
+ POSTGIS_CONTAINER.stop();
+ POSTGIS_CONTAINER.stop();
+ LOG.info("Containers are stopped.");
+ }
+
void setup(boolean parallelismSnapshot) {
TestValuesTableFactory.clearAllData();
env.setRestartStrategy(RestartStrategies.noRestart());
@@ -92,6 +134,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s'"
+ ")",
POSTGRES_CONTAINER.getHost(),
@@ -211,6 +254,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
@@ -283,6 +327,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s'"
+ ")",
POSTGRES_CONTAINER.getHost(),
@@ -343,7 +388,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
@ValueSource(booleans = {true, false})
void testAllTypes(boolean parallelismSnapshot) throws Throwable {
setup(parallelismSnapshot);
- initializePostgresTable(POSTGRES_CONTAINER, "column_type_test");
+ initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
String sourceDDL =
String.format(
@@ -379,13 +424,14 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s'"
+ ")",
- POSTGRES_CONTAINER.getHost(),
- POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
- POSTGRES_CONTAINER.getUsername(),
- POSTGRES_CONTAINER.getPassword(),
- POSTGRES_CONTAINER.getDatabaseName(),
+ POSTGIS_CONTAINER.getHost(),
+ POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+ POSTGIS_CONTAINER.getUsername(),
+ POSTGIS_CONTAINER.getPassword(),
+ POSTGIS_CONTAINER.getDatabaseName(),
"inventory",
"full_types",
parallelismSnapshot,
@@ -429,7 +475,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
Thread.sleep(5000);
// generate WAL
- try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ try (Connection connection = getJdbcConnection(POSTGIS_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE
id=1;");
}
@@ -474,6 +520,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s'"
+ ")",
POSTGRES_CONTAINER.getHost(),
@@ -612,6 +659,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'changelog-mode' = '%s'"
+ ")",
POSTGRES_CONTAINER.getHost(),
@@ -739,6 +787,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
+ (parallelismSnapshot
? " 'scan.startup.mode' =
'latest-offset',"
: "")
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s'"
+ ")",
POSTGRES_CONTAINER.getHost(),
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
index f013418bf..b07258403 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
@@ -88,6 +88,7 @@ class PostgreSQLSavepointITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'true',"
+ " 'scan.incremental.snapshot.chunk.size' =
'2',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s'"
+ ")",
POSTGRES_CONTAINER.getHost(),
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
index ba56a9cbc..2d3005b8a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
@@ -19,8 +19,9 @@
-- Generate a number of tables to cover as many of the PG types as possible
DROP SCHEMA IF EXISTS inventory CASCADE;
CREATE SCHEMA inventory;
-SET search_path TO inventory;
-CREATE EXTENSION postgis;
+-- postgis is installed into public schema
+SET search_path TO inventory, public;
+
CREATE TABLE full_types
(
@@ -48,10 +49,10 @@ CREATE TABLE full_types
PRIMARY KEY (id)
);
-ALTER TABLE full_types
+ALTER TABLE inventory.full_types
REPLICA IDENTITY FULL;
-INSERT INTO full_types
+INSERT INTO inventory.full_types
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479
-36.7208)'::geometry,