This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new be4fb977e18 Support managed jdbc io (Postgres) (#36034)
be4fb977e18 is described below

commit be4fb977e181e86fd68753218563c5884744df02
Author: Shunping Huang <shunp...@google.com>
AuthorDate: Mon Sep 8 15:35:39 2025 -0400

    Support managed jdbc io (Postgres) (#36034)
    
    * Add postgres read to managed io
    
    * Add postgres write to managed io
    
    * Add integration tests for both managed and unmanaged postgres read and write.
    
    * Fix error in analyzeClassesDependencies gradle task
    
    * Fix spotless failure.
    
    * Fix python lint
    
    * Add schema transform translation for postgres read and write.
    
    * Add test for postgres schema transform translation.
    
    * Address reviewer's feedback.
---
 .../model/pipeline/v1/external_transforms.proto    |   4 +
 sdks/java/io/jdbc/build.gradle                     |   3 +
 .../io/jdbc/JdbcReadSchemaTransformProvider.java   |  18 ++
 .../io/jdbc/JdbcWriteSchemaTransformProvider.java  |  18 ++
 .../PostgresSchemaTransformTranslation.java        |  93 ++++++++
 .../ReadFromPostgresSchemaTransformProvider.java   |  48 ++++-
 .../WriteToPostgresSchemaTransformProvider.java    |  38 +++-
 .../apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java  | 178 ++++++++++++++++
 .../PostgresSchemaTransformTranslationTest.java    | 233 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/managed/Managed.java  |   3 +
 sdks/python/apache_beam/transforms/external.py     |   4 +-
 sdks/python/apache_beam/transforms/managed.py      |   7 +-
 12 files changed, 642 insertions(+), 5 deletions(-)

diff --git 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index add8a1999ca..02a5dd18e2c 100644
--- 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -76,6 +76,10 @@ message ManagedTransforms {
       "beam:schematransform:org.apache.beam:bigquery_write:v1"];
     ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
       "beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
+    POSTGRES_READ = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:postgres_read:v1"];
+    POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:postgres_write:v1"];
   }
 }
 
diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle
index 8c5fa685fda..87a231a5a42 100644
--- a/sdks/java/io/jdbc/build.gradle
+++ b/sdks/java/io/jdbc/build.gradle
@@ -29,6 +29,7 @@ ext.summary = "IO to read and write on JDBC datasource."
 dependencies {
   implementation library.java.vendored_guava_32_1_2_jre
   implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation project(path: ":model:pipeline", configuration: "shadow")
   implementation library.java.dbcp2
   implementation library.java.joda_time
   implementation "org.apache.commons:commons-pool2:2.11.1"
@@ -39,8 +40,10 @@ dependencies {
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
   testImplementation project(path: ":sdks:java:extensions:avro", 
configuration: "testRuntimeMigration")
   testImplementation project(path: ":sdks:java:io:common")
+  testImplementation project(path: ":sdks:java:managed")
   testImplementation project(path: ":sdks:java:testing:test-utils")
   testImplementation library.java.junit
+  testImplementation library.java.mockito_inline
   testImplementation library.java.slf4j_api
   testImplementation library.java.postgres
 
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 6777be50ab5..da75c9baaa4 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public class JdbcReadSchemaTransformProvider
       }
       return PCollectionRowTuple.of("output", 
input.getPipeline().apply(readRows));
     }
+
+    public Row getConfigurationRow() {
+      try {
+        // To stay consistent with our SchemaTransform configuration naming conventions,
+        // we sort lexicographically
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(JdbcReadSchemaTransformConfiguration.class)
+            .apply(config)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   @Override
@@ -401,6 +417,8 @@ public class JdbcReadSchemaTransformProvider
           .Builder();
     }
 
+    public abstract Builder toBuilder();
+
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setDriverClassName(String value);
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index 6f10df56aab..4dbb9b396f0 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public class JdbcWriteSchemaTransformProvider
               .setRowSchema(Schema.of());
       return PCollectionRowTuple.of("post_write", postWrite);
     }
+
+    public Row getConfigurationRow() {
+      try {
+        // To stay consistent with our SchemaTransform configuration naming conventions,
+        // we sort lexicographically
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(JdbcWriteSchemaTransformConfiguration.class)
+            .apply(config)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   @Override
@@ -382,6 +398,8 @@ public class JdbcWriteSchemaTransformProvider
           .Builder();
     }
 
+    public abstract Builder toBuilder();
+
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setDriverClassName(String value);
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
new file mode 100644
index 00000000000..288b29642c5
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static 
org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
+import static 
org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import 
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class PostgresSchemaTransformTranslation {
+  static class PostgresReadSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<PostgresReadSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new ReadFromPostgresSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(PostgresReadSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements 
TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, 
PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(PostgresReadSchemaTransform.class, new 
PostgresReadSchemaTransformTranslator())
+          .build();
+    }
+  }
+
+  static class PostgresWriteSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<PostgresWriteSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new WriteToPostgresSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(PostgresWriteSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class WriteRegistrar implements 
TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, 
PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(PostgresWriteSchemaTransform.class, new 
PostgresWriteSchemaTransformTranslator())
+          .build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
index 62ff14c23e0..8755ce0ecca 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.jdbc.providers;
 
 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class ReadFromPostgresSchemaTransformProvider extends 
JdbcReadSchemaTransformProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromPostgresSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:postgres_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ);
   }
 
   @Override
@@ -43,4 +53,40 @@ public class ReadFromPostgresSchemaTransformProvider extends 
JdbcReadSchemaTrans
   protected String jdbcType() {
     return POSTGRES;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.equals(jdbcType())) {
+      throw new IllegalArgumentException(
+          String.format("Wrong JDBC type. Expected '%s' but got '%s'", 
jdbcType(), jdbcType));
+    }
+
+    List<@org.checkerframework.checker.nullness.qual.Nullable String> 
connectionInitSql =
+        configuration.getConnectionInitSql();
+    if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+      LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+    }
+
+    Boolean disableAutoCommit = configuration.getDisableAutoCommit();
+    if (disableAutoCommit != null && !disableAutoCommit) {
+      LOG.warn("Postgres reads require disableAutoCommit to be true, 
overriding to true.");
+    }
+
+    // Override "connectionInitSql" and "disableAutoCommit" for postgres
+    configuration =
+        configuration
+            .toBuilder()
+            .setConnectionInitSql(Collections.emptyList())
+            .setDisableAutoCommit(true)
+            .build();
+    return new PostgresReadSchemaTransform(configuration);
+  }
+
+  public static class PostgresReadSchemaTransform extends 
JdbcReadSchemaTransform {
+    public PostgresReadSchemaTransform(JdbcReadSchemaTransformConfiguration 
config) {
+      super(config, POSTGRES);
+    }
+  }
 }
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
index c50b8431163..411e1ff2c47 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.jdbc.providers;
 
 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class WriteToPostgresSchemaTransformProvider extends 
JdbcWriteSchemaTransformProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WriteToPostgresSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:postgres_write:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE);
   }
 
   @Override
@@ -43,4 +53,30 @@ public class WriteToPostgresSchemaTransformProvider extends 
JdbcWriteSchemaTrans
   protected String jdbcType() {
     return POSTGRES;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcWriteSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.equals(jdbcType())) {
+      throw new IllegalArgumentException(
+          String.format("Wrong JDBC type. Expected '%s' but got '%s'", 
jdbcType(), jdbcType));
+    }
+
+    List<@org.checkerframework.checker.nullness.qual.Nullable String> 
connectionInitSql =
+        configuration.getConnectionInitSql();
+    if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+      LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+    }
+
+    // Override "connectionInitSql" for postgres
+    configuration = 
configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+    return new PostgresWriteSchemaTransform(configuration);
+  }
+
+  public static class PostgresWriteSchemaTransform extends 
JdbcWriteSchemaTransform {
+    public PostgresWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration 
config) {
+      super(config, POSTGRES);
+    }
+  }
 }
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
new file mode 100644
index 00000000000..d5878309692
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static 
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
+import 
org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider;
+import 
org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.postgresql.ds.PGSimpleDataSource;
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent 
Postgres instance.
+ *
+ * <p>Similar to JdbcIOIT, this test requires a running instance of Postgres. 
Pass in connection
+ * information using PipelineOptions:
+ *
+ * <pre>
+ *  ./gradlew integrationTest -p sdks/java/io/jdbc 
-DintegrationTestPipelineOptions='[
+ *  "--postgresServerName=1.2.3.4",
+ *  "--postgresUsername=postgres",
+ *  "--postgresDatabaseName=myfancydb",
+ *  "--postgresPassword=mypass",
+ *  "--postgresSsl=false" ]'
+ *  --tests org.apache.beam.sdk.io.jdbc.JdbcIOPostgresIT
+ *  -DintegrationTestRunner=direct
+ * </pre>
+ */
+@RunWith(JUnit4.class)
+public class JdbcIOPostgresIT {
+  private static final Schema INPUT_SCHEMA =
+      Schema.of(
+          Schema.Field.of("id", Schema.FieldType.INT32),
+          Schema.Field.of("name", Schema.FieldType.STRING));
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(INPUT_SCHEMA)
+              .withFieldValue("id", 1)
+              .withFieldValue("name", "foo")
+              .build(),
+          Row.withSchema(INPUT_SCHEMA)
+              .withFieldValue("id", 2)
+              .withFieldValue("name", "bar")
+              .build(),
+          Row.withSchema(INPUT_SCHEMA)
+              .withFieldValue("id", 3)
+              .withFieldValue("name", "baz")
+              .build());
+
+  private static PGSimpleDataSource dataSource;
+  private static String jdbcUrl;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() {
+    PostgresIOTestPipelineOptions options;
+    try {
+      options = readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
+    } catch (IllegalArgumentException e) {
+      options = null;
+    }
+    org.junit.Assume.assumeNotNull(options);
+    dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+    jdbcUrl = DatabaseTestHelper.getPostgresDBUrl(options);
+  }
+
+  @Test
+  public void testWriteThenRead() throws SQLException {
+    String tableName = DatabaseTestHelper.getTestTableName("JdbcIOPostgresIT");
+    DatabaseTestHelper.createTable(dataSource, tableName);
+
+    JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration 
writeConfig =
+        
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+            .setJdbcUrl(jdbcUrl)
+            .setUsername(dataSource.getUser())
+            .setPassword(dataSource.getPassword())
+            .setLocation(tableName)
+            .build();
+
+    JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration 
readConfig =
+        
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+            .setJdbcUrl(jdbcUrl)
+            .setUsername(dataSource.getUser())
+            .setPassword(dataSource.getPassword())
+            .setLocation(tableName)
+            .build();
+
+    try {
+      PCollection<Row> input = 
writePipeline.apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA);
+      PCollectionRowTuple inputTuple = PCollectionRowTuple.of("input", input);
+      inputTuple.apply(
+          new 
WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform(writeConfig));
+      writePipeline.run().waitUntilFinish();
+
+      PCollectionRowTuple pbeginTuple = 
PCollectionRowTuple.empty(readPipeline);
+      PCollectionRowTuple outputTuple =
+          pbeginTuple.apply(
+              new 
ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform(readConfig));
+      PCollection<Row> output = outputTuple.get("output");
+      PAssert.that(output).containsInAnyOrder(ROWS);
+      readPipeline.run().waitUntilFinish();
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  @Test
+  public void testManagedWriteThenManagedRead() throws SQLException {
+    String tableName = 
DatabaseTestHelper.getTestTableName("ManagedJdbcIOPostgresIT");
+    DatabaseTestHelper.createTable(dataSource, tableName);
+
+    Map<String, Object> writeConfig =
+        ImmutableMap.<String, Object>builder()
+            .put("jdbc_url", jdbcUrl)
+            .put("username", dataSource.getUser())
+            .put("password", dataSource.getPassword())
+            .put("location", tableName)
+            .build();
+
+    Map<String, Object> readConfig =
+        ImmutableMap.<String, Object>builder()
+            .put("jdbc_url", jdbcUrl)
+            .put("username", dataSource.getUser())
+            .put("password", dataSource.getPassword())
+            .put("location", tableName)
+            .build();
+
+    try {
+      PCollection<Row> input = 
writePipeline.apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA);
+      input.apply(Managed.write(Managed.POSTGRES).withConfig(writeConfig));
+      writePipeline.run().waitUntilFinish();
+
+      PCollectionRowTuple output =
+          
readPipeline.apply(Managed.read(Managed.POSTGRES).withConfig(readConfig));
+      PAssert.that(output.get("output")).containsInAnyOrder(ROWS);
+      readPipeline.run().waitUntilFinish();
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+}
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..503baaefc33
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static 
org.apache.beam.sdk.io.jdbc.providers.PostgresSchemaTransformTranslation.PostgresReadSchemaTransformTranslator;
+import static 
org.apache.beam.sdk.io.jdbc.providers.PostgresSchemaTransformTranslation.PostgresWriteSchemaTransformTranslator;
+import static 
org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/**
+ * Tests for the Postgres read and write SchemaTransform translators. Each transform is converted
+ * to a configuration {@link Row}, both directly and through a translated pipeline proto, and then
+ * rebuilt from that row; the rebuilt transform must report the original configuration.
+ */
+public class PostgresSchemaTransformTranslationTest {
+  // NOTE(review): TEMPORARY_FOLDER and thrown are not referenced by any test in this class.
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  // Providers under test; they supply the configuration schemas, identifiers and transforms.
+  static final WriteToPostgresSchemaTransformProvider WRITE_PROVIDER =
+      new WriteToPostgresSchemaTransformProvider();
+  static final ReadFromPostgresSchemaTransformProvider READ_PROVIDER =
+      new ReadFromPostgresSchemaTransformProvider();
+
+  // Read configuration built against READ_PROVIDER's configuration schema. The JDBC URL is a
+  // placeholder; driver_class_name, driver_jars and read_query are explicitly set to null.
+  static final Row READ_CONFIG =
+      Row.withSchema(READ_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", "jdbc:postgresql://host:port/database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("connection_init_sql", 
ImmutableList.<String>builder().build())
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("disable_auto_commit", true)
+          .withFieldValue("fetch_size", 10)
+          .withFieldValue("num_partitions", 5)
+          .withFieldValue("output_parallelization", true)
+          .withFieldValue("partition_column", "col")
+          .withFieldValue("read_query", null)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .build();
+
+  // Write configuration built against WRITE_PROVIDER's configuration schema. The JDBC URL is a
+  // placeholder; driver_class_name, driver_jars and write_statement are explicitly set to null.
+  static final Row WRITE_CONFIG =
+      Row.withSchema(WRITE_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", "jdbc:postgresql://host:port/database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("autosharding", true)
+          .withFieldValue("connection_init_sql", 
ImmutableList.<String>builder().build())
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("batch_size", 100L)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .withFieldValue("write_statement", null)
+          .build();
+
+  /**
+   * Builds the write transform from {@code WRITE_CONFIG}, converts it to a config row with the
+   * translator, rebuilds the transform from that row and expects the rebuilt transform to report
+   * {@code WRITE_CONFIG} as its configuration row.
+   */
+  @Test
+  public void testRecreateWriteTransformFromRow() {
+    PostgresWriteSchemaTransform writeTransform =
+        (PostgresWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+
+    PostgresWriteSchemaTransformTranslator translator =
+        new PostgresWriteSchemaTransformTranslator();
+    Row translatedRow = translator.toConfigRow(writeTransform);
+
+    PostgresWriteSchemaTransform writeTransformFromRow =
+        translator.fromConfigRow(translatedRow, 
PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
+  }
+
+  /**
+   * Applies the write transform in a pipeline, translates the pipeline to a proto, and checks that
+   * the single matching SchemaTransform payload carries the provider's configuration schema and
+   * {@code WRITE_CONFIG}. The transform is then rebuilt from the decoded row and must report the
+   * same configuration.
+   */
+  @Test
+  public void testWriteTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+    Schema inputSchema = Schema.builder().addStringField("name").build();
+    PCollection<Row> input =
+        p.apply(
+                Create.of(
+                    Collections.singletonList(
+                        Row.withSchema(inputSchema).addValue("test").build())))
+            .setRowSchema(inputSchema);
+
+    PostgresWriteSchemaTransform writeTransform =
+        (PostgresWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+    PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+    // Then translate the pipeline to a proto and extract 
PostgresWriteSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    // Keep only transforms whose spec URN is SCHEMA_TRANSFORM and whose payload identifier is
+    // the write provider's identifier.
+    List<RunnerApi.PTransform> writeTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return 
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(WRITE_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    // Exactly one such transform is expected in the translated pipeline.
+    assertEquals(1, writeTransformProto.size());
+    RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = 
SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = 
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+    assertEquals(WRITE_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the 
PostgresWriteSchemaTransform
+    PostgresWriteSchemaTransformTranslator translator =
+        new PostgresWriteSchemaTransformTranslator();
+    PostgresWriteSchemaTransform writeTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
+  }
+
+  /**
+   * Builds the read transform from {@code READ_CONFIG}, converts it to a config row with the
+   * translator, rebuilds the transform from that row and expects the rebuilt transform to report
+   * {@code READ_CONFIG} as its configuration row.
+   */
+  @Test
+  public void testReCreateReadTransformFromRow() {
+    // setting a subset of fields here.
+    PostgresReadSchemaTransform readTransform =
+        (PostgresReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    PostgresReadSchemaTransformTranslator translator = new 
PostgresReadSchemaTransformTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    PostgresReadSchemaTransform readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+  }
+
+  /**
+   * Applies the read transform in a pipeline (with schema inference mocked), translates the
+   * pipeline to a proto, and checks that the single matching SchemaTransform payload carries the
+   * provider's configuration schema and {@code READ_CONFIG}. The transform is then rebuilt from
+   * the decoded row and must report the same configuration.
+   */
+  @Test
+  public void testReadTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+
+    PostgresReadSchemaTransform readTransform =
+        (PostgresReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    // Mock inferBeamSchema since it requires database connection.
+    // The static mock is closed at the end of this try block, so it only covers the apply call.
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+    try (MockedStatic<JdbcIO.ReadRows> mock = 
Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+      mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), 
Mockito.any()))
+          .thenReturn(expectedSchema);
+      PCollectionRowTuple.empty(p).apply(readTransform);
+    }
+
+    // Then translate the pipeline to a proto and extract 
PostgresReadSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    // Keep only transforms whose spec URN is SCHEMA_TRANSFORM and whose payload identifier is
+    // the read provider's identifier.
+    List<RunnerApi.PTransform> readTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return 
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(READ_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    // Exactly one such transform is expected in the translated pipeline.
+    assertEquals(1, readTransformProto.size());
+    RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = 
SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = 
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+    assertEquals(READ_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the 
PostgresReadSchemaTransform
+    PostgresReadSchemaTransformTranslator translator = new 
PostgresReadSchemaTransformTranslator();
+    PostgresReadSchemaTransform readTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+  }
+}
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 06aed06c71c..cda84629a7d 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -96,6 +96,7 @@ public class Managed {
   public static final String ICEBERG_CDC = "iceberg_cdc";
   public static final String KAFKA = "kafka";
   public static final String BIGQUERY = "bigquery";
+  public static final String POSTGRES = "postgres";
 
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
@@ -104,12 +105,14 @@ public class Managed {
           .put(ICEBERG_CDC, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
           .put(KAFKA, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
           .put(BIGQUERY, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
+          .put(POSTGRES, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
           .build();
+  // Maps each supported sink name constant (e.g. KAFKA) to the URN of its managed write transform.
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
           .put(ICEBERG, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
           .put(KAFKA, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
           .put(BIGQUERY, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
+          .put(POSTGRES, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
           .build();
 
   /**
diff --git a/sdks/python/apache_beam/transforms/external.py 
b/sdks/python/apache_beam/transforms/external.py
index 782fa3d030b..b22ed6e0c64 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -80,7 +80,9 @@ MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = {
     ManagedTransforms.Urns.KAFKA_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET,
     ManagedTransforms.Urns.KAFKA_WRITE.urn: _IO_EXPANSION_SERVICE_JAR_TARGET,
     ManagedTransforms.Urns.BIGQUERY_READ.urn: 
_GCP_EXPANSION_SERVICE_JAR_TARGET,
-    ManagedTransforms.Urns.BIGQUERY_WRITE.urn: 
_GCP_EXPANSION_SERVICE_JAR_TARGET
+    ManagedTransforms.Urns.BIGQUERY_WRITE.urn: 
_GCP_EXPANSION_SERVICE_JAR_TARGET,  # pylint: disable=line-too-long
+    ManagedTransforms.Urns.POSTGRES_READ.urn: 
_GCP_EXPANSION_SERVICE_JAR_TARGET,
+    ManagedTransforms.Urns.POSTGRES_WRITE.urn: 
_GCP_EXPANSION_SERVICE_JAR_TARGET,  # pylint: disable=line-too-long
 }
 
 
diff --git a/sdks/python/apache_beam/transforms/managed.py 
b/sdks/python/apache_beam/transforms/managed.py
index bf680d5fd35..72dfb6fd9a0 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -85,6 +85,7 @@ ICEBERG = "iceberg"
 _ICEBERG_CDC = "iceberg_cdc"
 KAFKA = "kafka"
 BIGQUERY = "bigquery"
+POSTGRES = "postgres"
 
 __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
 
@@ -95,7 +96,8 @@ class Read(PTransform):
       ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
       _ICEBERG_CDC: ManagedTransforms.Urns.ICEBERG_CDC_READ.urn,
       KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
-      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
+      POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
   }
 
   def __init__(
@@ -136,7 +138,8 @@ class Write(PTransform):
+  # Maps each supported sink name constant to the URN of its managed write transform.
   _WRITE_TRANSFORMS = {
       ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
       KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
-      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
+      POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
   }
 
   def __init__(


Reply via email to