This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4718cdff87f Sort SchemaTransform configuration schema fields by name
to establish consistency (#24344)
4718cdff87f is described below
commit 4718cdff87fed4f92636e94dbf3a04c2315d6a95
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Dec 1 13:00:59 2022 -0500
Sort SchemaTransform configuration schema fields by name to establish
consistency (#24344)
* add method to sort schema fields by name
* spotless
* make test more readable
* add test that breaks when new attributes are added
* ignore synthetic fields
---
.../java/org/apache/beam/sdk/schemas/Schema.java | 15 ++++
.../transforms/TypedSchemaTransformProvider.java | 3 +-
.../org/apache/beam/sdk/schemas/SchemaTest.java | 82 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 2a6ca630e7e..9a0805f52d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -264,6 +264,21 @@ public class Schema implements Serializable {
return Schema.builder().addFields(fields).build();
}
+  /**
+   * Returns a copy of this Schema whose top-level fields are sorted lexicographically by name.
+   *
+   * <p>Field names and types are unchanged; only their order differs. Fields are reordered only at
+   * the top level: each {@link Field} is carried over as-is, so nested row schemas keep their
+   * original field order. The options, UUID, and encoding positions of this schema are copied to
+   * the returned schema.
+   */
+  public Schema sorted() {
+    // Create a new schema and copy over the appropriate Schema object attributes:
+    // {fields, uuid, encodingPositions, options}
+    // NOTE(review): the sorted copy shares this schema's UUID even though its field indices
+    // differ; confirm that no UUID-keyed cache (e.g. of generated Row coders) can hand back an
+    // entry that was built for the original field order.
+    Schema sortedSchema =
+        this.fields.stream()
+            .sorted(Comparator.comparing(Field::getName))
+            .collect(Schema.toSchema())
+            .withOptions(getOptions());
+    sortedSchema.setUUID(getUUID());
+    sortedSchema.setEncodingPositions(getEncodingPositions());
+
+    return sortedSchema;
+  }
+
/** Returns a copy of the Schema with the options set. */
public Schema withOptions(Options options) {
return new Schema(fields,
getOptions().toBuilder().addOptions(options).build());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index 944f33a596d..c3ae723d1d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -61,7 +61,8 @@ public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTra
@Override
public final Schema configurationSchema() {
try {
- return SchemaRegistry.createDefault().getSchema(configurationClass());
+ // Sort the fields by name to ensure a consistent schema is produced
+ return
SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index 5477885c62b..47746b59925 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -23,9 +23,15 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.Options;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.junit.Rule;
import org.junit.Test;
@@ -188,6 +194,82 @@ public class SchemaTest {
assertEquals(FieldType.STRING, schema.getField(1).getType());
}
+  @Test
+  public void testSorted() {
+    Options testOptions =
+        Options.builder()
+            .setOption("test_str_option", FieldType.STRING, "test_str")
+            .setOption("test_bool_option", FieldType.BOOLEAN, true)
+            .build();
+
+    Schema unorderedSchema =
+        Schema.builder()
+            .addStringField("d")
+            .addInt32Field("c")
+            .addStringField("b")
+            .addByteField("a")
+            .build()
+            .withOptions(testOptions);
+
+    Schema unorderedSchemaAfterSorting = unorderedSchema.sorted();
+
+    // Expected result: the same fields in name order with the same options. sorted() copies the
+    // source schema's encoding positions, so the expected schema takes them over as well.
+    Schema sortedSchema =
+        Schema.builder()
+            .addByteField("a")
+            .addStringField("b")
+            .addInt32Field("c")
+            .addStringField("d")
+            .build()
+            .withOptions(testOptions);
+    sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions());
+
+    // Sorting must only change the field order, not what the schema describes.
+    assertTrue(unorderedSchema.equivalent(unorderedSchemaAfterSorting));
+    // Check each copied attribute separately so a failure pinpoints which one diverged.
+    assertTrue(
+        "fields differ from the expected name-sorted fields",
+        Objects.equals(unorderedSchemaAfterSorting.getFields(), sortedSchema.getFields()));
+    assertTrue(
+        "options were not preserved by sorted()",
+        Objects.equals(unorderedSchemaAfterSorting.getOptions(), sortedSchema.getOptions()));
+    assertTrue(
+        "encoding positions were not preserved by sorted()",
+        Objects.equals(
+            unorderedSchemaAfterSorting.getEncodingPositions(),
+            sortedSchema.getEncodingPositions()));
+  }
+
+  @Test
+  public void testSortedMethodIncludesAllSchemaFields() {
+    // This test is expected to fail whenever a (non-synthetic) field is declared on, or removed
+    // from, the Schema class. That is deliberate: it forces whoever changes Schema to check whether
+    // Schema::sorted() must copy the attribute to return a full sorted copy, and then to update
+    // the list below.
+
+    // Schema object attributes that are accounted for in Schema::sorted().
+    // Note: Only the appropriate ones are copied over.
+    List<String> attributesAccountedForInSorted =
+        Stream.of(
+                "fieldIndices",
+                "encodingPositions",
+                "encodingPositionsOverridden",
+                "fields",
+                "hashCode",
+                "uuid",
+                "options")
+            .sorted()
+            .collect(Collectors.toList());
+
+    // Current attributes in Schema object. Class#getDeclaredFields() returns fields in no
+    // particular order, so sort them to keep the list comparison below order-independent.
+    List<String> currentAttributes =
+        Arrays.stream(Schema.class.getDeclaredFields())
+            .filter(field -> !field.isSynthetic())
+            .map(java.lang.reflect.Field::getName)
+            .sorted()
+            .collect(Collectors.toList());
+
+    // Attributes declared on Schema but missing from the list above (likely newly added).
+    List<String> unaccounted = new ArrayList<>(currentAttributes);
+    unaccounted.removeAll(attributesAccountedForInSorted);
+    // Attributes in the list above that Schema no longer declares (likely removed or renamed).
+    List<String> stale = new ArrayList<>(attributesAccountedForInSorted);
+    stale.removeAll(currentAttributes);
+
+    assertEquals(
+        String.format(
+            "Detected attributes %s in Schema object that are not accounted for in "
+                + "Schema::sorted(), and accounted-for attributes %s that no longer exist. "
+                + "If appropriate, sorted() should copy over new attributes as well. "
+                + "Either way, update this test after checking.",
+            unaccounted, stale),
+        attributesAccountedForInSorted,
+        currentAttributes);
+  }
+
@Test
public void testEquivalent() {
final Schema expectedNested1 =