This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b33a8438ad3 Revert global snake_case convention for SchemaTransforms (#31109)
b33a8438ad3 is described below
commit b33a8438ad335b75feec0c5c97e9a728795fc6ff
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Apr 25 22:01:15 2024 -0400
Revert global snake_case convention for SchemaTransforms (#31109)
* revert global snake_case convention and make it a special case for
iceberg and managed
* remove docs and comments too
* cleanup
* revert python and yaml changes too
* fix test
---
.../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +-
.../transforms/TypedSchemaTransformProvider.java | 36 +++++-----------------
.../TypedSchemaTransformProviderTest.java | 8 ++---
.../IcebergReadSchemaTransformProvider.java | 12 ++++++++
.../IcebergWriteSchemaTransformProvider.java | 11 +++++++
.../KafkaReadSchemaTransformProviderTest.java | 16 +++++-----
.../managed/ManagedSchemaTransformProvider.java | 12 +++++++-
.../ManagedSchemaTransformProviderTest.java | 12 ++++----
.../ManagedSchemaTransformTranslationTest.java | 6 ++--
.../org/apache/beam/sdk/managed/ManagedTest.java | 2 +-
.../managed/src/test/resources/test_config.yaml | 4 +--
sdks/python/apache_beam/io/gcp/bigquery.py | 14 ++++-----
sdks/python/apache_beam/io/gcp/bigtableio.py | 12 ++++----
.../transforms/external_transform_provider.py | 35 +++++++++++++++++++--
.../external_transform_provider_it_test.py | 22 +++++++++++++
sdks/python/apache_beam/yaml/yaml_provider.py | 2 +-
sdks/python/gen_xlang_wrappers.py | 21 ++++++++++++-
17 files changed, 155 insertions(+), 72 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056a5de..b2683333323 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 1
+ "modification": 2
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index cfd298ae87e..d5c6c724c6f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.schemas.transforms;
-import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.ParameterizedType;
import java.util.List;
@@ -28,10 +26,8 @@ import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.Row;
@@ -45,9 +41,6 @@ import org.apache.beam.sdk.values.Row;
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
* SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
*
- * <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
- * {@code snake_case} naming convention.
- *
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
* compatibility guarantees and it should not be implemented outside of the Beam repository.
@@ -85,11 +78,10 @@ public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTra
}
@Override
- public final Schema configurationSchema() {
+ public Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
- // We also establish a `snake_case` convention for all SchemaTransform configurations
- return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
+ return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
@@ -98,12 +90,9 @@ public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTra
}
}
- /**
- * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
- * `snake_case` naming convention.
- */
+ /** Produces a {@link SchemaTransform} from a Row configuration. */
@Override
- public final SchemaTransform from(Row configuration) {
+ public SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}
@@ -114,20 +103,9 @@ public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTra
private ConfigT configFromRow(Row configuration) {
try {
- SchemaRegistry registry = SchemaRegistry.createDefault();
-
- // Configuration objects handled by the AutoValueSchema provider will expect Row fields with
- // camelCase naming convention
- SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
- if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
- && checkNotNull(
- ((DefaultSchemaProvider) schemaProvider)
- .getUnderlyingSchemaProvider(configurationClass()))
- .getClass()
- .equals(AutoValueSchema.class)) {
- configuration = configuration.toCamelCase();
- }
- return registry.getFromRowFunction(configurationClass()).apply(configuration);
+ return SchemaRegistry.createDefault()
+ .getFromRowFunction(configurationClass())
+ .apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index 2eef0e30f80..b1dc0911a92 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -130,8 +130,8 @@ public class TypedSchemaTransformProviderTest {
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("string_field", "field1")
- .withFieldValue("integer_field", Integer.valueOf(13))
+ .withFieldValue("stringField", "field1")
+ .withFieldValue("integerField", Integer.valueOf(13))
.build();
Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public class TypedSchemaTransformProviderTest {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("string_field", "field1")
- .withFieldValue("integer_field", Integer.valueOf(13))
+ .withFieldValue("stringField", "field1")
+ .withFieldValue("integerField", Integer.valueOf(13))
.build();
assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index fb32e18d937..bfe2fab1f9a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -131,4 +132,15 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}
+
+ // TODO: set global snake_case naming convention and remove these special cases
+ @Override
+ public SchemaTransform from(Row rowConfig) {
+ return super.from(rowConfig.toCamelCase());
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return super.configurationSchema().toSnakeCase();
+ }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b490693a9ad..71183c6b0a0 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -176,4 +176,15 @@ public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformPro
}
}
}
+
+ // TODO: set global snake_case naming convention and remove these special cases
+ @Override
+ public SchemaTransform from(Row rowConfig) {
+ return super.from(rowConfig.toCamelCase());
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return super.configurationSchema().toSnakeCase();
+ }
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index bf9895e36b8..f6e231c758a 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -112,17 +112,17 @@ public class KafkaReadSchemaTransformProviderTest {
assertEquals(
Sets.newHashSet(
- "bootstrap_servers",
+ "bootstrapServers",
"topic",
"schema",
- "auto_offset_reset_config",
- "consumer_config_updates",
+ "autoOffsetResetConfig",
+ "consumerConfigUpdates",
"format",
- "confluent_schema_registry_subject",
- "confluent_schema_registry_url",
- "error_handling",
- "file_descriptor_path",
- "message_name"),
+ "confluentSchemaRegistrySubject",
+ "confluentSchemaRegistryUrl",
+ "errorHandling",
+ "fileDescriptorPath",
+ "messageName"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index cb5088a24cc..e13741e86b4 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -198,7 +198,6 @@ public class ManagedSchemaTransformProvider
}
}
- /** */
@VisibleForTesting
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
// May return an empty row (perhaps the underlying transform doesn't have any required
@@ -209,4 +208,15 @@ public class ManagedSchemaTransformProvider
Map<String, SchemaTransformProvider> getAllProviders() {
return schemaTransformProviders;
}
+
+ // TODO: set global snake_case naming convention and remove these special cases
+ @Override
+ public SchemaTransform from(Row rowConfig) {
+ return super.from(rowConfig.toCamelCase());
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return super.configurationSchema().toSnakeCase();
+ }
}
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index e9edf8751e3..3a3465406c0 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -51,7 +51,7 @@ public class ManagedSchemaTransformProviderTest {
@Test
public void testGetConfigRowFromYamlString() {
- String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ String yamlString = "extraString: abc\n" + "extraInteger: 123";
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public class ManagedSchemaTransformProviderTest {
Row expectedRow =
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
- .withFieldValue("extra_string", "abc")
- .withFieldValue("extra_integer", 123)
+ .withFieldValue("extraString", "abc")
+ .withFieldValue("extraInteger", 123)
.build();
Row returnedRow =
@@ -84,8 +84,8 @@ public class ManagedSchemaTransformProviderTest {
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
- .withFieldValue("extra_string", "abc")
- .withFieldValue("extra_integer", 123)
+ .withFieldValue("extraString", "abc")
+ .withFieldValue("extraInteger", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@ public class ManagedSchemaTransformProviderTest {
@Test
public void testBuildWithYamlString() {
- String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ String yamlString = "extraString: abc\n" + "extraInteger: 123";
ManagedConfig config =
ManagedConfig.builder()
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index b4b41ded841..7a418976079 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -84,7 +84,7 @@ public class ManagedSchemaTransformTranslationTest {
@Test
public void testReCreateTransformFromRowWithConfig() {
- String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ String yamlString = "extraString: abc\n" + "extraInteger: 123";
ManagedConfig originalConfig =
ManagedConfig.builder()
@@ -123,8 +123,8 @@ public class ManagedSchemaTransformTranslationTest {
.setRowSchema(inputSchema);
Map<String, Object> underlyingConfig =
ImmutableMap.<String, Object>builder()
- .put("extra_string", "abc")
- .put("extra_integer", 123)
+ .put("extraString", "abc")
+ .put("extraInteger", 123)
.build();
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
Managed.ManagedTransform transform =
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
index 7ed364d0e17..260085486c8 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -90,7 +90,7 @@ public class ManagedTest {
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.build()
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
- .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
+ .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
runTestProviderTest(writeOp);
}
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml
index 7725c32b348..3967b6095ea 100644
--- a/sdks/java/managed/src/test/resources/test_config.yaml
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
# under the License.
#
-extra_string: "abc"
-extra_integer: 123
\ No newline at end of file
+extraString: "abc"
+extraInteger: 123
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d89ce712d8f..43bd1702218 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2574,13 +2574,13 @@ class StorageWriteToBigQuery(PTransform):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
table=table,
- create_disposition=self._create_disposition,
- write_disposition=self._write_disposition,
- triggering_frequency_seconds=self._triggering_frequency,
- auto_sharding=self._with_auto_sharding,
- num_streams=self._num_storage_api_streams,
- use_at_least_once_semantics=self._use_at_least_once,
- error_handling={
+ createDisposition=self._create_disposition,
+ writeDisposition=self._write_disposition,
+ triggeringFrequencySeconds=self._triggering_frequency,
+ autoSharding=self._with_auto_sharding,
+ numStreams=self._num_storage_api_streams,
+ useAtLeastOnceSemantics=self._use_at_least_once,
+ errorHandling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index 0f3944a791b..f8534f38ddf 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@ class WriteToBigTable(beam.PTransform):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- table_id=self._table_id,
- instance_id=self._instance_id,
- project_id=self._project_id)
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
return (
input
@@ -323,9 +323,9 @@ class ReadFromBigtable(PTransform):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- table_id=self._table_id,
- instance_id=self._instance_id,
- project_id=self._project_id)
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
return (
input.pipeline
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 67adda5aec0..2799bd1b9e9 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,6 +39,32 @@ def snake_case_to_upper_camel_case(string):
return output
+def snake_case_to_lower_camel_case(string):
+ """Convert snake_case to lowerCamelCase"""
+ if len(string) <= 1:
+ return string.lower()
+ upper = snake_case_to_upper_camel_case(string)
+ return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+ """Convert camelCase to snake_case"""
+ arr = []
+ word = []
+ for i, n in enumerate(string):
+ # If seeing an upper letter after a lower letter, we just witnessed a word
+ # If seeing an upper letter and the next letter is lower, we may have just
+ # witnessed an all caps word
+ if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+ (i + 1 < len(string) and string[i + 1].islower())):
+ arr.append(''.join(word))
+ word = [n.lower()]
+ else:
+ word.append(n.lower())
+ arr.append(''.join(word))
+ return '_'.join(arr).strip('_')
+
+
# Information regarding a Wrapper parameter.
ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
@@ -50,7 +76,7 @@ def get_config_with_descriptions(
descriptions = schematransform.configuration_schema._field_descriptions
fields_with_descriptions = {}
for field in schema.fields:
- fields_with_descriptions[field.name] = ParamInfo(
+ fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
typing_from_runner_api(field.type),
descriptions[field.name],
field.name)
@@ -79,11 +105,16 @@ class ExternalTransform(PTransform):
expansion_service or self.default_expansion_service
def expand(self, input):
+ camel_case_kwargs = {
+ snake_case_to_lower_camel_case(k): v
+ for k, v in self._kwargs.items()
+ }
+
external_schematransform = SchemaAwareExternalTransform(
identifier=self.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- **self._kwargs)
+ **camel_case_kwargs)
return input | external_schematransform
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
index 95720cee7ee..a53001c85fd 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -37,7 +37,9 @@ from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
from apache_beam.transforms.external_transform_provider import ExternalTransform
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
+from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
from apache_beam.transforms.xlang.io import GenerateSequence
@@ -52,6 +54,26 @@ class NameAndTypeUtilsTest(unittest.TestCase):
for case in test_cases:
self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
+ def test_snake_case_to_lower_camel_case(self):
+ test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
+ ("test_double_underscore", "testDoubleUnderscore"),
+ ("TEST_CAPITALIZED", "testCapitalized"),
+ ("_prepended_underscore", "prependedUnderscore"),
+ ("appended_underscore_", "appendedUnderscore")]
+ for case in test_cases:
+ self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
+
+ def test_camel_case_to_snake_case(self):
+ test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
+ ("TestDoubleUnderscore",
+ "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
+ ("BEGINNINGAllCaps",
+ "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
+ ("AllCapsMIDDLEWord", "all_caps_middle_word"),
+ ("lowerCamelCase", "lower_camel_case")]
+ for case in test_cases:
+ self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
+
def test_infer_name_from_identifier(self):
standard_test_cases = [
("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 1e9c7c60546..5f53302028c 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -889,7 +889,7 @@ def create_java_builtin_provider():
return java_provider.create_transform(
'WindowIntoStrategy',
{
- 'serialized_windowing_strategy': windowing_strategy.to_runner_api(
+ 'serializedWindowingStrategy': windowing_strategy.to_runner_api(
empty_context).SerializeToString()
},
None)
diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py
index ea4f496c2d0..a75fc05cba7 100644
--- a/sdks/python/gen_xlang_wrappers.py
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -233,6 +233,24 @@ def pretty_type(tp):
return (tp, nullable)
+def camel_case_to_snake_case(string):
+ """Convert camelCase to snake_case"""
+ arr = []
+ word = []
+ for i, n in enumerate(string):
+ # If seeing an upper letter after a lower letter, we just witnessed a word
+ # If seeing an upper letter and the next letter is lower, we may have just
+ # witnessed an all caps word
+ if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+ (i + 1 < len(string) and string[i + 1].islower())):
+ arr.append(''.join(word))
+ word = [n.lower()]
+ else:
+ word.append(n.lower())
+ arr.append(''.join(word))
+ return '_'.join(arr).strip('_')
+
+
def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
"""
Generates code for external transform wrapper classes (subclasses of
@@ -269,8 +287,9 @@ def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
parameters = []
for param, info in fields.items():
+ pythonic_name = camel_case_to_snake_case(param)
param_details = {
- "name": param,
+ "name": pythonic_name,
"type": info['type'],
"description": info['description'],
}