This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f86452f32fb Support managed jdbc io (MySQL) (#36045)
f86452f32fb is described below

commit f86452f32fbfe3c62845c6ddfa4d569f1d79d206
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Sep 17 15:27:08 2025 -0400

    Support managed jdbc io (MySQL) (#36045)
    
    * Add mysql read to managed io
    
    * Add mysql write to managed io
    
    * Add schema transform translation and test for mysql read and write
    
    * Remove redundant config validation.
    
    * Allow jdbcType to be empty.
    
    * Address reviewer's comments.
---
 .../model/pipeline/v1/external_transforms.proto    |   4 +
 .../providers/MySqlSchemaTransformTranslation.java |  93 +++++++++
 .../ReadFromMySqlSchemaTransformProvider.java      |  41 +++-
 .../WriteToMySqlSchemaTransformProvider.java       |  31 ++-
 .../MysqlSchemaTransformTranslationTest.java       | 231 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/managed/Managed.java  |   3 +
 sdks/python/apache_beam/transforms/external.py     |   2 +
 sdks/python/apache_beam/transforms/managed.py      |   3 +
 8 files changed, 406 insertions(+), 2 deletions(-)

diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 02a5dd18e2c..31232eb6067 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -80,6 +80,10 @@ message ManagedTransforms {
       "beam:schematransform:org.apache.beam:postgres_read:v1"];
     POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
       "beam:schematransform:org.apache.beam:postgres_write:v1"];
+    MYSQL_READ = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:mysql_read:v1"];
+    MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:mysql_write:v1"];
   }
 }
 
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java
new file mode 100644
index 00000000000..3367248b719
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform;
+import static org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform;
+import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class MySqlSchemaTransformTranslation {
+  static class MySqlReadSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<MySqlReadSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new ReadFromMySqlSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(MySqlReadSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(MySqlReadSchemaTransform.class, new MySqlReadSchemaTransformTranslator())
+          .build();
+    }
+  }
+
+  static class MySqlWriteSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<MySqlWriteSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new WriteToMySqlSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(MySqlWriteSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(MySqlWriteSchemaTransform.class, new MySqlWriteSchemaTransformTranslator())
+          .build();
+    }
+  }
+}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
index 3d0135ef8ec..b51ee723641 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java
@@ -18,20 +18,28 @@
 package org.apache.beam.sdk.io.jdbc.providers;
 
 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromMySqlSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:mysql_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ);
   }
 
   @Override
@@ -43,4 +51,35 @@ public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransfor
   protected String jdbcType() {
     return MYSQL;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) {
+      LOG.warn(
+          "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+          jdbcType(),
+          jdbcType,
+          jdbcType());
+      configuration = configuration.toBuilder().setJdbcType(jdbcType()).build();
+    }
+
+    Integer fetchSize = configuration.getFetchSize();
+    if (fetchSize != null
+        && fetchSize > 0
+        && configuration.getJdbcUrl() != null
+        && !configuration.getJdbcUrl().contains("useCursorFetch=true")) {
+      throw new IllegalArgumentException(
+          "It is required to set useCursorFetch=true"
+              + " in the JDBC URL when using fetchSize for MySQL");
+    }
+    return new MySqlReadSchemaTransform(configuration);
+  }
+
+  public static class MySqlReadSchemaTransform extends JdbcReadSchemaTransform {
+    public MySqlReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      super(config, MYSQL);
+    }
+  }
 }
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
index 57f08522016..9f38fccf65b 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java
@@ -18,20 +18,28 @@
 package org.apache.beam.sdk.io.jdbc.providers;
 
 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WriteToMySqlSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:mysql_write:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE);
   }
 
   @Override
@@ -43,4 +51,25 @@ public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransfor
   protected String jdbcType() {
     return MYSQL;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcWriteSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) {
+      LOG.warn(
+          "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+          jdbcType(),
+          jdbcType,
+          jdbcType());
+      configuration = configuration.toBuilder().setJdbcType(jdbcType()).build();
+    }
+    return new MySqlWriteSchemaTransform(configuration);
+  }
+
+  public static class MySqlWriteSchemaTransform extends JdbcWriteSchemaTransform {
+    public MySqlWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) {
+      super(config, MYSQL);
+    }
+  }
 }
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..cfc48b6a8a0
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlReadSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlWriteSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform;
+import static org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class MysqlSchemaTransformTranslationTest {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  static final WriteToMySqlSchemaTransformProvider WRITE_PROVIDER =
+      new WriteToMySqlSchemaTransformProvider();
+  static final ReadFromMySqlSchemaTransformProvider READ_PROVIDER =
+      new ReadFromMySqlSchemaTransformProvider();
+
+  static final Row READ_CONFIG =
+      Row.withSchema(READ_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("connection_init_sql", ImmutableList.<String>builder().build())
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("disable_auto_commit", true)
+          .withFieldValue("fetch_size", null)
+          .withFieldValue("num_partitions", 5)
+          .withFieldValue("output_parallelization", true)
+          .withFieldValue("partition_column", "col")
+          .withFieldValue("read_query", null)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .build();
+
+  static final Row WRITE_CONFIG =
+      Row.withSchema(WRITE_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("autosharding", true)
+          .withFieldValue("connection_init_sql", ImmutableList.<String>builder().build())
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("batch_size", 100L)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .withFieldValue("write_statement", null)
+          .build();
+
+  @Test
+  public void testRecreateWriteTransformFromRow() {
+    MySqlWriteSchemaTransform writeTransform =
+        (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+
+    MySqlWriteSchemaTransformTranslator translator = new MySqlWriteSchemaTransformTranslator();
+    Row translatedRow = translator.toConfigRow(writeTransform);
+
+    MySqlWriteSchemaTransform writeTransformFromRow =
+        translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testWriteTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+    Schema inputSchema = Schema.builder().addStringField("name").build();
+    PCollection<Row> input =
+        p.apply(
+                Create.of(
+                    Collections.singletonList(
+                        Row.withSchema(inputSchema).addValue("test").build())))
+            .setRowSchema(inputSchema);
+
+    MySqlWriteSchemaTransform writeTransform =
+        (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+    PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+    // Then translate the pipeline to a proto and extract MySqlWriteSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> writeTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(WRITE_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, writeTransformProto.size());
+    RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+    assertEquals(WRITE_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the MySqlWriteSchemaTransform
+    MySqlWriteSchemaTransformTranslator translator = new MySqlWriteSchemaTransformTranslator();
+    MySqlWriteSchemaTransform writeTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
+  }
+
+  @Test
+  public void testReCreateReadTransformFromRow() {
+    // setting a subset of fields here.
+    MySqlReadSchemaTransform readTransform =
+        (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    MySqlReadSchemaTransformTranslator translator = new MySqlReadSchemaTransformTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    MySqlReadSchemaTransform readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testReadTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+
+    MySqlReadSchemaTransform readTransform =
+        (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    // Mock inferBeamSchema since it requires database connection.
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+    try (MockedStatic<JdbcIO.ReadRows> mock = Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+      mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), Mockito.any()))
+          .thenReturn(expectedSchema);
+      PCollectionRowTuple.empty(p).apply(readTransform);
+    }
+
+    // Then translate the pipeline to a proto and extract MySqlReadSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> readTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(READ_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, readTransformProto.size());
+    RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+    assertEquals(READ_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the MySqlReadSchemaTransform
+    MySqlReadSchemaTransformTranslator translator = new MySqlReadSchemaTransformTranslator();
+    MySqlReadSchemaTransform readTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+  }
+}
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index cda84629a7d..4f45eeac861 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -97,6 +97,7 @@ public class Managed {
   public static final String KAFKA = "kafka";
   public static final String BIGQUERY = "bigquery";
   public static final String POSTGRES = "postgres";
+  public static final String MYSQL = "mysql";
 
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
@@ -106,6 +107,7 @@ public class Managed {
           .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
           .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
           .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
+          .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ))
           .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
@@ -113,6 +115,7 @@ public class Managed {
           .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
           .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
           .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
+          .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE))
           .build();
 
   /**
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index b22ed6e0c64..3f9f56a5413 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -83,6 +83,8 @@ MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = {
     ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,  # pylint: disable=line-too-long
     ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
     ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,  # pylint: disable=line-too-long
+    ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
+    ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
 }
 
 
diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
index 72dfb6fd9a0..03449236ac9 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -86,6 +86,7 @@ _ICEBERG_CDC = "iceberg_cdc"
 KAFKA = "kafka"
 BIGQUERY = "bigquery"
 POSTGRES = "postgres"
+MYSQL = "mysql"
 
 __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
 
@@ -98,6 +99,7 @@ class Read(PTransform):
       KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
       BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
       POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
+      MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn,
   }
 
   def __init__(
@@ -140,6 +142,7 @@ class Write(PTransform):
       KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
       BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
       POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
+      MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn,
   }
 
   def __init__(

Reply via email to