This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4718cdff87f Sort SchemaTransform configuration schema fields by name to establish consistency (#24344)
4718cdff87f is described below

commit 4718cdff87fed4f92636e94dbf3a04c2315d6a95
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Dec 1 13:00:59 2022 -0500

    Sort SchemaTransform configuration schema fields by name to establish consistency (#24344)
    
    * add method to sort schema fields by name
    
    * spotless
    
    * make test more readable
    
    * add test that breaks when new attributes are added
    
    * ignore synthetic fields
---
 .../java/org/apache/beam/sdk/schemas/Schema.java   | 15 ++++
 .../transforms/TypedSchemaTransformProvider.java   |  3 +-
 .../org/apache/beam/sdk/schemas/SchemaTest.java    | 82 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 2a6ca630e7e..9a0805f52d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -264,6 +264,21 @@ public class Schema implements Serializable {
     return Schema.builder().addFields(fields).build();
   }
 
+  /** Returns an identical Schema with sorted fields. */
+  public Schema sorted() {
+    // Create a new schema and copy over the appropriate Schema object 
attributes:
+    // {fields, uuid, encodingPositions, options}
+    Schema sortedSchema =
+        this.fields.stream()
+            .sorted(Comparator.comparing(Field::getName))
+            .collect(Schema.toSchema())
+            .withOptions(getOptions());
+    sortedSchema.setUUID(getUUID());
+    sortedSchema.setEncodingPositions(getEncodingPositions());
+
+    return sortedSchema;
+  }
+
   /** Returns a copy of the Schema with the options set. */
   public Schema withOptions(Options options) {
     return new Schema(fields, 
getOptions().toBuilder().addOptions(options).build());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index 944f33a596d..c3ae723d1d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -61,7 +61,8 @@ public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTra
   @Override
   public final Schema configurationSchema() {
     try {
-      return SchemaRegistry.createDefault().getSchema(configurationClass());
+      // Sort the fields by name to ensure a consistent schema is produced
+      return 
SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for "
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index 5477885c62b..47746b59925 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -23,9 +23,15 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.Options;
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.junit.Rule;
 import org.junit.Test;
@@ -188,6 +194,82 @@ public class SchemaTest {
     assertEquals(FieldType.STRING, schema.getField(1).getType());
   }
 
+  @Test
+  public void testSorted() {
+    Options testOptions =
+        Options.builder()
+            .setOption("test_str_option", FieldType.STRING, "test_str")
+            .setOption("test_bool_option", FieldType.BOOLEAN, true)
+            .build();
+
+    Schema unorderedSchema =
+        Schema.builder()
+            .addStringField("d")
+            .addInt32Field("c")
+            .addStringField("b")
+            .addByteField("a")
+            .build()
+            .withOptions(testOptions);
+
+    Schema unorderedSchemaAfterSorting = unorderedSchema.sorted();
+
+    Schema sortedSchema =
+        Schema.builder()
+            .addByteField("a")
+            .addStringField("b")
+            .addInt32Field("c")
+            .addStringField("d")
+            .build()
+            .withOptions(testOptions);
+    sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions());
+
+    assertEquals(true, 
unorderedSchema.equivalent(unorderedSchemaAfterSorting));
+    assertEquals(
+        true,
+        Objects.equals(unorderedSchemaAfterSorting.getFields(), 
sortedSchema.getFields())
+            && Objects.equals(unorderedSchemaAfterSorting.getOptions(), 
sortedSchema.getOptions())
+            && Objects.equals(
+                unorderedSchemaAfterSorting.getEncodingPositions(),
+                sortedSchema.getEncodingPositions()));
+  }
+
+  @Test
+  public void testSortedMethodIncludesAllSchemaFields() {
+    // This test is most likely to break when new Schema object attributes are 
added. It is designed
+    // this way to make sure that the Schema::sorted() method is updated to 
return a full sorted
+    // copy.
+
+    // Schema object attributes that are accounted for in Schema::sorted().
+    // Note: Only the appropriate ones are copied over.
+    List<String> attributesAccountedForInSorted =
+        Arrays.asList(
+            "fieldIndices",
+            "encodingPositions",
+            "encodingPositionsOverridden",
+            "fields",
+            "hashCode",
+            "uuid",
+            "options");
+
+    // Current attributes in Schema object.
+    List<String> currentAttributes =
+        Arrays.stream(Schema.class.getDeclaredFields())
+            .filter(field -> !field.isSynthetic())
+            .map(java.lang.reflect.Field::getName)
+            .collect(Collectors.toList());
+
+    List<String> differences = new ArrayList<>(currentAttributes);
+    differences.removeAll(attributesAccountedForInSorted);
+
+    assertEquals(
+        String.format(
+            "Detected attributes %s in Schema object that are not accounted 
for in Schema::sorted(). "
+                + "If appropriate, sorted() should copy over these attributes 
as well. Either way, update this test after checking.",
+            differences.toString()),
+        currentAttributes,
+        attributesAccountedForInSorted);
+  }
+
   @Test
   public void testEquivalent() {
     final Schema expectedNested1 =

Reply via email to