This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new bb2fce23c [FLINK-37696][tests][postgres] Replace postgres image to 
official pg 14 and adjust test to use pgoutput plugin
bb2fce23c is described below

commit bb2fce23c763bde1cf54533eb63c373154f5cfdb
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Apr 21 11:02:42 2025 +0800

    [FLINK-37696][tests][postgres] Replace postgres image to official pg 14 and 
adjust test to use pgoutput plugin
    
    This closes #3992
---
 .../connectors/postgres/PostgreSQLSourceTest.java  | 98 ++++++++--------------
 .../cdc/connectors/postgres/PostgresTestBase.java  | 12 +--
 .../postgres/source/NewlyAddedTableITCase.java     |  1 +
 .../postgres/source/PostgresSourceExampleTest.java |  2 +-
 .../postgres/source/PostgresSourceITCase.java      |  5 ++
 .../source/reader/PostgresSourceReaderTest.java    |  1 +
 .../postgres/table/PostgreSQLConnectorITCase.java  | 63 ++++++++++++--
 .../postgres/table/PostgreSQLSavepointITCase.java  |  1 +
 .../src/test/resources/ddl/column_type_test.sql    |  9 +-
 9 files changed, 112 insertions(+), 80 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
index ca9edecfb..d44d7c935 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSourceTest.java
@@ -39,16 +39,11 @@ import org.apache.flink.util.Preconditions;
 import com.jayway.jsonpath.JsonPath;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
 
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
@@ -67,7 +62,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertDelete;
 import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertInsert;
@@ -78,35 +72,22 @@ import static 
org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
 /** Tests for {@link PostgreSQLSource} which also heavily tests {@link 
DebeziumSourceFunction}. */
 class PostgreSQLSourceTest extends PostgresTestBase {
     private static final Logger LOG = 
LoggerFactory.getLogger(PostgreSQLSourceTest.class);
-    private static final String SLOT_NAME = "flink";
-
-    // These tests only passes at the docker postgres:9.6
-    private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =
-            new PostgreSQLContainer<>(
-                            DockerImageName.parse("debezium/postgres:9.6")
-                                    .asCompatibleSubstituteFor("postgres"))
-                    .withDatabaseName(DEFAULT_DB)
-                    .withUsername("postgres")
-                    .withPassword("postgres")
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-    @BeforeAll
-    public static void startAll() {
-        LOG.info("Starting containers...");
-        Startables.deepStart(Stream.of(POSTGRES_CONTAINER_OLD)).join();
-        LOG.info("Containers are started.");
-    }
-
-    @AfterAll
-    public static void stopAll() {
-        LOG.info("Stopping containers...");
-        POSTGRES_CONTAINER_OLD.stop();
-        LOG.info("Containers are stopped.");
-    }
+    private String slotName;
 
     @BeforeEach
     public void before() {
-        initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory");
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+        slotName = getSlotName();
+    }
+
+    @AfterEach
+    public void after() throws SQLException {
+        String sql = String.format("SELECT pg_drop_replication_slot('%s')", 
slotName);
+        try (Connection connection =
+                        PostgresTestBase.getJdbcConnection(POSTGRES_CONTAINER, 
"postgres");
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+        }
     }
 
     @Test
@@ -116,7 +97,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
 
         setupSource(source);
 
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                 Statement statement = connection.createStatement()) {
             // start the source
             final CheckedThread runThread =
@@ -237,7 +218,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             Assertions.assertThat((JsonPath.read(state, 
"$.sourcePartition.server").toString()))
                     .isEqualTo("postgres_cdc_source");
             Assertions.assertThat(JsonPath.<Integer>read(state, 
"$.sourceOffset.txId"))
-                    .isEqualTo(557);
+                    .isEqualTo(740);
             Assertions.assertThat(
                             JsonPath.<Boolean>read(state, 
"$.sourceOffset.last_snapshot_record"))
                     .isTrue();
@@ -273,7 +254,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             
Assertions.assertThat(waitForAvailableRecords(Duration.ofSeconds(5), 
sourceContext2))
                     .isFalse();
 
-            try (Connection connection = 
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                     Statement statement = connection.createStatement()) {
 
                 statement.execute(
@@ -295,7 +276,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
                 Assertions.assertThat(JsonPath.<String>read(state, 
"$.sourcePartition.server"))
                         .isEqualTo("postgres_cdc_source");
                 Assertions.assertThat(JsonPath.<Integer>read(state, 
"$.sourceOffset.txId"))
-                        .isEqualTo(558);
+                        .isEqualTo(741);
                 
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
                 int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
                 Assertions.assertThat(lsn).isGreaterThan(prevLsn);
@@ -341,7 +322,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
                     .isFalse();
 
             // can continue to receive new events
-            try (Connection connection = 
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                     Statement statement = connection.createStatement()) {
                 statement.execute("DELETE FROM inventory.products WHERE 
id=1001");
             }
@@ -360,7 +341,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             Assertions.assertThat(JsonPath.<String>read(state, 
"$.sourcePartition.server"))
                     .isEqualTo("postgres_cdc_source");
             Assertions.assertThat(JsonPath.<Integer>read(state, 
"$.sourceOffset.txId"))
-                    .isEqualTo(561);
+                    .isEqualTo(744);
             
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
             int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
             Assertions.assertThat(lsn).isGreaterThan(prevLsn);
@@ -404,7 +385,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             Assertions.assertThat(JsonPath.<String>read(state, 
"$.sourcePartition.server"))
                     .isEqualTo("postgres_cdc_source");
             Assertions.assertThat(JsonPath.<Integer>read(state, 
"$.sourceOffset.txId"))
-                    .isEqualTo(561);
+                    .isEqualTo(744);
             
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
             int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
             Assertions.assertThat(lsn).isGreaterThan(prevLsn);
@@ -433,7 +414,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
                     };
             runThread5.start();
 
-            try (Connection connection = 
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                     Statement statement = connection.createStatement()) {
 
                 statement.execute(
@@ -456,7 +437,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             Assertions.assertThat(JsonPath.<String>read(state, 
"$.sourcePartition.server"))
                     .isEqualTo("postgres_cdc_source");
             Assertions.assertThat(JsonPath.<Integer>read(state, 
"$.sourceOffset.txId"))
-                    .isEqualTo(562);
+                    .isEqualTo(745);
             
Assertions.assertThat(state).contains("ts_usec").doesNotContain("snapshot");
             int pos = JsonPath.read(state, "$.sourceOffset.lsn");
             Assertions.assertThat(pos).isGreaterThan(prevLsn);
@@ -483,7 +464,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
                         }
                     };
             runThread6.start();
-            try (Connection connection = 
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                     Statement statement = connection.createStatement()) {
 
                 statement.execute(
@@ -582,7 +563,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             
Assertions.assertThat(flushLsn.add(getConfirmedFlushLsn())).isTrue();
 
             // verify LSN is advanced even if there is no changes on the table
-            try (Connection connection = 
getJdbcConnection(POSTGRES_CONTAINER_OLD);
+            try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                     Statement statement = connection.createStatement()) {
                 // we have to do some transactions which is not related to the 
monitored table
                 statement.execute("CREATE TABLE dummy (a int)");
@@ -617,7 +598,7 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             TestSourceContext<SourceRecord> sourceContext,
             long checkpointId)
             throws Exception {
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                 Statement statement = connection.createStatement()) {
             for (int i = 0; i < num; i++) {
                 statement.execute(
@@ -648,29 +629,28 @@ class PostgreSQLSourceTest extends PostgresTestBase {
         Properties properties = new Properties();
         properties.setProperty("heartbeat.interval.ms", 
String.valueOf(heartbeatInterval));
         return PostgreSQLSource.<SourceRecord>builder()
-                .hostname(POSTGRES_CONTAINER_OLD.getHost())
-                .port(POSTGRES_CONTAINER_OLD.getMappedPort(POSTGRESQL_PORT))
-                .database(POSTGRES_CONTAINER_OLD.getDatabaseName())
-                .username(POSTGRES_CONTAINER_OLD.getUsername())
-                .password(POSTGRES_CONTAINER_OLD.getPassword())
+                .hostname(POSTGRES_CONTAINER.getHost())
+                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                .database(POSTGRES_CONTAINER.getDatabaseName())
+                .username(POSTGRES_CONTAINER.getUsername())
+                .password(POSTGRES_CONTAINER.getPassword())
                 .schemaList("inventory")
                 .tableList("inventory.products")
                 .deserializer(new ForwardDeserializeSchema())
-                .slotName(SLOT_NAME)
+                .decodingPluginName("pgoutput")
+                .slotName(slotName)
                 .debeziumProperties(properties)
                 .build();
     }
 
     private String getConfirmedFlushLsn() throws SQLException {
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                 Statement statement = connection.createStatement()) {
             ResultSet rs =
                     statement.executeQuery(
                             String.format(
                                     "select * from pg_replication_slots where 
slot_name = '%s' and database = '%s' and plugin = '%s'",
-                                    SLOT_NAME,
-                                    POSTGRES_CONTAINER_OLD.getDatabaseName(),
-                                    "decoderbufs"));
+                                    slotName, 
POSTGRES_CONTAINER.getDatabaseName(), "pgoutput"));
             if (rs.next()) {
                 return rs.getString("confirmed_flush_lsn");
             } else {
@@ -905,14 +885,6 @@ class PostgreSQLSourceTest extends PostgresTestBase {
             list.add(value);
         }
 
-        public List<T> getList() {
-            return list;
-        }
-
-        boolean isClearCalled() {
-            return clearCalled;
-        }
-
         @Override
         public void update(List<T> values) throws Exception {
             clear();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index 069bddd4c..04df0e7f6 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -60,14 +60,13 @@ import java.util.stream.Stream;
  * log.
  */
 public abstract class PostgresTestBase extends AbstractTestBase {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresTestBase.class);
+    protected static final Logger LOG = 
LoggerFactory.getLogger(PostgresTestBase.class);
     public static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)--.*$");
     public static final String DEFAULT_DB = "postgres";
 
-    // use newer version of postgresql image to support pgoutput plugin
-    // when testing postgres 13, only 13-alpine supports both amd64 and arm64
+    // use official postgresql image to support pgoutput plugin
     protected static final DockerImageName PG_IMAGE =
-            
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
+            
DockerImageName.parse("postgres:14").asCompatibleSubstituteFor("postgres");
 
     public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
             new PostgreSQLContainer<>(PG_IMAGE)
@@ -81,7 +80,9 @@ public abstract class PostgresTestBase extends 
AbstractTestBase {
                             // default
                             "fsync=off",
                             "-c",
-                            "max_replication_slots=20");
+                            "max_replication_slots=20",
+                            "-c",
+                            "wal_level=logical");
 
     @BeforeAll
     static void startContainers() throws Exception {
@@ -217,6 +218,7 @@ public abstract class PostgresTestBase extends 
AbstractTestBase {
         postgresSourceConfigFactory.splitSize(splitSize);
         postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
         postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
+        postgresSourceConfigFactory.decodingPluginName("pgoutput");
         return postgresSourceConfigFactory;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
index e1f4298e1..f9016cd27 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
@@ -942,6 +942,7 @@ class NewlyAddedTableITCase extends PostgresTestBase {
                         + " 'username' = '%s',"
                         + " 'password' = '%s',"
                         + " 'database-name' = '%s',"
+                        + " 'decoding.plugin.name' = 'pgoutput', "
                         + " 'schema-name' = '%s',"
                         + " 'table-name' = '%s',"
                         + " 'slot.name' = '%s', "
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
index 3445114d0..7c1c4fb8c 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceExampleTest.java
@@ -68,7 +68,7 @@ class PostgresSourceExampleTest extends PostgresTestBase {
     private static final String TABLE_ID = SCHEMA_NAME + ".products";
 
     private static final String SLOT_NAME = "flink";
-    private static final String PLUGIN_NAME = "decoderbufs";
+    private static final String PLUGIN_NAME = "pgoutput";
     private static final long CHECKPOINT_INTERVAL_MS = 3000;
 
     private static final int DEFAULT_PARALLELISM = 2;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 0cee09744..5d212c342 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -340,6 +340,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.startup.mode' = '%s',"
                                 + " 'scan.incremental.snapshot.chunk.size' = 
'100',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s', "
                                 + " 'debezium.slot.drop.on.stop' = 'true'"
                                 + ")",
@@ -644,6 +645,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.startup.mode' = '%s',"
                                 + " 'scan.incremental.snapshot.chunk.size' = 
'100',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s',"
                                 + " 'scan.incremental.snapshot.backfill.skip' 
= '%s'"
                                 + ")",
@@ -760,6 +762,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.startup.mode' = '%s',"
                                 + " 'scan.incremental.snapshot.chunk.size' = 
'100',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s',"
                                 + " 'scan.incremental.snapshot.backfill.skip' 
= '%s',"
                                 + " 'scan.newly-added-table.enabled' = 'true',"
@@ -823,6 +826,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                         .username(customDatabase.getUsername())
                         .password(customDatabase.getPassword())
                         .database(customDatabase.getDatabaseName())
+                        .decodingPluginName("pgoutput")
                         .slotName(slotName)
                         .tableList(tableId.toString())
                         .startupOptions(startupOptions)
@@ -962,6 +966,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.startup.mode' = '%s',"
                                 + " 'scan.incremental.snapshot.chunk.size' = 
'100',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s',"
                                 + " 'scan.lsn-commit.checkpoints-num-delay' = 
'1'"
                                 + " %s"
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 5f7ab8dbc..905d47131 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -77,6 +77,7 @@ class PostgresSourceReaderTest {
         configFactory.database("pgdb");
         configFactory.username("username");
         configFactory.password("password");
+        configFactory.decodingPluginName("pgoutput");
         configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
         final TestTable customerTable =
                 new TestTable(ResolvedSchema.of(Column.physical("id", 
BIGINT())));
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index e837f4cc6..a163df921 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -31,9 +31,15 @@ import org.apache.flink.types.RowUtils;
 import org.apache.flink.util.CloseableIterator;
 
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -42,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
 
 import static 
org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
 
@@ -54,10 +61,45 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
             StreamTableEnvironment.create(
                     env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
 
+    /** Use postgis plugin to test the GIS type. */
+    protected static final DockerImageName POSTGIS_IMAGE =
+            
DockerImageName.parse("postgis/postgis:14-3.5").asCompatibleSubstituteFor("postgres");
+
+    public static final PostgreSQLContainer<?> POSTGIS_CONTAINER =
+            new PostgreSQLContainer<>(POSTGIS_IMAGE)
+                    .withDatabaseName(DEFAULT_DB)
+                    .withUsername("postgres")
+                    .withPassword("postgres")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG))
+                    .withCommand(
+                            "postgres",
+                            "-c",
+                            // default
+                            "fsync=off",
+                            "-c",
+                            "max_replication_slots=20",
+                            "-c",
+                            "wal_level=logical");
+
     @RegisterExtension
     public static StaticExternalResourceProxy<LegacyRowResource> 
usesLegacyRows =
             new StaticExternalResourceProxy<>(LegacyRowResource.INSTANCE);
 
+    @BeforeAll
+    static void startContainers() throws Exception {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(POSTGRES_CONTAINER, 
POSTGIS_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterAll
+    static void stopContainers() {
+        LOG.info("Stopping containers...");
+        POSTGRES_CONTAINER.stop(); // mirrors startContainers(), which starts both containers
+        POSTGIS_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
     void setup(boolean parallelismSnapshot) {
         TestValuesTableFactory.clearAllData();
         env.setRestartStrategy(RestartStrategies.noRestart());
@@ -92,6 +134,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + " 'schema-name' = '%s',"
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s'"
                                 + ")",
                         POSTGRES_CONTAINER.getHost(),
@@ -211,6 +254,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + " 'schema-name' = '%s',"
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s',"
                                 + " 'scan.startup.mode' = 'latest-offset'"
                                 + ")",
@@ -283,6 +327,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + " 'schema-name' = '%s',"
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s'"
                                 + ")",
                         POSTGRES_CONTAINER.getHost(),
@@ -343,7 +388,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
     @ValueSource(booleans = {true, false})
     void testAllTypes(boolean parallelismSnapshot) throws Throwable {
         setup(parallelismSnapshot);
-        initializePostgresTable(POSTGRES_CONTAINER, "column_type_test");
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
 
         String sourceDDL =
                 String.format(
@@ -379,13 +424,14 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + " 'schema-name' = '%s',"
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s'"
                                 + ")",
-                        POSTGRES_CONTAINER.getHost(),
-                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
-                        POSTGRES_CONTAINER.getUsername(),
-                        POSTGRES_CONTAINER.getPassword(),
-                        POSTGRES_CONTAINER.getDatabaseName(),
+                        POSTGIS_CONTAINER.getHost(),
+                        POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGIS_CONTAINER.getUsername(),
+                        POSTGIS_CONTAINER.getPassword(),
+                        POSTGIS_CONTAINER.getDatabaseName(),
                         "inventory",
                         "full_types",
                         parallelismSnapshot,
@@ -429,7 +475,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
         Thread.sleep(5000);
 
         // generate WAL
-        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+        try (Connection connection = getJdbcConnection(POSTGIS_CONTAINER);
                 Statement statement = connection.createStatement()) {
             statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE 
id=1;");
         }
@@ -474,6 +520,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + " 'schema-name' = '%s',"
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s'"
                                 + ")",
                         POSTGRES_CONTAINER.getHost(),
@@ -612,6 +659,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'slot.name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'changelog-mode' = '%s'"
                                 + ")",
                         POSTGRES_CONTAINER.getHost(),
@@ -739,6 +787,7 @@ class PostgreSQLConnectorITCase extends PostgresTestBase {
                                 + (parallelismSnapshot
                                         ? " 'scan.startup.mode' = 
'latest-offset',"
                                         : "")
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s'"
                                 + ")",
                         POSTGRES_CONTAINER.getHost(),
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
index f013418bf..b07258403 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
@@ -88,6 +88,7 @@ class PostgreSQLSavepointITCase extends PostgresTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'true',"
                                 + " 'scan.incremental.snapshot.chunk.size' = 
'2',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
                                 + " 'slot.name' = '%s'"
                                 + ")",
                         POSTGRES_CONTAINER.getHost(),
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
index ba56a9cbc..2d3005b8a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql
@@ -19,8 +19,9 @@
 -- Generate a number of tables to cover as many of the PG types as possible
 DROP SCHEMA IF EXISTS inventory CASCADE;
 CREATE SCHEMA inventory;
-SET search_path TO inventory;
-CREATE EXTENSION postgis;
+-- postgis is installed into public schema
+SET search_path TO inventory, public;
+
 
 CREATE TABLE full_types
 (
@@ -48,10 +49,10 @@ CREATE TABLE full_types
     PRIMARY KEY (id)
 );
 
-ALTER TABLE full_types
+ALTER TABLE inventory.full_types
     REPLICA IDENTITY FULL;
 
-INSERT INTO full_types
+INSERT INTO inventory.full_types
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
         '2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 
-36.7208)'::geometry,


Reply via email to