Repository: beam
Updated Branches:
  refs/heads/master 7ab8954b9 -> a94d680ea


Introduces TypeDescriptors.extractFromTypeParameters


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62c922b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62c922b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62c922b3

Branch: refs/heads/master
Commit: 62c922b3c2dc3163b180f972a7449cf5e6ac501a
Parents: 7ab8954
Author: Eugene Kirpichov <[email protected]>
Authored: Tue Jul 25 17:18:16 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Jul 28 10:23:15 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  24 ++--
 .../apache/beam/sdk/values/TypeDescriptor.java  |  64 +++++++---
 .../apache/beam/sdk/values/TypeDescriptors.java | 118 ++++++++++++++++++-
 .../beam/sdk/values/TypeDescriptorsTest.java    |  49 ++++++++
 .../io/gcp/bigquery/DynamicDestinations.java    |  22 ++--
 5 files changed, 235 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9953975..3bf5d5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -23,6 +23,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verifyNotNull;
 import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
+import static 
org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -33,7 +34,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import 
org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
@@ -255,17 +256,16 @@ public abstract class FileBasedSink<OutputT, 
DestinationT> implements Serializab
         return destinationCoder;
       }
       // If dynamicDestinations doesn't provide a coder, try to find it in the 
coder registry.
-      // We must first use reflection to figure out what the type parameter is.
-      TypeDescriptor<?> superDescriptor =
-          
TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
-      if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
-        throw new AssertionError(
-            "Couldn't find the DynamicDestinations superclass of " + 
this.getClass());
-      }
-      TypeVariable typeVariable = 
superDescriptor.getTypeParameter("DestinationT");
-      @SuppressWarnings("unchecked")
-      TypeDescriptor<DestinationT> descriptor =
-          (TypeDescriptor<DestinationT>) 
superDescriptor.resolveType(typeVariable);
+      @Nullable TypeDescriptor<DestinationT> descriptor =
+          extractFromTypeParameters(
+              this,
+              DynamicDestinations.class,
+              new TypeVariableExtractor<
+                  DynamicDestinations<UserT, DestinationT>, DestinationT>() 
{});
+      checkArgument(
+          descriptor != null,
+          "Unable to infer a coder for DestinationT, "
+              + "please specify it explicitly by overriding 
getDestinationCoder()");
       return registry.getCoder(descriptor);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
index 14f2cb8..dd6a0fd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
@@ -328,30 +328,64 @@ public abstract class TypeDescriptor<T> implements 
Serializable {
   }
 
   /**
-   * Returns a new {@code TypeDescriptor} where type variables represented by
-   * {@code typeParameter} are substituted by {@code typeDescriptor}. For 
example, it can be used to
-   * construct {@code Map<K, V>} for any {@code K} and {@code V} type: <pre> 
{@code
-   *   static <K, V> TypeDescriptor<Map<K, V>> mapOf(
-   *       TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
-   *     return new TypeDescriptor<Map<K, V>>() {}
-   *         .where(new TypeParameter<K>() {}, keyType)
-   *         .where(new TypeParameter<V>() {}, valueType);
-   *   }}</pre>
+   * Returns a new {@code TypeDescriptor} where the type variable represented 
by {@code
+   * typeParameter} are substituted by {@code typeDescriptor}. For example, it 
can be used to
+   * construct {@code Map<K, V>} for any {@code K} and {@code V} type:
+   *
+   * <pre>{@code
+   * static <K, V> TypeDescriptor<Map<K, V>> mapOf(
+   *     TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
+   *   return new TypeDescriptor<Map<K, V>>() {}
+   *       .where(new TypeParameter<K>() {}, keyType)
+   *       .where(new TypeParameter<V>() {}, valueType);
+   * }
+   * }</pre>
    *
    * @param <X> The parameter type
    * @param typeParameter the parameter type variable
    * @param typeDescriptor the actual type to substitute
    */
   @SuppressWarnings("unchecked")
-  public <X> TypeDescriptor<T> where(TypeParameter<X> typeParameter,
-      TypeDescriptor<X> typeDescriptor) {
-    TypeResolver resolver =
-        new TypeResolver()
-            .where(
-                typeParameter.typeVariable, typeDescriptor.getType());
+  public <X> TypeDescriptor<T> where(
+      TypeParameter<X> typeParameter, TypeDescriptor<X> typeDescriptor) {
+    return where(typeParameter.typeVariable, typeDescriptor.getType());
+  }
+
+  /**
+   * A more general form of {@link #where(TypeParameter, TypeDescriptor)} that 
returns a new {@code
+   * TypeDescriptor} by matching {@code formal} against {@code actual} to 
resolve type variables in
+   * the current {@link TypeDescriptor}.
+   */
+  @SuppressWarnings("unchecked")
+  public TypeDescriptor<T> where(Type formal, Type actual) {
+    TypeResolver resolver = new TypeResolver().where(formal, actual);
     return (TypeDescriptor<T>) 
TypeDescriptor.of(resolver.resolveType(token.getType()));
   }
 
+  /**
+   * Returns whether this {@link TypeDescriptor} has any unresolved type 
parameters, as opposed to
+   * being a concrete type.
+   *
+   * <p>For example:
+   * <pre>{@code
+   *   TypeDescriptor.of(new ArrayList<String>() 
{}.getClass()).hasUnresolvedTypeParameters()
+   *     => false, because the anonymous class is instantiated with a concrete 
type
+   *
+   *   class TestUtils {
+   *     <T> ArrayList<T> createTypeErasedList() {
+   *       return new ArrayList<T>() {};
+   *     }
+   *   }
+   *
+   *   TypeDescriptor.of(TestUtils.<String>createTypeErasedList().getClass())
+   *     => true, because the type variable T got type-erased and the 
anonymous ArrayList class
+   *     is instantiated with an unresolved type variable T.
+   * }</pre>
+   */
+  public boolean hasUnresolvedParameters() {
+    return hasUnresolvedParameters(getType());
+  }
+
   @Override
   public String toString() {
     return token.toString();

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index a4626c9..8207f06 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -17,16 +17,20 @@
  */
 package org.apache.beam.sdk.values;
 
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.List;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 
 /**
- * A utility class containing the Java primitives for
- * {@link TypeDescriptor} equivalents. Also, has methods
- * for classes that wrap Java primitives like {@link KV},
- * {@link Set}, {@link List}, and {@link Iterable}.
+ * A utility class for creating {@link TypeDescriptor} objects for different 
types, such as Java
+ * primitive types, containers and {@link KV KVs} of other {@link 
TypeDescriptor} objects, and
+ * extracting type variables of parameterized types (e.g. extracting the 
{@code OutputT} type
+ * variable of a {@code DoFn<InputT, OutputT>}).
  */
 public class TypeDescriptors {
   /**
@@ -286,4 +290,110 @@ public class TypeDescriptors {
 
     return typeDescriptor;
   }
+
+  /**
+   * A helper interface for use with {@link #extractFromTypeParameters(Object, 
Class,
+   * TypeVariableExtractor)}.
+   */
+  public interface TypeVariableExtractor<InputT, OutputT> {}
+
+  /**
+   * Extracts a type from the actual type parameters of a parameterized class, 
subject to Java type
+   * erasure. The type to extract is specified in a way that is safe w.r.t. 
changing the type
+   * signature of the parameterized class, as opposed to specifying the name 
or index of a type
+   * variable.
+   *
+   * <p>Example of use:
+   * <pre>{@code
+   *   class Foo<BarT> {
+   *     private SerializableFunction<BarT, String> fn;
+   *
+   *     TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() {
+   *       return TypeDescriptors.extractFromTypeParameters(
+   *         fn,
+   *         SerializableFunction.class,
+   *         // The actual type of "fn" is matched against the input type of 
the extractor,
+   *         // and the obtained values of type variables of the superclass 
are substituted
+   *         // into the output type of the extractor.
+   *         new TypeVariableExtractor<SerializableFunction<BarT, String>, 
BarT>() {});
+   *     }
+   *   }
+   * }</pre>
+   *
+   * @param instance The object being analyzed
+   * @param supertype Parameterized superclass of interest
+   * @param extractor A class for specifying the type to extract from the 
supertype
+   *
+   * @return A {@link TypeDescriptor} for the actual value of the result type 
of the extractor,
+   *   or {@code null} if the type was erased.
+   */
+  @SuppressWarnings("unchecked")
+  @Nullable
+  public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
+      T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> 
extractor) {
+    return extractFromTypeParameters(
+        (TypeDescriptor<T>) TypeDescriptor.of(instance.getClass()), supertype, 
extractor);
+  }
+
+  /**
+   * Like {@link #extractFromTypeParameters(Object, Class, 
TypeVariableExtractor)}, but takes a
+   * {@link TypeDescriptor} of the instance being analyzed rather than the 
instance itself.
+   */
+  @SuppressWarnings("unchecked")
+  @Nullable
+  public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
+      TypeDescriptor<T> type, Class<? super T> supertype, 
TypeVariableExtractor<T, V> extractor) {
+    // Get the type signature of the extractor, e.g.
+    // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>
+    TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype =
+        (TypeDescriptor<TypeVariableExtractor<T, V>>)
+            
TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class);
+
+    // Get the actual type argument, e.g. SerializableFunction<BarT, String>
+    Type inputT = ((ParameterizedType) 
extractorSupertype.getType()).getActualTypeArguments()[0];
+
+    // Get the actual supertype of the type being analyzed, hopefully with all 
type parameters
+    // resolved, e.g. SerializableFunction<Integer, String>
+    TypeDescriptor supertypeDescriptor = type.getSupertype(supertype);
+
+    // Substitute actual supertype into the extractor, e.g.
+    // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer>
+    TypeDescriptor<TypeVariableExtractor<T, V>> extractorT =
+        extractorSupertype.where(inputT, supertypeDescriptor.getType());
+
+    // Get output of the extractor.
+    Type outputT = ((ParameterizedType) 
extractorT.getType()).getActualTypeArguments()[1];
+    TypeDescriptor<?> res = TypeDescriptor.of(outputT);
+    if (res.hasUnresolvedParameters()) {
+      return null;
+    } else {
+      return (TypeDescriptor<V>) res;
+    }
+  }
+
+  /**
+   * Returns a type descriptor for the input of the given {@link 
SerializableFunction}, subject to
+   * Java type erasure: returns {@code null} if the type was erased.
+   */
+  @Nullable
+  public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
+      SerializableFunction<InputT, OutputT> fn) {
+    return extractFromTypeParameters(
+        fn,
+        SerializableFunction.class,
+        new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, 
InputT>() {});
+  }
+
+  /**
+   * Returns a type descriptor for the output of the given {@link 
SerializableFunction}, subject to
+   * Java type erasure: returns {@code null} if the type was erased.
+   */
+  @Nullable
+  public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
+      SerializableFunction<InputT, OutputT> fn) {
+    return extractFromTypeParameters(
+        fn,
+        SerializableFunction.class,
+        new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, 
OutputT>() {});
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
index 1bf0fc9..a4f58da 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets;
 import static org.apache.beam.sdk.values.TypeDescriptors.strings;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 
 import java.util.List;
 import java.util.Set;
@@ -70,4 +71,52 @@ public class TypeDescriptorsTest {
     assertNotEquals(descriptor, new TypeDescriptor<List<String>>() {});
     assertNotEquals(descriptor, new TypeDescriptor<List<Boolean>>() {});
   }
+
+  private interface Generic<FooT, BarT> {}
+
+  private static <ActualFooT> Generic<ActualFooT, String> typeErasedGeneric() {
+    return new Generic<ActualFooT, String>() {};
+  }
+
+  private static <ActualFooT, ActualBarT> TypeDescriptor<ActualFooT> 
extractFooT(
+      Generic<ActualFooT, ActualBarT> instance) {
+    return TypeDescriptors.extractFromTypeParameters(
+        instance,
+        Generic.class,
+        new TypeDescriptors.TypeVariableExtractor<
+            Generic<ActualFooT, ActualBarT>, ActualFooT>() {});
+  }
+
+  private static <ActualFooT, ActualBarT> TypeDescriptor<ActualBarT> 
extractBarT(
+      Generic<ActualFooT, ActualBarT> instance) {
+    return TypeDescriptors.extractFromTypeParameters(
+        instance,
+        Generic.class,
+        new TypeDescriptors.TypeVariableExtractor<
+            Generic<ActualFooT, ActualBarT>, ActualBarT>() {});
+  }
+
+  private static <ActualFooT, ActualBarT> TypeDescriptor<KV<ActualFooT, 
ActualBarT>> extractKV(
+      Generic<ActualFooT, ActualBarT> instance) {
+    return TypeDescriptors.extractFromTypeParameters(
+        instance,
+        Generic.class,
+        new TypeDescriptors.TypeVariableExtractor<
+            Generic<ActualFooT, ActualBarT>, KV<ActualFooT, ActualBarT>>() {});
+  }
+
+  @Test
+  public void testTypeDescriptorsTypeParameterOf() throws Exception {
+    assertEquals(strings(), extractFooT(new Generic<String, Integer>() {}));
+    assertEquals(integers(), extractBarT(new Generic<String, Integer>() {}));
+    assertEquals(kvs(strings(), integers()), extractKV(new Generic<String, 
Integer>() {}));
+  }
+
+  @Test
+  public void testTypeDescriptorsTypeParameterOfErased() throws Exception {
+    Generic<Integer, String> instance = 
TypeDescriptorsTest.typeErasedGeneric();
+    assertNull(extractFooT(instance));
+    assertEquals(strings(), extractBarT(instance));
+    assertNull(extractKV(instance));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c5c2462..ea4fc4e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -19,11 +19,11 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
 
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
@@ -157,17 +158,16 @@ public abstract class DynamicDestinations<T, 
DestinationT> implements Serializab
       return destinationCoder;
     }
     // If dynamicDestinations doesn't provide a coder, try to find it in the 
coder registry.
-    // We must first use reflection to figure out what the type parameter is.
-    TypeDescriptor<?> superDescriptor =
-        TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
-    if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
-      throw new AssertionError(
-          "Couldn't find the DynamicDestinations superclass of " + 
this.getClass());
-    }
-    TypeVariable typeVariable = 
superDescriptor.getTypeParameter("DestinationT");
-    @SuppressWarnings("unchecked")
     TypeDescriptor<DestinationT> descriptor =
-        (TypeDescriptor<DestinationT>) 
superDescriptor.resolveType(typeVariable);
+        extractFromTypeParameters(
+            this,
+            DynamicDestinations.class,
+            new TypeDescriptors.TypeVariableExtractor<
+                DynamicDestinations<T, DestinationT>, DestinationT>() {});
+    checkArgument(
+        descriptor != null,
+        "Unable to infer a coder for DestinationT, "
+            + "please specify it explicitly by overriding 
getDestinationCoder()");
     return registry.getCoder(descriptor);
   }
 }

Reply via email to