This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e3293e289e4 Minor changes on Managed JDBCIO (#36339)
e3293e289e4 is described below
commit e3293e289e4026c5b510e08deb8b9a03575d40f1
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Oct 8 18:07:07 2025 -0400
Minor changes on Managed JDBCIO (#36339)
---
sdks/java/expansion-service/container/Dockerfile | 1 -
sdks/java/extensions/schemaio-expansion-service/build.gradle | 6 ++++++
.../providers/ReadFromPostgresSchemaTransformProvider.java | 10 +++++++---
.../jdbc/providers/WriteToPostgresSchemaTransformProvider.java | 10 +++++++---
sdks/python/apache_beam/transforms/external.py | 2 ++
5 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/sdks/java/expansion-service/container/Dockerfile b/sdks/java/expansion-service/container/Dockerfile
index 2688a317671..968f5cd2ac2 100644
--- a/sdks/java/expansion-service/container/Dockerfile
+++ b/sdks/java/expansion-service/container/Dockerfile
@@ -28,7 +28,6 @@ WORKDIR /opt/apache/beam
COPY target/avro.jar jars/
COPY target/beam-sdks-java-io-expansion-service.jar jars/
COPY target/beam-sdks-java-io-google-cloud-platform-expansion-service.jar jars/
-COPY target/beam-sdks-java-extensions-schemaio-expansion-service.jar jars/
# Copy licenses
COPY target/LICENSE /opt/apache/beam/
diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle
index 12ee92a9e10..e33d6b96b63 100644
--- a/sdks/java/extensions/schemaio-expansion-service/build.gradle
+++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle
@@ -76,3 +76,9 @@ task runExpansionService (type: JavaExec) {
classpath = sourceSets.test.runtimeClasspath
args = [project.findProperty("constructionService.port") ?: "8097"]
}
+
+shadowJar {
+ manifest {
+ attributes(["Multi-Release": true])
+ }
+}
\ No newline at end of file
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
index 834e7a0a492..05011be7379 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
@@ -59,14 +59,18 @@ public class ReadFromPostgresSchemaTransformProvider
extends JdbcReadSchemaTrans
JdbcReadSchemaTransformConfiguration configuration) {
String jdbcType = configuration.getJdbcType();
if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) {
- throw new IllegalArgumentException(
- String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType));
+ LOG.warn(
+ "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+ jdbcType(),
+ jdbcType,
+ jdbcType());
+ configuration = configuration.toBuilder().setJdbcType(jdbcType()).build();
}
List<@org.checkerframework.checker.nullness.qual.Nullable String>
connectionInitSql =
configuration.getConnectionInitSql();
if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
- LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+ throw new IllegalArgumentException("Postgres does not support connectionInitSql.");
}
Boolean disableAutoCommit = configuration.getDisableAutoCommit();
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
index 97074742dbe..64581c2b01b 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
@@ -59,14 +59,18 @@ public class WriteToPostgresSchemaTransformProvider extends
JdbcWriteSchemaTrans
JdbcWriteSchemaTransformConfiguration configuration) {
String jdbcType = configuration.getJdbcType();
if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) {
- throw new IllegalArgumentException(
- String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType));
+ LOG.warn(
+ "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+ jdbcType(),
+ jdbcType,
+ jdbcType());
+ configuration = configuration.toBuilder().setJdbcType(jdbcType()).build();
}
List<@org.checkerframework.checker.nullness.qual.Nullable String>
connectionInitSql =
configuration.getConnectionInitSql();
if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
- LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+ throw new IllegalArgumentException("Postgres does not support connectionInitSql.");
}
// Override "connectionInitSql" for postgres
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index c9029119241..ff4e8b6098b 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -85,6 +85,8 @@ MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = {
ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
+ ManagedTransforms.Urns.SQL_SERVER_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
+ ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
}