This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a7f5898f885 Default SchemaTransform configs to snake_case (#31374)
a7f5898f885 is described below
commit a7f5898f8850e5d110ccb299830c12a4f0807a73
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Jun 4 16:29:50 2024 -0400
Default SchemaTransform configs to snake_case (#31374)
* default schematransform configs to snake_case
* add to CHANGES.md
* update Go's bigtable wrapper to export snake_case param names
* make more yaml snake_case changes
---
.../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +-
CHANGES.md | 9 +++
sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go | 6 +-
.../transforms/TypedSchemaTransformProvider.java | 46 ++++++++---
.../TypedSchemaTransformProviderTest.java | 8 +-
.../IcebergReadSchemaTransformProvider.java | 12 ---
.../IcebergWriteSchemaTransformProvider.java | 11 ---
.../KafkaReadSchemaTransformProviderTest.java | 16 ++--
.../managed/ManagedSchemaTransformProvider.java | 11 ---
.../sdk/managed/ManagedTransformConstants.java | 21 +-----
.../ManagedSchemaTransformProviderTest.java | 12 +--
.../ManagedSchemaTransformTranslationTest.java | 6 +-
.../org/apache/beam/sdk/managed/ManagedTest.java | 2 +-
.../managed/src/test/resources/test_config.yaml | 4 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 14 ++--
sdks/python/apache_beam/io/gcp/bigtableio.py | 12 +--
.../transforms/external_transform_provider.py | 35 +--------
.../external_transform_provider_it_test.py | 22 ------
sdks/python/apache_beam/yaml/standard_io.yaml | 88 +++++++++++-----------
.../apache_beam/yaml/standard_providers.yaml | 8 +-
sdks/python/apache_beam/yaml/yaml_provider.py | 2 +-
sdks/python/gen_xlang_wrappers.py | 21 +-----
22 files changed, 141 insertions(+), 227 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index b2683333323..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 2
+ "modification": 1
}
diff --git a/CHANGES.md b/CHANGES.md
index 91bdfef6916..1aee8283bcb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,6 +88,15 @@
This new implementation still supports all (immutable) List methods as
before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().
+* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a
+ configuration Schema with snake_case naming convention
+ ([#31374](https://github.com/apache/beam/pull/31374)). This will make the
following
+ cases problematic:
+ * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java
SchemaTransform,
and vice versa.
+ * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java
SchemaTransform
+ * All direct uses of Python's
[SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
+ should be updated to use new snake_case parameter names.
## Deprecations
diff --git a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
index 5b6d7d91631..81df24223ca 100644
--- a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
+++ b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
@@ -62,9 +62,9 @@ import (
)
type bigtableConfig struct {
- InstanceId string `beam:"instanceId"`
- ProjectId string `beam:"projectId"`
- TableId string `beam:"tableId"`
+ InstanceId string `beam:"instance_id"`
+ ProjectId string `beam:"project_id"`
+ TableId string `beam:"table_id"`
}
// Cell represents a single cell in a Bigtable row.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index d5c6c724c6f..d9b49dd3ca2 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.schemas.transforms;
+import static
org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.ParameterizedType;
import java.util.List;
@@ -26,9 +28,14 @@ import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
/**
@@ -38,8 +45,12 @@ import org.apache.beam.sdk.values.Row;
* <p>ConfigT should be available in the SchemaRegistry.
*
* <p>{@link #configurationSchema()} produces a configuration {@link Schema}
that is inferred from
- * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be
used produce a {@link
- * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the
configuration Schema.
+ * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be
used to produce a
+ * {@link SchemaTransform} using {@link #from(Row)}, as long as the Row fits
the configuration
+ * Schema.
+ *
+ * <p>NOTE: The inferred field names in the configuration {@link Schema} and
{@link Row} follow the
+ * {@code snake_case} naming convention.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it
will likely change as
* we provide implementations for more standard Beam transforms. We provide no
backwards
@@ -78,10 +89,11 @@ public abstract class TypedSchemaTransformProvider<ConfigT>
implements SchemaTra
}
@Override
- public Schema configurationSchema() {
+ public final Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
- return
SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
+ // We also establish a `snake_case` convention for all SchemaTransform
configurations
+ return
SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
@@ -90,9 +102,12 @@ public abstract class TypedSchemaTransformProvider<ConfigT>
implements SchemaTra
}
}
- /** Produces a {@link SchemaTransform} from a Row configuration. */
+ /**
+ * Produces a {@link SchemaTransform} from a Row configuration. Row fields
are expected to have
+ * `snake_case` naming convention.
+ */
@Override
- public SchemaTransform from(Row configuration) {
+ public final SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}
@@ -103,9 +118,22 @@ public abstract class
TypedSchemaTransformProvider<ConfigT> implements SchemaTra
private ConfigT configFromRow(Row configuration) {
try {
- return SchemaRegistry.createDefault()
- .getFromRowFunction(configurationClass())
- .apply(configuration);
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row, ConfigT> rowToConfigT =
+ registry.getFromRowFunction(configurationClass());
+
+ // Configuration objects handled by the AutoValueSchema provider will
expect Row fields with
+ // camelCase naming convention
+ SchemaProvider schemaProvider =
registry.getSchemaProvider(configurationClass());
+ if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
+ && checkNotNull(
+ ((DefaultSchemaProvider) schemaProvider)
+ .getUnderlyingSchemaProvider(configurationClass()))
+ .getClass()
+ .equals(AutoValueSchema.class)) {
+ configuration = configuration.toCamelCase();
+ }
+ return rowToConfigT.apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() +
"SchemaTransformProvider's config");
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index b1dc0911a92..2eef0e30f80 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -130,8 +130,8 @@ public class TypedSchemaTransformProviderTest {
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("stringField", "field1")
- .withFieldValue("integerField", Integer.valueOf(13))
+ .withFieldValue("string_field", "field1")
+ .withFieldValue("integer_field", Integer.valueOf(13))
.build();
Configuration outputConfig = ((FakeSchemaTransform)
provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public class TypedSchemaTransformProviderTest {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("stringField", "field1")
- .withFieldValue("integerField", Integer.valueOf(13))
+ .withFieldValue("string_field", "field1")
+ .withFieldValue("integer_field", Integer.valueOf(13))
.build();
assertEquals(Arrays.asList("field1", "13"),
provider.dependencies(inputConfig, null).get());
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index bfe2fab1f9a..fb32e18d937 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -25,7 +25,6 @@ import
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -132,15 +131,4 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}
-
- // TODO: set global snake_case naming convention and remove these special
cases
- @Override
- public SchemaTransform from(Row rowConfig) {
- return super.from(rowConfig.toCamelCase());
- }
-
- @Override
- public Schema configurationSchema() {
- return super.configurationSchema().toSnakeCase();
- }
}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 71183c6b0a0..b490693a9ad 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -176,15 +176,4 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
}
}
}
-
- // TODO: set global snake_case naming convention and remove these special
cases
- @Override
- public SchemaTransform from(Row rowConfig) {
- return super.from(rowConfig.toCamelCase());
- }
-
- @Override
- public Schema configurationSchema() {
- return super.configurationSchema().toSnakeCase();
- }
}
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index d5962a737ba..f5ac5bb54ad 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -121,17 +121,17 @@ public class KafkaReadSchemaTransformProviderTest {
assertEquals(
Sets.newHashSet(
- "bootstrapServers",
+ "bootstrap_servers",
"topic",
"schema",
- "autoOffsetResetConfig",
- "consumerConfigUpdates",
+ "auto_offset_reset_config",
+ "consumer_config_updates",
"format",
- "confluentSchemaRegistrySubject",
- "confluentSchemaRegistryUrl",
- "errorHandling",
- "fileDescriptorPath",
- "messageName"),
+ "confluent_schema_registry_subject",
+ "confluent_schema_registry_url",
+ "error_handling",
+ "file_descriptor_path",
+ "message_name"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 0702137cffd..6f97983d326 100644
---
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -258,15 +258,4 @@ public class ManagedSchemaTransformProvider
throw new RuntimeException(e.getMessage());
}
}
-
- // TODO: set global snake_case naming convention and remove these special
cases
- @Override
- public SchemaTransform from(Row rowConfig) {
- return super.from(rowConfig.toCamelCase());
- }
-
- @Override
- public Schema configurationSchema() {
- return super.configurationSchema().toSnakeCase();
- }
}
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 8165633cf15..141544305a3 100644
---
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -45,27 +45,10 @@ public class ManagedTransformConstants {
public static final String KAFKA_WRITE =
"beam:schematransform:org.apache.beam:kafka_write:v1";
private static final Map<String, String> KAFKA_READ_MAPPINGS =
- ImmutableMap.<String, String>builder()
- .put("topic", "topic")
- .put("bootstrap_servers", "bootstrapServers")
- .put("consumer_config_updates", "consumerConfigUpdates")
- .put("confluent_schema_registry_url", "confluentSchemaRegistryUrl")
- .put("confluent_schema_registry_subject",
"confluentSchemaRegistrySubject")
- .put("data_format", "format")
- .put("schema", "schema")
- .put("file_descriptor_path", "fileDescriptorPath")
- .put("message_name", "messageName")
- .build();
+ ImmutableMap.<String, String>builder().put("data_format",
"format").build();
private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
- ImmutableMap.<String, String>builder()
- .put("topic", "topic")
- .put("bootstrap_servers", "bootstrapServers")
- .put("producer_config_updates", "producerConfigUpdates")
- .put("data_format", "format")
- .put("file_descriptor_path", "fileDescriptorPath")
- .put("message_name", "messageName")
- .build();
+ ImmutableMap.<String, String>builder().put("data_format",
"format").build();
public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index 3a3465406c0..e9edf8751e3 100644
---
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -51,7 +51,7 @@ public class ManagedSchemaTransformProviderTest {
@Test
public void testGetConfigRowFromYamlString() {
- String yamlString = "extraString: abc\n" + "extraInteger: 123";
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public class ManagedSchemaTransformProviderTest {
Row expectedRow =
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
- .withFieldValue("extraString", "abc")
- .withFieldValue("extraInteger", 123)
+ .withFieldValue("extra_string", "abc")
+ .withFieldValue("extra_integer", 123)
.build();
Row returnedRow =
@@ -84,8 +84,8 @@ public class ManagedSchemaTransformProviderTest {
Schema configSchema = new
TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
- .withFieldValue("extraString", "abc")
- .withFieldValue("extraInteger", 123)
+ .withFieldValue("extra_string", "abc")
+ .withFieldValue("extra_integer", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@ public class ManagedSchemaTransformProviderTest {
@Test
public void testBuildWithYamlString() {
- String yamlString = "extraString: abc\n" + "extraInteger: 123";
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig config =
ManagedConfig.builder()
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index 0b0ad532dbd..f7769a9e1d1 100644
---
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -91,7 +91,7 @@ public class ManagedSchemaTransformTranslationTest {
@Test
public void testReCreateTransformFromRowWithConfig() {
- String yamlString = "extraString: abc\n" + "extraInteger: 123";
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig originalConfig =
ManagedConfig.builder()
@@ -130,8 +130,8 @@ public class ManagedSchemaTransformTranslationTest {
.setRowSchema(inputSchema);
Map<String, Object> underlyingConfig =
ImmutableMap.<String, Object>builder()
- .put("extraString", "abc")
- .put("extraInteger", 123)
+ .put("extra_string", "abc")
+ .put("extra_integer", 123)
.build();
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
Managed.ManagedTransform transform =
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
index 260085486c8..7ed364d0e17 100644
---
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -90,7 +90,7 @@ public class ManagedTest {
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.build()
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
- .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger",
123));
+ .withConfig(ImmutableMap.of("extra_string", "abc",
"extra_integer", 123));
runTestProviderTest(writeOp);
}
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml
b/sdks/java/managed/src/test/resources/test_config.yaml
index 3967b6095ea..da3bd68546c 100644
--- a/sdks/java/managed/src/test/resources/test_config.yaml
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
# under the License.
#
-extraString: "abc"
-extraInteger: 123
+extra_string: "abc"
+extra_integer: 123
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py
b/sdks/python/apache_beam/io/gcp/bigquery.py
index caeed6b7b9b..29b7b575932 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2599,13 +2599,13 @@ class StorageWriteToBigQuery(PTransform):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
table=table,
- createDisposition=self._create_disposition,
- writeDisposition=self._write_disposition,
- triggeringFrequencySeconds=self._triggering_frequency,
- autoSharding=self._with_auto_sharding,
- numStreams=self._num_storage_api_streams,
- useAtLeastOnceSemantics=self._use_at_least_once,
- errorHandling={
+ create_disposition=self._create_disposition,
+ write_disposition=self._write_disposition,
+ triggering_frequency_seconds=self._triggering_frequency,
+ auto_sharding=self._with_auto_sharding,
+ num_streams=self._num_storage_api_streams,
+ use_at_least_once_semantics=self._use_at_least_once,
+ error_handling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py
b/sdks/python/apache_beam/io/gcp/bigtableio.py
index f8534f38ddf..0f3944a791b 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@ class WriteToBigTable(beam.PTransform):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- tableId=self._table_id,
- instanceId=self._instance_id,
- projectId=self._project_id)
+ table_id=self._table_id,
+ instance_id=self._instance_id,
+ project_id=self._project_id)
return (
input
@@ -323,9 +323,9 @@ class ReadFromBigtable(PTransform):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- tableId=self._table_id,
- instanceId=self._instance_id,
- projectId=self._project_id)
+ table_id=self._table_id,
+ instance_id=self._instance_id,
+ project_id=self._project_id)
return (
input.pipeline
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py
b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 2799bd1b9e9..67adda5aec0 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,32 +39,6 @@ def snake_case_to_upper_camel_case(string):
return output
-def snake_case_to_lower_camel_case(string):
- """Convert snake_case to lowerCamelCase"""
- if len(string) <= 1:
- return string.lower()
- upper = snake_case_to_upper_camel_case(string)
- return upper[0].lower() + upper[1:]
-
-
-def camel_case_to_snake_case(string):
- """Convert camelCase to snake_case"""
- arr = []
- word = []
- for i, n in enumerate(string):
- # If seeing an upper letter after a lower letter, we just witnessed a word
- # If seeing an upper letter and the next letter is lower, we may have just
- # witnessed an all caps word
- if n.isupper() and ((i > 0 and string[i - 1].islower()) or
- (i + 1 < len(string) and string[i + 1].islower())):
- arr.append(''.join(word))
- word = [n.lower()]
- else:
- word.append(n.lower())
- arr.append(''.join(word))
- return '_'.join(arr).strip('_')
-
-
# Information regarding a Wrapper parameter.
ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
@@ -76,7 +50,7 @@ def get_config_with_descriptions(
descriptions = schematransform.configuration_schema._field_descriptions
fields_with_descriptions = {}
for field in schema.fields:
- fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
+ fields_with_descriptions[field.name] = ParamInfo(
typing_from_runner_api(field.type),
descriptions[field.name],
field.name)
@@ -105,16 +79,11 @@ class ExternalTransform(PTransform):
expansion_service or self.default_expansion_service
def expand(self, input):
- camel_case_kwargs = {
- snake_case_to_lower_camel_case(k): v
- for k, v in self._kwargs.items()
- }
-
external_schematransform = SchemaAwareExternalTransform(
identifier=self.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- **camel_case_kwargs)
+ **self._kwargs)
return input | external_schematransform
diff --git
a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
index a53001c85fd..95720cee7ee 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -37,9 +37,7 @@ from apache_beam.transforms.external import
BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import
STANDARD_URN_PATTERN
from apache_beam.transforms.external_transform_provider import
ExternalTransform
from apache_beam.transforms.external_transform_provider import
ExternalTransformProvider
-from apache_beam.transforms.external_transform_provider import
camel_case_to_snake_case
from apache_beam.transforms.external_transform_provider import
infer_name_from_identifier
-from apache_beam.transforms.external_transform_provider import
snake_case_to_lower_camel_case
from apache_beam.transforms.external_transform_provider import
snake_case_to_upper_camel_case
from apache_beam.transforms.xlang.io import GenerateSequence
@@ -54,26 +52,6 @@ class NameAndTypeUtilsTest(unittest.TestCase):
for case in test_cases:
self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
- def test_snake_case_to_lower_camel_case(self):
- test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
- ("test_double_underscore", "testDoubleUnderscore"),
- ("TEST_CAPITALIZED", "testCapitalized"),
- ("_prepended_underscore", "prependedUnderscore"),
- ("appended_underscore_", "appendedUnderscore")]
- for case in test_cases:
- self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
-
- def test_camel_case_to_snake_case(self):
- test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
- ("TestDoubleUnderscore",
- "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
- ("BEGINNINGAllCaps",
- "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
- ("AllCapsMIDDLEWord", "all_caps_middle_word"),
- ("lowerCamelCase", "lower_camel_case")]
- for case in test_cases:
- self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
-
def test_infer_name_from_identifier(self):
standard_test_cases = [
("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml
b/sdks/python/apache_beam/yaml/standard_io.yaml
index 8a5ffd9f6a9..005e1af0549 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -30,16 +30,16 @@
mappings:
'ReadFromBigQuery':
query: 'query'
- table: 'tableSpec'
- fields: 'selectedFields'
- row_restriction: 'rowRestriction'
+ table: 'table_spec'
+ fields: 'selected_fields'
+ row_restriction: 'row_restriction'
'WriteToBigQuery':
table: 'table'
- create_disposition: 'createDisposition'
- write_disposition: 'writeDisposition'
- error_handling: 'errorHandling'
+ create_disposition: 'create_disposition'
+ write_disposition: 'write_disposition'
+ error_handling: 'error_handling'
# TODO(https://github.com/apache/beam/issues/30058): Required until
autosharding support is fixed
- num_streams: 'numStreams'
+ num_streams: 'num_streams'
underlying_provider:
type: beamJar
transforms:
@@ -56,24 +56,24 @@
mappings:
'ReadFromKafka':
'schema': 'schema'
- 'consumer_config': 'consumerConfigUpdates'
+ 'consumer_config': 'consumer_config_updates'
'format': 'format'
'topic': 'topic'
- 'bootstrap_servers': 'bootstrapServers'
- 'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
- 'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
- 'auto_offset_reset_config': 'autoOffsetResetConfig'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'bootstrap_servers': 'bootstrap_servers'
+ 'confluent_schema_registry_url': 'confluent_schema_registry_url'
+ 'confluent_schema_registry_subject':
'confluent_schema_registry_subject'
+ 'auto_offset_reset_config': 'auto_offset_reset_config'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'WriteToKafka':
'format': 'format'
'topic': 'topic'
- 'bootstrap_servers': 'bootstrapServers'
- 'producer_config_updates': 'producerConfigUpdates'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'bootstrap_servers': 'bootstrap_servers'
+ 'producer_config_updates': 'producer_config_updates'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'schema': 'schema'
underlying_provider:
type: beamJar
@@ -93,24 +93,24 @@
'project': 'project'
'schema': 'schema'
'format': 'format'
- 'subscription_name': 'subscriptionName'
+ 'subscription_name': 'subscription_name'
'location': 'location'
'attributes': 'attributes'
- 'attribute_map': 'attributeMap'
- 'attribute_id': 'attributeId'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'attribute_map': 'attribute_map'
+ 'attribute_id': 'attribute_id'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'WriteToPubSubLite':
'project': 'project'
'format': 'format'
- 'topic_name': 'topicName'
+ 'topic_name': 'topic_name'
'location': 'location'
'attributes': 'attributes'
- 'attribute_id': 'attributeId'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'attribute_id': 'attribute_id'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'schema': 'schema'
underlying_provider:
type: beamJar
@@ -205,26 +205,26 @@
config:
mappings:
'ReadFromJdbc':
- driver_class_name: 'driverClassName'
- type: 'jdbcType'
- url: 'jdbcUrl'
+ driver_class_name: 'driver_class_name'
+ type: 'jdbc_type'
+ url: 'jdbc_url'
username: 'username'
password: 'password'
table: 'location'
- query: 'readQuery'
- driver_jars: 'driverJars'
- connection_properties: 'connectionProperties'
- connection_init_sql: 'connectionInitSql'
+ query: 'read_query'
+ driver_jars: 'driver_jars'
+ connection_properties: 'connection_properties'
+ connection_init_sql: 'connection_init_sql'
'WriteToJdbc':
- driver_class_name: 'driverClassName'
- type: 'jdbcType'
- url: 'jdbcUrl'
+ driver_class_name: 'driver_class_name'
+ type: 'jdbc_type'
+ url: 'jdbc_url'
username: 'username'
password: 'password'
table: 'location'
- driver_jars: 'driverJars'
- connection_properties: 'connectionProperties'
- connection_init_sql: 'connectionInitSql'
+ driver_jars: 'driver_jars'
+ connection_properties: 'connection_properties'
+ connection_init_sql: 'connection_init_sql'
'ReadFromMySql': 'ReadFromJdbc'
'WriteToMySql': 'WriteToJdbc'
'ReadFromPostgres': 'ReadFromJdbc'
diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml
b/sdks/python/apache_beam/yaml/standard_providers.yaml
index 89b0cc9d553..8d0037d4dd9 100644
--- a/sdks/python/apache_beam/yaml/standard_providers.yaml
+++ b/sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -68,20 +68,20 @@
append: 'append'
drop: 'drop'
fields: 'fields'
- error_handling: 'errorHandling'
+ error_handling: 'error_handling'
'MapToFields-java':
language: 'language'
append: 'append'
drop: 'drop'
fields: 'fields'
- error_handling: 'errorHandling'
+ error_handling: 'error_handling'
'Filter-java':
language: 'language'
keep: 'keep'
- error_handling: 'errorHandling'
+ error_handling: 'error_handling'
'Explode':
fields: 'fields'
- cross_product: 'crossProduct'
+ cross_product: 'cross_product'
underlying_provider:
type: beamJar
transforms:
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py
b/sdks/python/apache_beam/yaml/yaml_provider.py
index d5f6d03c284..794cad0ec7f 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -929,7 +929,7 @@ def create_java_builtin_provider():
return java_provider.create_transform(
'WindowIntoStrategy',
{
- 'serializedWindowingStrategy': windowing_strategy.to_runner_api(
+ 'serialized_windowing_strategy': windowing_strategy.to_runner_api(
empty_context).SerializeToString()
},
None)
diff --git a/sdks/python/gen_xlang_wrappers.py
b/sdks/python/gen_xlang_wrappers.py
index a75fc05cba7..ea4f496c2d0 100644
--- a/sdks/python/gen_xlang_wrappers.py
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -233,24 +233,6 @@ def pretty_type(tp):
return (tp, nullable)
-def camel_case_to_snake_case(string):
- """Convert camelCase to snake_case"""
- arr = []
- word = []
- for i, n in enumerate(string):
- # If seeing an upper letter after a lower letter, we just witnessed a word
- # If seeing an upper letter and the next letter is lower, we may have just
- # witnessed an all caps word
- if n.isupper() and ((i > 0 and string[i - 1].islower()) or
- (i + 1 < len(string) and string[i + 1].islower())):
- arr.append(''.join(word))
- word = [n.lower()]
- else:
- word.append(n.lower())
- arr.append(''.join(word))
- return '_'.join(arr).strip('_')
-
-
def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
"""
Generates code for external transform wrapper classes (subclasses of
@@ -287,9 +269,8 @@ def get_wrappers_from_transform_configs(config_file) ->
Dict[str, List[str]]:
parameters = []
for param, info in fields.items():
- pythonic_name = camel_case_to_snake_case(param)
param_details = {
- "name": pythonic_name,
+ "name": param,
"type": info['type'],
"description": info['description'],
}