[ 
https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=123311&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123311
 ]

ASF GitHub Bot logged work on BEAM-4076:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jul/18 12:25
            Start Date: 14/Jul/18 12:25
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #5941: [BEAM-4076] Schema 
utilities for converting between types
URL: https://github.com/apache/beam/pull/5941
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index c6d2bbd5041..4d2d7fc0b25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -26,6 +26,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -172,6 +173,7 @@ public static Schema of(Field... fields) {
     return Schema.builder().addFields(fields).build();
   }
 
+  /** Returns true if two Schemas have the same fields in the same order. */
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof Schema)) {
@@ -182,6 +184,34 @@ public boolean equals(Object o) {
         && Objects.equals(getFields(), other.getFields());
   }
 
+  /** Returns true if two Schemas have the same fields, but possibly in 
different orders. */
+  public boolean equivalent(Schema other) {
+    if (other.getFieldCount() != getFieldCount()) {
+      return false;
+    }
+
+    List<Field> otherFields =
+        other
+            .getFields()
+            .stream()
+            .sorted(Comparator.comparing(Field::getName))
+            .collect(Collectors.toList());
+    List<Field> actualFields =
+        getFields()
+            .stream()
+            .sorted(Comparator.comparing(Field::getName))
+            .collect(Collectors.toList());
+
+    for (int i = 0; i < otherFields.size(); ++i) {
+      Field otherField = otherFields.get(i);
+      Field actualField = actualFields.get(i);
+      if (!otherField.equivalent(actualField)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
@@ -399,6 +429,33 @@ public boolean equals(Object o) {
           && Arrays.equals(getMetadata(), other.getMetadata());
     }
 
+    private boolean equivalent(FieldType other) {
+      if (!other.getTypeName().equals(getTypeName())) {
+        return false;
+      }
+      switch (getTypeName()) {
+        case ROW:
+          if (!other.getRowSchema().equivalent(getRowSchema())) {
+            return false;
+          }
+          break;
+        case ARRAY:
+          if 
(!other.getCollectionElementType().equivalent(getCollectionElementType())) {
+            return false;
+          }
+          break;
+        case MAP:
+          if (!other.getMapKeyType().equivalent(getMapKeyType())
+              || !other.getMapValueType().equivalent(getMapValueType())) {
+            return false;
+          }
+          break;
+        default:
+          return other.equals(this);
+      }
+      return true;
+    }
+
     @Override
     public int hashCode() {
       return Arrays.deepHashCode(
@@ -495,6 +552,12 @@ public boolean equals(Object o) {
           && Objects.equals(getNullable(), other.getNullable());
     }
 
+    private boolean equivalent(Field otherField) {
+      return otherField.getName().equals(getName())
+          && otherField.getNullable().equals(getNullable())
+          && getType().equivalent(otherField.getType());
+    }
+
     @Override
     public int hashCode() {
       return Objects.hash(getName(), getDescription(), getType(), 
getNullable());
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java
index 167e39ad213..faf269fd7b4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.schemas;
 
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -31,7 +32,7 @@
  * contacts an external schema-registry service to determine the schema for a 
type.
  */
 @Experimental(Kind.SCHEMAS)
-public interface SchemaProvider {
+public interface SchemaProvider extends Serializable {
 
   /** Lookup a schema for the given type. If no schema exists, returns null. */
   @Nullable
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
new file mode 100644
index 00000000000..d44467c8bad
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A set of utilities for converting between different objects supporting 
schemas. */
+@Experimental(Kind.SCHEMAS)
+public class Convert {
+  /**
+   * Convert a {@link PCollection<InputT>} into a {@link PCollection<Row>}.
+   *
+   * <p>The input {@link PCollection} must have a schema attached. The output 
collection will have
+   * the same schema as the input.
+   */
+  public static <InputT> PTransform<PCollection<InputT>, PCollection<Row>> 
toRows() {
+    return to(Row.class);
+  }
+
+  /**
+   * Convert a {@link PCollection<Row>} into a {@link PCollection<OutputT>}.
+   *
+   * <p>The output schema will be inferred using the schema registry. A schema 
must be registered
+   * for this type, or the conversion will fail.
+   */
+  public static <OutputT> PTransform<PCollection<Row>, PCollection<OutputT>> 
fromRows(
+      Class<OutputT> clazz) {
+    return to(clazz);
+  }
+
+  /**
+   * Convert a {@link PCollection<Row>} into a {@link PCollection<OutputT>}.
+   *
+   * <p>The output schema will be inferred using the schema registry. A schema 
must be registered
+   * for this type, or the conversion will fail.
+   */
+  public static <OutputT> PTransform<PCollection<Row>, PCollection<OutputT>> 
fromRows(
+      TypeDescriptor<OutputT> typeDescriptor) {
+    return to(typeDescriptor);
+  }
+
+  /**
+   * Convert a {@link PCollection<InputT>} to a {@link PCollection<OutputT>}.
+   *
+   * <p>This function allows converting between two types as long as the two 
types have
+   * <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> 
if they recursively
+   * have fields with the same names and types, but possibly in different orders.
+   */
+  public static <InputT, OutputT> PTransform<PCollection<InputT>, 
PCollection<OutputT>> to(
+      Class<OutputT> clazz) {
+    return to(TypeDescriptor.of(clazz));
+  }
+
+  /**
+   * Convert a {@link PCollection<InputT>} to a {@link PCollection<OutputT>}.
+   *
+   * <p>This function allows converting between two types as long as the two 
types have
+   * <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> 
if they recursively
+   * have fields with the same names and types, but possibly in different orders.
+   */
+  public static <InputT, OutputT> PTransform<PCollection<InputT>, 
PCollection<OutputT>> to(
+      TypeDescriptor<OutputT> typeDescriptor) {
+    return new ConvertTransform<>(typeDescriptor);
+  }
+
+  private static class ConvertTransform<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+    TypeDescriptor<OutputT> outputTypeDescriptor;
+
+    ConvertTransform(Class<OutputT> outputClass) {
+      this(TypeDescriptor.of(outputClass));
+    }
+
+    ConvertTransform(TypeDescriptor<OutputT> outputTypeDescriptor) {
+      this.outputTypeDescriptor = outputTypeDescriptor;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<OutputT> expand(PCollection<InputT> input) {
+      if (!input.hasSchema()) {
+        throw new RuntimeException("Convert requires a schema on the input.");
+      }
+
+      final SchemaCoder<OutputT> outputSchemaCoder;
+      boolean toRow = 
outputTypeDescriptor.equals(TypeDescriptor.of(Row.class));
+      if (toRow) {
+        // If the output is of type Row, then just forward the schema of the 
input type to the
+        // output.
+        outputSchemaCoder =
+            (SchemaCoder<OutputT>)
+                SchemaCoder.of(
+                    input.getSchema(),
+                    SerializableFunctions.identity(),
+                    SerializableFunctions.identity());
+      } else {
+        // Otherwise, try to find a schema for the output type in the schema 
registry.
+        SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+        try {
+          outputSchemaCoder =
+              SchemaCoder.of(
+                  registry.getSchema(outputTypeDescriptor),
+                  registry.getToRowFunction(outputTypeDescriptor),
+                  registry.getFromRowFunction(outputTypeDescriptor));
+          // assert matches input schema.
+          if (!outputSchemaCoder.getSchema().equivalent(input.getSchema())) {
+            throw new RuntimeException(
+                "Cannot convert between types that don't have equivalent 
schemas."
+                    + " input schema: "
+                    + input.getSchema()
+                    + " output schema: "
+                    + outputSchemaCoder.getSchema());
+          }
+        } catch (NoSuchSchemaException e) {
+          throw new RuntimeException("No schema registered for " + 
outputTypeDescriptor);
+        }
+      }
+
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<InputT, OutputT>() {
+                    @ProcessElement
+                    public void processElement(@Element Row row, 
OutputReceiver<OutputT> o) {
+                      
o.output(outputSchemaCoder.getFromRowFunction().apply(row));
+                    }
+                  }))
+          .setSchema(
+              outputSchemaCoder.getSchema(),
+              outputSchemaCoder.getToRowFunction(),
+              outputSchemaCoder.getFromRowFunction());
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/package-info.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/package-info.java
new file mode 100644
index 00000000000..da05bcfa581
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines transforms that work on PCollections with schemas.
+ *
+ * <p>For further details, see the documentation for each class in this 
package.
+ */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.schemas.transforms;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 32196e7d4c6..8457e0b56df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -301,6 +301,21 @@ public String getName() {
     return setCoder(SchemaCoder.of(schema, toRowFunction, fromRowFunction));
   }
 
+  /** Returns whether this {@link PCollection} has an attached schema. */
+  @Experimental(Kind.SCHEMAS)
+  public boolean hasSchema() {
+    return getCoder() instanceof SchemaCoder;
+  }
+
+  /** Returns the attached schema; throws {@link IllegalStateException} if there is none. */
+  @Experimental(Kind.SCHEMAS)
+  public Schema getSchema() {
+    if (!hasSchema()) {
+      throw new IllegalStateException("Cannot call getSchema when there is no 
schema");
+    }
+    return ((SchemaCoder) getCoder()).getSchema();
+  }
+
   /**
    * of the {@link PTransform}.
    *
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
index e47978b4e59..37a40ca064f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.values.reflect;
 
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Internal;
 
 /**
@@ -28,7 +29,7 @@
  * <p>Implementations of this interface are generated at runtime to map object 
fields to Row fields.
  */
 @Internal
-public interface FieldValueGetter<ObjectT, ValueT> {
+public interface FieldValueGetter<ObjectT, ValueT> extends Serializable {
   ValueT get(ObjectT object);
 
   String name();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java
index 1c8dbfd46e0..a131b1d075d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java
@@ -18,11 +18,12 @@
 
 package org.apache.beam.sdk.values.reflect;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
 
 /** A factory interface for creating {@link FieldValueGetter} objects 
corresponding to a class. */
-public interface FieldValueGetterFactory {
+public interface FieldValueGetterFactory extends Serializable {
   /**
    * Returns a list of {@link FieldValueGetter}s for the target class.
    *
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java
index f55ae8ee576..4d0113210a5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.values.reflect;
 
+import java.io.Serializable;
 import java.lang.reflect.Type;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
@@ -31,7 +32,7 @@
  * fields.
  */
 @Internal
-public interface FieldValueSetter<ObjectT, ValueT> {
+public interface FieldValueSetter<ObjectT, ValueT> extends Serializable {
   /** Sets the specified field on object to value. */
   void set(ObjectT object, ValueT value);
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java
index d48138354d2..1e661195ba9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java
@@ -18,11 +18,12 @@
 
 package org.apache.beam.sdk.values.reflect;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
 
 /** A factory interface for creating {@link FieldValueSetter} objects 
corresponding to a class. */
-public interface FieldValueSetterFactory {
+public interface FieldValueSetterFactory extends Serializable {
   /**
    * Returns a list of {@link FieldValueGetter}s for the target class.
    *
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ReflectionGetter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ReflectionGetter.java
index a30ca8631e6..cd5b7da8a90 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ReflectionGetter.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ReflectionGetter.java
@@ -30,7 +30,7 @@
 class ReflectionGetter implements FieldValueGetter {
   private String name;
   private Class type;
-  private Method getter;
+  private transient Method getter;
 
   ReflectionGetter(Method getter) {
     this.getter = getter;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index 2cea81f601f..0ce722a00b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -20,6 +20,9 @@
 
 import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.stream.Stream;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -156,4 +159,110 @@ public void testCollector() {
     assertEquals("f_string", schema.getField(1).getName());
     assertEquals(FieldType.STRING, schema.getField(1).getType());
   }
+
+  @Test
+  public void testEquivalent() {
+    final Schema expectedNested1 =
+        
Schema.builder().addStringField("yard1").addInt64Field("yard2").build();
+
+    final Schema expectedSchema1 =
+        Schema.builder()
+            .addStringField("field1")
+            .addInt64Field("field2")
+            .addRowField("field3", expectedNested1)
+            .addArrayField("field4", FieldType.row(expectedNested1))
+            .addMapField("field5", FieldType.STRING, 
FieldType.row(expectedNested1))
+            .build();
+
+    final Schema expectedNested2 =
+        
Schema.builder().addInt64Field("yard2").addStringField("yard1").build();
+
+    final Schema expectedSchema2 =
+        Schema.builder()
+            .addMapField("field5", FieldType.STRING, 
FieldType.row(expectedNested2))
+            .addArrayField("field4", FieldType.row(expectedNested2))
+            .addRowField("field3", expectedNested2)
+            .addInt64Field("field2")
+            .addStringField("field1")
+            .build();
+
+    assertNotEquals(expectedSchema1, expectedSchema2);
+    assertTrue(expectedSchema1.equivalent(expectedSchema2));
+  }
+
+  @Test
+  public void testPrimitiveNotEquivalent() {
+    Schema schema1 = Schema.builder().addInt64Field("foo").build();
+    Schema schema2 = Schema.builder().addStringField("foo").build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+
+    schema1 = Schema.builder().addInt64Field("foo").build();
+    schema2 = Schema.builder().addInt64Field("bar").build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+
+    schema1 = Schema.builder().addInt64Field("foo").build();
+    schema2 = Schema.builder().addNullableField("foo", 
FieldType.INT64).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+  }
+
+  @Test
+  public void testNestedNotEquivalent() {
+    Schema nestedSchema1 = Schema.builder().addInt64Field("foo").build();
+    Schema nestedSchema2 = Schema.builder().addStringField("foo").build();
+
+    Schema schema1 = Schema.builder().addRowField("foo", 
nestedSchema1).build();
+    Schema schema2 = Schema.builder().addRowField("foo", 
nestedSchema2).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+  }
+
+  @Test
+  public void testArrayNotEquivalent() {
+    Schema schema1 = Schema.builder().addArrayField("foo", 
FieldType.BOOLEAN).build();
+    Schema schema2 = Schema.builder().addArrayField("foo", 
FieldType.DATETIME).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+  }
+
+  @Test
+  public void testNestedArraysNotEquivalent() {
+    Schema nestedSchema1 = Schema.builder().addInt64Field("foo").build();
+    Schema nestedSchema2 = Schema.builder().addStringField("foo").build();
+
+    Schema schema1 = Schema.builder().addArrayField("foo", 
FieldType.row(nestedSchema1)).build();
+    Schema schema2 = Schema.builder().addArrayField("foo", 
FieldType.row(nestedSchema2)).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+  }
+
+  @Test
+  public void testMapNotEquivalent() {
+    Schema schema1 =
+        Schema.builder().addMapField("foo", FieldType.STRING, 
FieldType.BOOLEAN).build();
+    Schema schema2 =
+        Schema.builder().addMapField("foo", FieldType.DATETIME, 
FieldType.BOOLEAN).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+
+    schema1 = Schema.builder().addMapField("foo", FieldType.STRING, 
FieldType.BOOLEAN).build();
+    schema2 = Schema.builder().addMapField("foo", FieldType.STRING, 
FieldType.STRING).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+  }
+
+  @Test
+  public void testNestedMapsNotEquivalent() {
+    Schema nestedSchema1 = Schema.builder().addInt64Field("foo").build();
+    Schema nestedSchema2 = Schema.builder().addStringField("foo").build();
+
+    Schema schema1 =
+        Schema.builder().addMapField("foo", FieldType.STRING, 
FieldType.row(nestedSchema1)).build();
+    Schema schema2 =
+        Schema.builder().addMapField("foo", FieldType.STRING, 
FieldType.row(nestedSchema2)).build();
+    assertNotEquals(schema1, schema2);
+    assertFalse(schema1.equivalent(schema2));
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
new file mode 100644
index 00000000000..267295cb43d
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.DefaultSchema;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for the {@link Convert} class. */
+public class ConvertTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Test outer POJO. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO1 {
+    public String field1 = "field1";
+    public long field2 = 42;
+    public POJO1Nested field3 = new POJO1Nested();
+    public POJO1Nested[] field4 = new POJO1Nested[] {new POJO1Nested(), new 
POJO1Nested()};
+    public Map<String, POJO1Nested> field5 =
+        ImmutableMap.of(
+            "first", new POJO1Nested(),
+            "second", new POJO1Nested());
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO1 pojo1 = (POJO1) o;
+      return field2 == pojo1.field2
+          && Objects.equals(field1, pojo1.field1)
+          && Objects.equals(field3, pojo1.field3)
+          && Arrays.equals(field4, pojo1.field4)
+          && Objects.equals(field5, pojo1.field5);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(field1, field2, field3, field5);
+      result = 31 * result + Arrays.hashCode(field4);
+      return result;
+    }
+  }
+
+  /** Test inner POJO. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO1Nested {
+    public String yard1 = "yard2";
+    public long yard2 = 43;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO1Nested that = (POJO1Nested) o;
+      return yard2 == that.yard2 && Objects.equals(yard1, that.yard1);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(yard1, yard2);
+    }
+  }
+
+  private static final Schema EXPECTED_SCHEMA1_NESTED =
+      Schema.builder().addStringField("yard1").addInt64Field("yard2").build();
+
+  private static final Schema EXPECTED_SCHEMA1 =
+      Schema.builder()
+          .addStringField("field1")
+          .addInt64Field("field2")
+          .addRowField("field3", EXPECTED_SCHEMA1_NESTED)
+          .addArrayField("field4", FieldType.row(EXPECTED_SCHEMA1_NESTED))
+          .addMapField("field5", FieldType.STRING, 
FieldType.row(EXPECTED_SCHEMA1_NESTED))
+          .build();
+
+  private static final Row EXPECTED_ROW1_NESTED =
+      Row.withSchema(EXPECTED_SCHEMA1_NESTED).addValues("yard2", 43L).build();
+  private static final Row EXPECTED_ROW1 =
+      Row.withSchema(EXPECTED_SCHEMA1)
+          .addValue("field1")
+          .addValue(42L)
+          .addValue(EXPECTED_ROW1_NESTED)
+          .addArray(ImmutableList.of(EXPECTED_ROW1_NESTED, 
EXPECTED_ROW1_NESTED))
+          .addValue(ImmutableMap.of("first", EXPECTED_ROW1_NESTED, "second", 
EXPECTED_ROW1_NESTED))
+          .build();
+
+  /** Test outer POJO. Different but equivalent schema. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO2 {
+    public Map<String, POJO2Nested> field5 =
+        ImmutableMap.of(
+            "first", new POJO2Nested(),
+            "second", new POJO2Nested());
+    public POJO2Nested[] field4 = new POJO2Nested[] {new POJO2Nested(), new 
POJO2Nested()};
+    public POJO2Nested field3 = new POJO2Nested();
+    public long field2 = 42;
+    public String field1 = "field1";
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO2 pojo2 = (POJO2) o;
+      return field2 == pojo2.field2
+          && Objects.equals(field5, pojo2.field5)
+          && Arrays.equals(field4, pojo2.field4)
+          && Objects.equals(field3, pojo2.field3)
+          && Objects.equals(field1, pojo2.field1);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(field5, field3, field2, field1);
+      result = 31 * result + Arrays.hashCode(field4);
+      return result;
+    }
+  }
+
+  /** Test inner POJO. Different but equivalent schema. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO2Nested {
+    public long yard2 = 43;
+    public String yard1 = "yard2";
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO2Nested that = (POJO2Nested) o;
+      return yard2 == that.yard2 && Objects.equals(yard1, that.yard1);
+    }
+
+    @Override
+    public int hashCode() {
+
+      return Objects.hash(yard2, yard1);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testToRows() {
+    PCollection<Row> rows = pipeline.apply(Create.of(new 
POJO1())).apply(Convert.toRows());
+    PAssert.that(rows).containsInAnyOrder(EXPECTED_ROW1);
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFromRows() {
+    PCollection<POJO1> pojos =
+        pipeline
+            .apply(
+                Create.of(EXPECTED_ROW1)
+                    .withSchema(
+                        EXPECTED_SCHEMA1,
+                        SerializableFunctions.identity(),
+                        SerializableFunctions.identity()))
+            .apply(Convert.fromRows(POJO1.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO1());
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGeneralConvert() {
+    PCollection<POJO2> pojos =
+        pipeline.apply(Create.of(new POJO1())).apply(Convert.to(POJO2.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO2());
+    pipeline.run();
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java
index e0a20dc5095..f1d24b3de49 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java
@@ -17,62 +17,15 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
 
 /** Utilities for testing schemas. */
 public class SchemaTestUtils {
   // Assert that two schemas are equivalent, ignoring field order. This tests 
that both schemas
   // (recursively) contain the same fields with the same names, but possibly 
different orders.
   public static void assertSchemaEquivalent(Schema expected, Schema actual) {
-    List<Field> expectedFields =
-        expected
-            .getFields()
-            .stream()
-            .sorted(Comparator.comparing(Field::getName))
-            .collect(Collectors.toList());
-    List<Field> actualFields =
-        actual
-            .getFields()
-            .stream()
-            .sorted(Comparator.comparing(Field::getName))
-            .collect(Collectors.toList());
-    assertEquals(expectedFields.size(), actualFields.size());
-
-    for (int i = 0; i < expectedFields.size(); ++i) {
-      Field expectedField = expectedFields.get(i);
-      Field actualField = actualFields.get(i);
-      assertFieldEquivalent(expectedField, actualField);
-    }
-  }
-
-  public static void assertFieldEquivalent(Field expectedField, Field 
actualField) {
-    assertEquals(expectedField.getName(), actualField.getName());
-    assertEquals(expectedField.getNullable(), actualField.getNullable());
-    assertFieldTypeEquivalent(expectedField.getType(), actualField.getType());
-  }
-
-  public static void assertFieldTypeEquivalent(
-      FieldType expectedFieldType, FieldType actualFieldType) {
-    assertEquals(expectedFieldType.getTypeName(), 
actualFieldType.getTypeName());
-    if (TypeName.ROW.equals(expectedFieldType.getTypeName())) {
-      assertSchemaEquivalent(expectedFieldType.getRowSchema(), 
actualFieldType.getRowSchema());
-    } else if (TypeName.ARRAY.equals(expectedFieldType.getTypeName())) {
-      assertFieldTypeEquivalent(
-          expectedFieldType.getCollectionElementType(), 
actualFieldType.getCollectionElementType());
-    } else if (TypeName.MAP.equals(expectedFieldType.getTypeName())) {
-      assertFieldTypeEquivalent(expectedFieldType.getMapKeyType(), 
actualFieldType.getMapKeyType());
-      assertFieldTypeEquivalent(
-          expectedFieldType.getMapValueType(), 
actualFieldType.getMapValueType());
-    } else {
-      assertEquals(expectedFieldType, actualFieldType);
-    }
+    assertTrue(actual.equivalent(expected));
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 123311)
    Time Spent: 11.5h  (was: 11h 20m)

> Schema followups
> ----------------
>
>                 Key: BEAM-4076
>                 URL: https://issues.apache.org/jira/browse/BEAM-4076
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model, dsl-sql, sdk-java-core
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This umbrella bug contains subtasks with followups for Beam schemas, which 
> were moved from SQL to the core Java SDK and made to be type-name-based 
> rather than coder based.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to