This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f86452f32fb Support managed jdbc io (MySQL) (#36045)
f86452f32fb is described below
commit f86452f32fbfe3c62845c6ddfa4d569f1d79d206
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Sep 17 15:27:08 2025 -0400
Support managed jdbc io (MySQL) (#36045)
* Add mysql read to managed io
* Add mysql write to managed io
* Add schema transform translation and test for mysql read and write
* Remove redundant config validation.
* Allow jdbcType to be empty.
* Address reviewer's comments.
---
.../model/pipeline/v1/external_transforms.proto | 4 +
.../providers/MySqlSchemaTransformTranslation.java | 93 +++++++++
.../ReadFromMySqlSchemaTransformProvider.java | 41 +++-
.../WriteToMySqlSchemaTransformProvider.java | 31 ++-
.../MysqlSchemaTransformTranslationTest.java | 231 +++++++++++++++++++++
.../java/org/apache/beam/sdk/managed/Managed.java | 3 +
sdks/python/apache_beam/transforms/external.py | 2 +
sdks/python/apache_beam/transforms/managed.py | 3 +
8 files changed, 406 insertions(+), 2 deletions(-)
diff --git
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 02a5dd18e2c..31232eb6067 100644
---
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -80,6 +80,10 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:postgres_read:v1"];
POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:postgres_write:v1"];
+ MYSQL_READ = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:mysql_read:v1"];
+ MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:mysql_write:v1"];
}
}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java
new file mode 100644
index 00000000000..3367248b719
--- /dev/null
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static
org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform;
+import static
org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform;
+import static
org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+/**
+ * Payload translators for the MySQL read and write schema transforms, together with the
+ * registrars that map each transform class to its translator. Each translator names the
+ * provider for its transform and returns the transform's own configuration row.
+ */
+public class MySqlSchemaTransformTranslation {
+ static class MySqlReadSchemaTransformTranslator
+ extends SchemaTransformPayloadTranslator<MySqlReadSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new ReadFromMySqlSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(MySqlReadSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+ /**
+ * Registrar, annotated with {@code @AutoService}, whose map holds the single entry
+ * {@code MySqlReadSchemaTransform.class -> MySqlReadSchemaTransformTranslator}.
+ */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class ReadRegistrar implements
TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes", // the return type below names PTransform and TransformPayloadTranslator raw
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>,
PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(MySqlReadSchemaTransform.class, new
MySqlReadSchemaTransformTranslator())
+ .build();
+ }
+ }
+
+ static class MySqlWriteSchemaTransformTranslator
+ extends SchemaTransformPayloadTranslator<MySqlWriteSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new WriteToMySqlSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(MySqlWriteSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+ /**
+ * Registrar, annotated with {@code @AutoService}, whose map holds the single entry
+ * {@code MySqlWriteSchemaTransform.class -> MySqlWriteSchemaTransformTranslator}.
+ */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class WriteRegistrar implements
TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes", // the return type below names PTransform and TransformPayloadTranslator raw
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>,
PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(MySqlWriteSchemaTransform.class, new
MySqlWriteSchemaTransformTranslator())
+ .build();
+ }
+ }
+}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
index 3d0135ef8ec..b51ee723641 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
@@ -18,20 +18,28 @@
package org.apache.beam.sdk.io.jdbc.providers;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class ReadFromMySqlSchemaTransformProvider extends
JdbcReadSchemaTransformProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadFromMySqlSchemaTransformProvider.class);
+
@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:mysql_read:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ);
}
@Override
@@ -43,4 +51,35 @@ public class ReadFromMySqlSchemaTransformProvider extends
JdbcReadSchemaTransfor
protected String jdbcType() {
return MYSQL;
}
+ /**
+ * Builds the MySQL read transform from {@code configuration}.
+ *
+ * <p>A non-empty {@code jdbcType} that differs from {@link #jdbcType()} is logged as a warning
+ * and overridden rather than rejected; a null or empty value is left untouched. A positive
+ * {@code fetchSize} combined with a non-null JDBC URL that lacks the literal text
+ * {@code useCursorFetch=true} is rejected.
+ *
+ * @param configuration the read configuration to check and wrap
+ * @return a {@link MySqlReadSchemaTransform} holding the (possibly adjusted) configuration
+ * @throws IllegalArgumentException if {@code fetchSize > 0} and the non-null JDBC URL does
+ * not contain {@code useCursorFetch=true}
+ */
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ String jdbcType = configuration.getJdbcType();
+ if (jdbcType != null && !jdbcType.isEmpty() &&
!jdbcType.equals(jdbcType())) {
+ LOG.warn(
+ "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+ jdbcType(),
+ jdbcType,
+ jdbcType());
+ configuration =
configuration.toBuilder().setJdbcType(jdbcType()).build();
+ }
+ // The URL test below is a case-sensitive substring match; a null URL skips the check.
+ Integer fetchSize = configuration.getFetchSize();
+ if (fetchSize != null
+ && fetchSize > 0
+ && configuration.getJdbcUrl() != null
+ && !configuration.getJdbcUrl().contains("useCursorFetch=true")) {
+ throw new IllegalArgumentException(
+ "It is required to set useCursorFetch=true"
+ + " in the JDBC URL when using fetchSize for MySQL");
+ }
+ return new MySqlReadSchemaTransform(configuration);
+ }
+ /**
+ * MySQL-specific read transform: forwards {@code config} to {@link JdbcReadSchemaTransform}
+ * together with the {@code MYSQL} constant.
+ */
+ public static class MySqlReadSchemaTransform extends JdbcReadSchemaTransform
{
+ public MySqlReadSchemaTransform(JdbcReadSchemaTransformConfiguration
config) {
+ super(config, MYSQL);
+ }
+ }
}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
index 57f08522016..9f38fccf65b 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
@@ -18,20 +18,28 @@
package org.apache.beam.sdk.io.jdbc.providers;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class WriteToMySqlSchemaTransformProvider extends
JdbcWriteSchemaTransformProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WriteToMySqlSchemaTransformProvider.class);
+
@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:mysql_write:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE);
}
@Override
@@ -43,4 +51,25 @@ public class WriteToMySqlSchemaTransformProvider extends
JdbcWriteSchemaTransfor
protected String jdbcType() {
return MYSQL;
}
+ /**
+ * Builds the MySQL write transform from {@code configuration}.
+ *
+ * <p>A non-empty {@code jdbcType} that differs from {@link #jdbcType()} is logged as a warning
+ * and overridden rather than rejected; a null or empty value is left untouched.
+ *
+ * @param configuration the write configuration to wrap
+ * @return a {@link MySqlWriteSchemaTransform} holding the (possibly adjusted) configuration
+ */
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ String jdbcType = configuration.getJdbcType();
+ if (jdbcType != null && !jdbcType.isEmpty() &&
!jdbcType.equals(jdbcType())) {
+ LOG.warn(
+ "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+ jdbcType(),
+ jdbcType,
+ jdbcType());
+ configuration =
configuration.toBuilder().setJdbcType(jdbcType()).build();
+ }
+ return new MySqlWriteSchemaTransform(configuration);
+ }
+ /**
+ * MySQL-specific write transform: forwards {@code config} to {@link JdbcWriteSchemaTransform}
+ * together with the {@code MYSQL} constant.
+ */
+ public static class MySqlWriteSchemaTransform extends
JdbcWriteSchemaTransform {
+ public MySqlWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration
config) {
+ super(config, MYSQL);
+ }
+ }
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..cfc48b6a8a0
--- /dev/null
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static
org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlReadSchemaTransformTranslator;
+import static
org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlWriteSchemaTransformTranslator;
+import static
org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform;
+import static
org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+/**
+ * Checks that the MySQL read and write schema transforms survive a round trip through their
+ * payload translators, both directly from a configuration row and via the pipeline proto.
+ */
+public class MysqlSchemaTransformTranslationTest {
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ // NOTE(review): TEMPORARY_FOLDER and thrown are not referenced by any test in this class.
+ static final WriteToMySqlSchemaTransformProvider WRITE_PROVIDER =
+ new WriteToMySqlSchemaTransformProvider();
+ static final ReadFromMySqlSchemaTransformProvider READ_PROVIDER =
+ new ReadFromMySqlSchemaTransformProvider();
+ // Read configuration shared by the read tests; several fields are set to null explicitly.
+ static final Row READ_CONFIG =
+ Row.withSchema(READ_PROVIDER.configurationSchema())
+ .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database")
+ .withFieldValue("location", "test_table")
+ .withFieldValue("connection_properties", "some_property")
+ .withFieldValue("connection_init_sql",
ImmutableList.<String>builder().build())
+ .withFieldValue("driver_class_name", null)
+ .withFieldValue("driver_jars", null)
+ .withFieldValue("disable_auto_commit", true)
+ .withFieldValue("fetch_size", null)
+ .withFieldValue("num_partitions", 5)
+ .withFieldValue("output_parallelization", true)
+ .withFieldValue("partition_column", "col")
+ .withFieldValue("read_query", null)
+ .withFieldValue("username", "my_user")
+ .withFieldValue("password", "my_pass")
+ .build();
+ // Write configuration shared by the write tests; several fields are set to null explicitly.
+ static final Row WRITE_CONFIG =
+ Row.withSchema(WRITE_PROVIDER.configurationSchema())
+ .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database")
+ .withFieldValue("location", "test_table")
+ .withFieldValue("autosharding", true)
+ .withFieldValue("connection_init_sql",
ImmutableList.<String>builder().build())
+ .withFieldValue("connection_properties", "some_property")
+ .withFieldValue("driver_class_name", null)
+ .withFieldValue("driver_jars", null)
+ .withFieldValue("batch_size", 100L)
+ .withFieldValue("username", "my_user")
+ .withFieldValue("password", "my_pass")
+ .withFieldValue("write_statement", null)
+ .build();
+ /**
+ * Row to transform to translator row to transform: the re-created write transform must
+ * report {@code WRITE_CONFIG} as its configuration row.
+ */
+ @Test
+ public void testRecreateWriteTransformFromRow() {
+ MySqlWriteSchemaTransform writeTransform =
+ (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+
+ MySqlWriteSchemaTransformTranslator translator = new
MySqlWriteSchemaTransformTranslator();
+ Row translatedRow = translator.toConfigRow(writeTransform);
+
+ MySqlWriteSchemaTransform writeTransformFromRow =
+ translator.fromConfigRow(translatedRow,
PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
+ }
+ /**
+ * Applies the write transform in a pipeline, finds its schema-transform payload in the
+ * pipeline proto, and compares the decoded schema, the decoded row and the re-created
+ * transform against the write provider and {@code WRITE_CONFIG}.
+ */
+ @Test
+ public void testWriteTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+ Schema inputSchema = Schema.builder().addStringField("name").build();
+ PCollection<Row> input =
+ p.apply(
+ Create.of(
+ Collections.singletonList(
+ Row.withSchema(inputSchema).addValue("test").build())))
+ .setRowSchema(inputSchema);
+
+ MySqlWriteSchemaTransform writeTransform =
+ (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+ PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+ // Then translate the pipeline to a proto and extract
MySqlWriteSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(WRITE_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, writeTransformProto.size());
+ RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload =
SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec =
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec =
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+ assertEquals(WRITE_CONFIG, rowFromSpec);
+
+ // Use the information in the proto to recreate the
MySqlWriteSchemaTransform
+ MySqlWriteSchemaTransformTranslator translator = new
MySqlWriteSchemaTransformTranslator();
+ MySqlWriteSchemaTransform writeTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
+ }
+ /**
+ * Row to transform to translator row to transform: the re-created read transform must
+ * report {@code READ_CONFIG} as its configuration row.
+ */
+ @Test
+ public void testReCreateReadTransformFromRow() {
+ // setting a subset of fields here.
+ MySqlReadSchemaTransform readTransform =
+ (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+ MySqlReadSchemaTransformTranslator translator = new
MySqlReadSchemaTransformTranslator();
+ Row row = translator.toConfigRow(readTransform);
+
+ MySqlReadSchemaTransform readTransformFromRow =
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+ }
+ /**
+ * Applies the read transform in a pipeline while {@code JdbcIO.ReadRows.inferBeamSchema} is
+ * statically mocked, finds its schema-transform payload in the pipeline proto, and compares
+ * the decoded schema, the decoded row and the re-created transform against the read
+ * provider and {@code READ_CONFIG}.
+ */
+ @Test
+ public void testReadTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+
+ MySqlReadSchemaTransform readTransform =
+ (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+ // Mock inferBeamSchema since it requires database connection.
+ Schema expectedSchema = Schema.builder().addStringField("name").build();
+ try (MockedStatic<JdbcIO.ReadRows> mock =
Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+ mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(),
Mockito.any()))
+ .thenReturn(expectedSchema);
+ PCollectionRowTuple.empty(p).apply(readTransform);
+ }
+
+ // Then translate the pipeline to a proto and extract
MySqlReadSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> readTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(READ_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, readTransformProto.size());
+ RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload =
SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec =
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec =
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+ assertEquals(READ_CONFIG, rowFromSpec);
+
+ // Use the information in the proto to recreate the
MySqlReadSchemaTransform
+ MySqlReadSchemaTransformTranslator translator = new
MySqlReadSchemaTransformTranslator();
+ MySqlReadSchemaTransform readTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+ }
+}
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index cda84629a7d..4f45eeac861 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -97,6 +97,7 @@ public class Managed {
public static final String KAFKA = "kafka";
public static final String BIGQUERY = "bigquery";
public static final String POSTGRES = "postgres";
+ public static final String MYSQL = "mysql";
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
@@ -106,6 +107,7 @@ public class Managed {
.put(KAFKA,
getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
.put(BIGQUERY,
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
.put(POSTGRES,
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
+ .put(MYSQL,
getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ))
.build();
public static final Map<String, String> WRITE_TRANSFORMS =
ImmutableMap.<String, String>builder()
@@ -113,6 +115,7 @@ public class Managed {
.put(KAFKA,
getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
.put(BIGQUERY,
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
.put(POSTGRES,
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
+ .put(MYSQL,
getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE))
.build();
/**
diff --git a/sdks/python/apache_beam/transforms/external.py
b/sdks/python/apache_beam/transforms/external.py
index b22ed6e0c64..3f9f56a5413 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -83,6 +83,8 @@ MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = {
ManagedTransforms.Urns.BIGQUERY_WRITE.urn:
_GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
ManagedTransforms.Urns.POSTGRES_READ.urn:
_GCP_EXPANSION_SERVICE_JAR_TARGET,
ManagedTransforms.Urns.POSTGRES_WRITE.urn:
_GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
+ ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
+ ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
}
diff --git a/sdks/python/apache_beam/transforms/managed.py
b/sdks/python/apache_beam/transforms/managed.py
index 72dfb6fd9a0..03449236ac9 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -86,6 +86,7 @@ _ICEBERG_CDC = "iceberg_cdc"
KAFKA = "kafka"
BIGQUERY = "bigquery"
POSTGRES = "postgres"
+MYSQL = "mysql"
__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
@@ -98,6 +99,7 @@ class Read(PTransform):
KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
+ MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn,
}
def __init__(
@@ -140,6 +142,7 @@ class Write(PTransform):
KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
+ MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn,
}
def __init__(