This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new be4fb977e18 Support managed jdbc io (Postgres) (#36034)
be4fb977e18 is described below

commit be4fb977e181e86fd68753218563c5884744df02
Author: Shunping Huang <shunp...@google.com>
AuthorDate: Mon Sep 8 15:35:39 2025 -0400

    Support managed jdbc io (Postgres) (#36034)

    * Add postgres read to managed io
    * Add postgres write to managed io
    * Add integration tests for both managed and unmanaged postgres read and write.
    * Fix error in analyzeClassesDependencies gradle task
    * Fix spotless failure.
    * Fix python lint
    * Add schema transform translation for postgres read and write.
    * Add test for postgres schema transform translation.
    * Address reviewer's feedback.
---
 .../model/pipeline/v1/external_transforms.proto    |   4 +
 sdks/java/io/jdbc/build.gradle                     |   3 +
 .../io/jdbc/JdbcReadSchemaTransformProvider.java   |  18 ++
 .../io/jdbc/JdbcWriteSchemaTransformProvider.java  |  18 ++
 .../PostgresSchemaTransformTranslation.java        |  93 ++++++++
 .../ReadFromPostgresSchemaTransformProvider.java   |  48 ++++-
 .../WriteToPostgresSchemaTransformProvider.java    |  38 +++-
 .../apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java  | 178 ++++++++++++++++
 .../PostgresSchemaTransformTranslationTest.java    | 233 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/managed/Managed.java  |   3 +
 sdks/python/apache_beam/transforms/external.py     |   4 +-
 sdks/python/apache_beam/transforms/managed.py      |   7 +-
 12 files changed, 642 insertions(+), 5 deletions(-)

diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index add8a1999ca..02a5dd18e2c 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -76,6 +76,10 @@ message ManagedTransforms {
         "beam:schematransform:org.apache.beam:bigquery_write:v1"];
     ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
         "beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
+    POSTGRES_READ = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:postgres_read:v1"];
+    POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:postgres_write:v1"];
   }
 }

diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle
index 8c5fa685fda..87a231a5a42 100644
--- a/sdks/java/io/jdbc/build.gradle
+++ b/sdks/java/io/jdbc/build.gradle
@@ -29,6 +29,7 @@ ext.summary = "IO to read and write on JDBC datasource."
 dependencies {
     implementation library.java.vendored_guava_32_1_2_jre
     implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(path: ":model:pipeline", configuration: "shadow")
     implementation library.java.dbcp2
     implementation library.java.joda_time
     implementation "org.apache.commons:commons-pool2:2.11.1"
@@ -39,8 +40,10 @@ dependencies {
     testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
     testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
     testImplementation project(path: ":sdks:java:io:common")
+    testImplementation project(path: ":sdks:java:managed")
     testImplementation project(path: ":sdks:java:testing:test-utils")
     testImplementation library.java.junit
+    testImplementation library.java.mockito_inline
     testImplementation library.java.slf4j_api
     testImplementation library.java.postgres

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 6777be50ab5..da75c9baaa4 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public class JdbcReadSchemaTransformProvider
       }
       return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows));
     }
+
+    public Row getConfigurationRow() {
+      try {
+        // To stay consistent with our SchemaTransform configuration naming conventions,
+        // we sort lexicographically
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(JdbcReadSchemaTransformConfiguration.class)
+            .apply(config)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }

   @Override
@@ -401,6 +417,8 @@ public class JdbcReadSchemaTransformProvider
           .Builder();
     }

+    public abstract Builder toBuilder();
+
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setDriverClassName(String value);

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index 6f10df56aab..4dbb9b396f0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public class JdbcWriteSchemaTransformProvider
               .setRowSchema(Schema.of());
       return PCollectionRowTuple.of("post_write", postWrite);
     }
+
+    public Row getConfigurationRow() {
+      try {
+        // To stay consistent with our SchemaTransform configuration naming conventions,
+        // we sort lexicographically
+        return SchemaRegistry.createDefault()
+            .getToRowFunction(JdbcWriteSchemaTransformConfiguration.class)
+            .apply(config)
+            .sorted()
+            .toSnakeCase();
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }

   @Override
@@ -382,6 +398,8 @@ public class JdbcWriteSchemaTransformProvider
           .Builder();
     }

+    public abstract Builder toBuilder();
+
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setDriverClassName(String value);

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
new file mode 100644
index 00000000000..288b29642c5
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
+import static org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
+import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class PostgresSchemaTransformTranslation {
+  static class PostgresReadSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<PostgresReadSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new ReadFromPostgresSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(PostgresReadSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(PostgresReadSchemaTransform.class, new PostgresReadSchemaTransformTranslator())
+          .build();
+    }
+  }
+
+  static class PostgresWriteSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<PostgresWriteSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new WriteToPostgresSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(PostgresWriteSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(PostgresWriteSchemaTransform.class, new PostgresWriteSchemaTransformTranslator())
+          .build();
+    }
+  }
+}

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
index 62ff14c23e0..8755ce0ecca 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.jdbc.providers;

 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

 import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 @AutoService(SchemaTransformProvider.class)
 public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTransformProvider {

+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromPostgresSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:postgres_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ);
   }

   @Override
@@ -43,4 +53,40 @@ public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTrans
   protected String jdbcType() {
     return POSTGRES;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.equals(jdbcType())) {
+      throw new IllegalArgumentException(
+          String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType));
+    }
+
+    List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
+        configuration.getConnectionInitSql();
+    if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+      LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+    }
+
+    Boolean disableAutoCommit = configuration.getDisableAutoCommit();
+    if (disableAutoCommit != null && !disableAutoCommit) {
+      LOG.warn("Postgres reads require disableAutoCommit to be true, overriding to true.");
+    }
+
+    // Override "connectionInitSql" and "disableAutoCommit" for postgres
+    configuration =
+        configuration
+            .toBuilder()
+            .setConnectionInitSql(Collections.emptyList())
+            .setDisableAutoCommit(true)
+            .build();
+    return new PostgresReadSchemaTransform(configuration);
+  }
+
+  public static class PostgresReadSchemaTransform extends JdbcReadSchemaTransform {
+    public PostgresReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      super(config, POSTGRES);
+    }
+  }
 }

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
index c50b8431163..411e1ff2c47 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.jdbc.providers;

 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

 import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 @AutoService(SchemaTransformProvider.class)
 public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {

+  private static final Logger LOG =
+      LoggerFactory.getLogger(WriteToPostgresSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:postgres_write:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE);
   }

   @Override
@@ -43,4 +53,30 @@ public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTrans
   protected String jdbcType() {
     return POSTGRES;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcWriteSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.equals(jdbcType())) {
+      throw new IllegalArgumentException(
+          String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType));
+    }
+
+    List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
+        configuration.getConnectionInitSql();
+    if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+      LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+    }
+
+    // Override "connectionInitSql" for postgres
+    configuration = configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+    return new PostgresWriteSchemaTransform(configuration);
+  }
+
+  public static class PostgresWriteSchemaTransform extends JdbcWriteSchemaTransform {
+    public PostgresWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) {
+      super(config, POSTGRES);
+    }
+  }
 }

diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
new file mode 100644
index 00000000000..d5878309692
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
+import org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider;
+import org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.postgresql.ds.PGSimpleDataSource;
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
+ *
+ * <p>Similar to JdbcIOIT, this test requires a running instance of Postgres. Pass in connection
+ * information using PipelineOptions:
+ *
+ * <pre>
+ * ./gradlew integrationTest -p sdks/java/io/jdbc -DintegrationTestPipelineOptions='[
+ *  "--postgresServerName=1.2.3.4",
+ *  "--postgresUsername=postgres",
+ *  "--postgresDatabaseName=myfancydb",
+ *  "--postgresPassword=mypass",
+ *  "--postgresSsl=false" ]'
+ *  --tests org.apache.beam.sdk.io.jdbc.JdbcIOPostgresIT
+ *  -DintegrationTestRunner=direct
+ * </pre>
+ */
+@RunWith(JUnit4.class)
+public class JdbcIOPostgresIT {
+  private static final Schema INPUT_SCHEMA =
+      Schema.of(
+          Schema.Field.of("id", Schema.FieldType.INT32),
+          Schema.Field.of("name", Schema.FieldType.STRING));
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(INPUT_SCHEMA)
+              .withFieldValue("id", 1)
+              .withFieldValue("name", "foo")
+              .build(),
+          Row.withSchema(INPUT_SCHEMA)
+              .withFieldValue("id", 2)
+              .withFieldValue("name", "bar")
+              .build(),
+          Row.withSchema(INPUT_SCHEMA)
+              .withFieldValue("id", 3)
+              .withFieldValue("name", "baz")
+              .build());
+
+  private static PGSimpleDataSource dataSource;
+  private static String jdbcUrl;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() {
+    PostgresIOTestPipelineOptions options;
+    try {
+      options = readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
+    } catch (IllegalArgumentException e) {
+      options = null;
+    }
+    org.junit.Assume.assumeNotNull(options);
+    dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+    jdbcUrl = DatabaseTestHelper.getPostgresDBUrl(options);
+  }
+
+  @Test
+  public void testWriteThenRead() throws SQLException {
+    String tableName = DatabaseTestHelper.getTestTableName("JdbcIOPostgresIT");
+    DatabaseTestHelper.createTable(dataSource, tableName);
+
+    JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration writeConfig =
+        JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+            .setJdbcUrl(jdbcUrl)
+            .setUsername(dataSource.getUser())
+            .setPassword(dataSource.getPassword())
+            .setLocation(tableName)
+            .build();
+
+    JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration readConfig =
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+            .setJdbcUrl(jdbcUrl)
+            .setUsername(dataSource.getUser())
+            .setPassword(dataSource.getPassword())
+            .setLocation(tableName)
+            .build();
+
+    try {
+      PCollection<Row> input = writePipeline.apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA);
+      PCollectionRowTuple inputTuple = PCollectionRowTuple.of("input", input);
+      inputTuple.apply(
+          new WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform(writeConfig));
+      writePipeline.run().waitUntilFinish();
+
+      PCollectionRowTuple pbeginTuple = PCollectionRowTuple.empty(readPipeline);
+      PCollectionRowTuple outputTuple =
+          pbeginTuple.apply(
+              new ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform(readConfig));
+      PCollection<Row> output = outputTuple.get("output");
+      PAssert.that(output).containsInAnyOrder(ROWS);
+      readPipeline.run().waitUntilFinish();
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  @Test
+  public void testManagedWriteThenManagedRead() throws SQLException {
+    String tableName = DatabaseTestHelper.getTestTableName("ManagedJdbcIOPostgresIT");
+    DatabaseTestHelper.createTable(dataSource, tableName);
+
+    Map<String, Object> writeConfig =
+        ImmutableMap.<String, Object>builder()
.put("jdbc_url", jdbcUrl) + .put("username", dataSource.getUser()) + .put("password", dataSource.getPassword()) + .put("location", tableName) + .build(); + + Map<String, Object> readConfig = + ImmutableMap.<String, Object>builder() + .put("jdbc_url", jdbcUrl) + .put("username", dataSource.getUser()) + .put("password", dataSource.getPassword()) + .put("location", tableName) + .build(); + + try { + PCollection<Row> input = writePipeline.apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA); + input.apply(Managed.write(Managed.POSTGRES).withConfig(writeConfig)); + writePipeline.run().waitUntilFinish(); + + PCollectionRowTuple output = + readPipeline.apply(Managed.read(Managed.POSTGRES).withConfig(readConfig)); + PAssert.that(output.get("output")).containsInAnyOrder(ROWS); + readPipeline.run().waitUntilFinish(); + } finally { + DatabaseTestHelper.deleteTable(dataSource, tableName); + } + } +} diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java new file mode 100644 index 00000000000..503baaefc33 --- /dev/null +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.jdbc.providers.PostgresSchemaTransformTranslation.PostgresReadSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.jdbc.providers.PostgresSchemaTransformTranslation.PostgresWriteSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
+import static org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class PostgresSchemaTransformTranslationTest {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  static final WriteToPostgresSchemaTransformProvider WRITE_PROVIDER =
+      new WriteToPostgresSchemaTransformProvider();
+  static final ReadFromPostgresSchemaTransformProvider READ_PROVIDER =
+      new ReadFromPostgresSchemaTransformProvider();
+
+  static final Row READ_CONFIG =
+      Row.withSchema(READ_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", "jdbc:postgresql://host:port/database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("connection_init_sql", ImmutableList.<String>builder().build())
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("disable_auto_commit", true)
+          .withFieldValue("fetch_size", 10)
+          .withFieldValue("num_partitions", 5)
+          .withFieldValue("output_parallelization", true)
+          .withFieldValue("partition_column", "col")
+          .withFieldValue("read_query", null)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .build();
+
+  static final Row WRITE_CONFIG =
+      Row.withSchema(WRITE_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", "jdbc:postgresql://host:port/database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("autosharding", true)
+          .withFieldValue("connection_init_sql", ImmutableList.<String>builder().build())
.withFieldValue("connection_properties", "some_property") + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("batch_size", 100L) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .withFieldValue("write_statement", null) + .build(); + + @Test + public void testRecreateWriteTransformFromRow() { + PostgresWriteSchemaTransform writeTransform = + (PostgresWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + + PostgresWriteSchemaTransformTranslator translator = + new PostgresWriteSchemaTransformTranslator(); + Row translatedRow = translator.toConfigRow(writeTransform); + + PostgresWriteSchemaTransform writeTransformFromRow = + translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow()); + } + + @Test + public void testWriteTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Schema inputSchema = Schema.builder().addStringField("name").build(); + PCollection<Row> input = + p.apply( + Create.of( + Collections.singletonList( + Row.withSchema(inputSchema).addValue("test").build()))) + .setRowSchema(inputSchema); + + PostgresWriteSchemaTransform writeTransform = + (PostgresWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + PCollectionRowTuple.of("input", input).apply(writeTransform); + + // Then translate the pipeline to a proto and extract PostgresWriteSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List<RunnerApi.PTransform> writeTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(WRITE_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, writeTransformProto.size()); + RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + + assertEquals(WRITE_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the PostgresWriteSchemaTransform + PostgresWriteSchemaTransformTranslator translator = + new PostgresWriteSchemaTransformTranslator(); + PostgresWriteSchemaTransform writeTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow()); + } + + @Test + public void testReCreateReadTransformFromRow() { + // setting a subset of fields here. 
+    PostgresReadSchemaTransform readTransform =
+        (PostgresReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    PostgresReadSchemaTransformTranslator translator = new PostgresReadSchemaTransformTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    PostgresReadSchemaTransform readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testReadTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+
+    PostgresReadSchemaTransform readTransform =
+        (PostgresReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    // Mock inferBeamSchema since it requires database connection.
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+    try (MockedStatic<JdbcIO.ReadRows> mock = Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+      mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), Mockito.any()))
+          .thenReturn(expectedSchema);
+      PCollectionRowTuple.empty(p).apply(readTransform);
+    }
+
+    // Then translate the pipeline to a proto and extract PostgresReadSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> readTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(READ_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, readTransformProto.size());
+    RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+    assertEquals(READ_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the PostgresReadSchemaTransform
+    PostgresReadSchemaTransformTranslator translator = new PostgresReadSchemaTransformTranslator();
+    PostgresReadSchemaTransform readTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+  }
+}

diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 06aed06c71c..cda84629a7d 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -96,6 +96,7 @@ public class Managed {
   public static final String ICEBERG_CDC = "iceberg_cdc";
   public static final String KAFKA = "kafka";
   public static final String BIGQUERY = "bigquery";
+  public static final String POSTGRES = "postgres";

   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
@@ -104,12 +105,14 @@ public class Managed {
           .put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
           .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
           .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
+          .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
           .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
           .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
           .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
           .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
+          .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
           .build();

   /**

diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 782fa3d030b..b22ed6e0c64 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -80,7 +80,9 @@ MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = {
     ManagedTransforms.Urns.KAFKA_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET,
     ManagedTransforms.Urns.KAFKA_WRITE.urn: _IO_EXPANSION_SERVICE_JAR_TARGET,
     ManagedTransforms.Urns.BIGQUERY_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
-    ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET
+    ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,  # pylint: disable=line-too-long
+    ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
+    ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,  # pylint: disable=line-too-long
 }

diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
index bf680d5fd35..72dfb6fd9a0 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -85,6 +85,7 @@ ICEBERG = "iceberg"
 _ICEBERG_CDC = "iceberg_cdc"
 KAFKA = "kafka"
 BIGQUERY = "bigquery"
+POSTGRES = "postgres"

 __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]

@@ -95,7 +96,8 @@ class Read(PTransform):
     ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
     _ICEBERG_CDC: ManagedTransforms.Urns.ICEBERG_CDC_READ.urn,
     KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
-    BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn
+    BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
+    POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
   }

   def __init__(
@@ -136,7 +138,8 @@ class Write(PTransform):
   _WRITE_TRANSFORMS = {
     ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
     KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
-    BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn
+    BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
+    POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
   }

   def __init__(
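
For illustration, a minimal sketch (not part of the commit) of driving the new managed Postgres
transforms from the Python SDK after this change. It assumes a reachable Postgres instance and an
existing table; the jdbc_url, table names, and credentials below are placeholders, and the config
keys mirror the snake_case fields exercised in JdbcIOPostgresIT above.

    # Hedged usage sketch: managed Postgres read/write from the Python SDK.
    # All connection values are placeholders; "my_table_copy" is a hypothetical
    # destination table assumed to already exist.
    import apache_beam as beam
    from apache_beam.transforms import managed

    config = {
        "jdbc_url": "jdbc:postgresql://localhost:5432/mydb",
        "location": "my_table",
        "username": "postgres",
        "password": "postgres",
    }

    with beam.Pipeline() as p:
        # Read rows from Postgres via the managed read transform.
        rows = p | "ReadPostgres" >> managed.Read(managed.POSTGRES, config=config)
        # Write them back out via the managed write transform.
        rows | "WritePostgres" >> managed.Write(
            managed.POSTGRES, config=dict(config, location="my_table_copy"))

On a portable runner, both transforms expand against the Java expansion service registered for the
POSTGRES_READ/POSTGRES_WRITE URNs added in external_transforms.proto above.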