Repository: beam Updated Branches: refs/heads/master 7ab8954b9 -> a94d680ea
Introduces TypeDescriptors.extractFromTypeParameters Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62c922b3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62c922b3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62c922b3 Branch: refs/heads/master Commit: 62c922b3c2dc3163b180f972a7449cf5e6ac501a Parents: 7ab8954 Author: Eugene Kirpichov <[email protected]> Authored: Tue Jul 25 17:18:16 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jul 28 10:23:15 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSink.java | 24 ++-- .../apache/beam/sdk/values/TypeDescriptor.java | 64 +++++++--- .../apache/beam/sdk/values/TypeDescriptors.java | 118 ++++++++++++++++++- .../beam/sdk/values/TypeDescriptorsTest.java | 49 ++++++++ .../io/gcp/bigquery/DynamicDestinations.java | 22 ++-- 5 files changed, 235 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 9953975..3bf5d5b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verifyNotNull; import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM; +import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -33,7 +34,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.lang.reflect.TypeVariable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; @@ -75,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Instant; @@ -255,17 +256,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab return destinationCoder; } // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. - // We must first use reflection to figure out what the type parameter is. 
- TypeDescriptor<?> superDescriptor = - TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class); - if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) { - throw new AssertionError( - "Couldn't find the DynamicDestinations superclass of " + this.getClass()); - } - TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT"); - @SuppressWarnings("unchecked") - TypeDescriptor<DestinationT> descriptor = - (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable); + @Nullable TypeDescriptor<DestinationT> descriptor = + extractFromTypeParameters( + this, + DynamicDestinations.class, + new TypeVariableExtractor< + DynamicDestinations<UserT, DestinationT>, DestinationT>() {}); + checkArgument( + descriptor != null, + "Unable to infer a coder for DestinationT, " + + "please specify it explicitly by overriding getDestinationCoder()"); return registry.getCoder(descriptor); } } http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java index 14f2cb8..dd6a0fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java @@ -328,30 +328,64 @@ public abstract class TypeDescriptor<T> implements Serializable { } /** - * Returns a new {@code TypeDescriptor} where type variables represented by - * {@code typeParameter} are substituted by {@code typeDescriptor}. 
For example, it can be used to - * construct {@code Map<K, V>} for any {@code K} and {@code V} type: <pre> {@code - * static <K, V> TypeDescriptor<Map<K, V>> mapOf( - * TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) { - * return new TypeDescriptor<Map<K, V>>() {} - * .where(new TypeParameter<K>() {}, keyType) - * .where(new TypeParameter<V>() {}, valueType); - * }}</pre> + * Returns a new {@code TypeDescriptor} where the type variable represented by {@code + * typeParameter} is substituted by {@code typeDescriptor}. For example, it can be used to + * construct {@code Map<K, V>} for any {@code K} and {@code V} type: + * + * <pre>{@code + * static <K, V> TypeDescriptor<Map<K, V>> mapOf( + * TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) { + * return new TypeDescriptor<Map<K, V>>() {} + * .where(new TypeParameter<K>() {}, keyType) + * .where(new TypeParameter<V>() {}, valueType); + * } + * }</pre> * * @param <X> The parameter type * @param typeParameter the parameter type variable * @param typeDescriptor the actual type to substitute */ @SuppressWarnings("unchecked") - public <X> TypeDescriptor<T> where(TypeParameter<X> typeParameter, - TypeDescriptor<X> typeDescriptor) { - TypeResolver resolver = - new TypeResolver() - .where( - typeParameter.typeVariable, typeDescriptor.getType()); + public <X> TypeDescriptor<T> where( + TypeParameter<X> typeParameter, TypeDescriptor<X> typeDescriptor) { + return where(typeParameter.typeVariable, typeDescriptor.getType()); + } + + /** + * A more general form of {@link #where(TypeParameter, TypeDescriptor)} that returns a new {@code + * TypeDescriptor} by matching {@code formal} against {@code actual} to resolve type variables in + * the current {@link TypeDescriptor}.
+ */ + @SuppressWarnings("unchecked") + public TypeDescriptor<T> where(Type formal, Type actual) { + TypeResolver resolver = new TypeResolver().where(formal, actual); return (TypeDescriptor<T>) TypeDescriptor.of(resolver.resolveType(token.getType())); } + /** + * Returns whether this {@link TypeDescriptor} has any unresolved type parameters, as opposed to + * being a concrete type. + * + * <p>For example: + * <pre>{@code + * TypeDescriptor.of(new ArrayList<String>() {}.getClass()).hasUnresolvedParameters() + * => false, because the anonymous class is instantiated with a concrete type + * + * class TestUtils { + * static <T> ArrayList<T> createTypeErasedList() { + * return new ArrayList<T>() {}; + * } + * } + * + * TypeDescriptor.of(TestUtils.<String>createTypeErasedList().getClass()).hasUnresolvedParameters() + * => true, because the type variable T got type-erased and the anonymous ArrayList class + * is instantiated with an unresolved type variable T. + * }</pre> + */ + public boolean hasUnresolvedParameters() { + return hasUnresolvedParameters(getType()); + } + @Override public String toString() { return token.toString(); http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java index a4626c9..8207f06 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java @@ -17,16 +17,20 @@ */ package org.apache.beam.sdk.values; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.math.BigDecimal; import java.math.BigInteger; import java.util.List; import java.util.Set; +import javax.annotation.Nullable; +import
org.apache.beam.sdk.transforms.SerializableFunction; /** - * A utility class containing the Java primitives for - * {@link TypeDescriptor} equivalents. Also, has methods - * for classes that wrap Java primitives like {@link KV}, - * {@link Set}, {@link List}, and {@link Iterable}. + * A utility class for creating {@link TypeDescriptor} objects for different types, such as Java + * primitive types, containers and {@link KV KVs} of other {@link TypeDescriptor} objects, and + * extracting type variables of parameterized types (e.g. extracting the {@code OutputT} type + * variable of a {@code DoFn<InputT, OutputT>}). */ public class TypeDescriptors { /** @@ -286,4 +290,110 @@ public class TypeDescriptors { return typeDescriptor; } + + /** + * A helper interface for use with {@link #extractFromTypeParameters(Object, Class, + * TypeVariableExtractor)}. + */ + public interface TypeVariableExtractor<InputT, OutputT> {} + + /** + * Extracts a type from the actual type parameters of a parameterized class, subject to Java type + * erasure. The type to extract is specified in a way that is safe w.r.t. changing the type + * signature of the parameterized class, as opposed to specifying the name or index of a type + * variable. + * + * <p>Example of use: + * <pre>{@code + * class Foo<BarT> { + * private SerializableFunction<BarT, String> fn; + * + * TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() { + * return TypeDescriptors.extractFromTypeParameters( + * fn, + * SerializableFunction.class, + * // The actual type of "fn" is matched against the input type of the extractor, + * // and the obtained values of type variables of the superclass are substituted + * // into the output type of the extractor. 
+ * new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {}); + * } + * } + * }</pre> + * + * @param instance The object being analyzed + * @param supertype Parameterized superclass of interest + * @param extractor A class for specifying the type to extract from the supertype + * + * @return A {@link TypeDescriptor} for the actual value of the result type of the extractor, + * or {@code null} if the type was erased. + */ + @SuppressWarnings("unchecked") + @Nullable + public static <T, V> TypeDescriptor<V> extractFromTypeParameters( + T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) { + return extractFromTypeParameters( + (TypeDescriptor<T>) TypeDescriptor.of(instance.getClass()), supertype, extractor); + } + + /** + * Like {@link #extractFromTypeParameters(Object, Class, TypeVariableExtractor)}, but takes a + * {@link TypeDescriptor} of the instance being analyzed rather than the instance itself. + */ + @SuppressWarnings("unchecked") + @Nullable + public static <T, V> TypeDescriptor<V> extractFromTypeParameters( + TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) { + // Get the type signature of the extractor, e.g. + // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT> + TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype = + (TypeDescriptor<TypeVariableExtractor<T, V>>) + TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class); + + // Get the actual type argument, e.g. SerializableFunction<BarT, String> + Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0]; + + // Get the actual supertype of the type being analyzed, hopefully with all type parameters + // resolved, e.g. SerializableFunction<Integer, String> + TypeDescriptor supertypeDescriptor = type.getSupertype(supertype); + + // Substitute actual supertype into the extractor, e.g. 
+ // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer> + TypeDescriptor<TypeVariableExtractor<T, V>> extractorT = + extractorSupertype.where(inputT, supertypeDescriptor.getType()); + + // Get output of the extractor. + Type outputT = ((ParameterizedType) extractorT.getType()).getActualTypeArguments()[1]; + TypeDescriptor<?> res = TypeDescriptor.of(outputT); + if (res.hasUnresolvedParameters()) { + return null; + } else { + return (TypeDescriptor<V>) res; + } + } + + /** + * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to + * Java type erasure: returns {@code null} if the type was erased. + */ + @Nullable + public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( + SerializableFunction<InputT, OutputT> fn) { + return extractFromTypeParameters( + fn, + SerializableFunction.class, + new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {}); + } + + /** + * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to + * Java type erasure: returns {@code null} if the type was erased. 
+ */ + @Nullable + public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( + SerializableFunction<InputT, OutputT> fn) { + return extractFromTypeParameters( + fn, + SerializableFunction.class, + new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {}); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java index 1bf0fc9..a4f58da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java @@ -25,6 +25,7 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets; import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import java.util.List; import java.util.Set; @@ -70,4 +71,52 @@ public class TypeDescriptorsTest { assertNotEquals(descriptor, new TypeDescriptor<List<String>>() {}); assertNotEquals(descriptor, new TypeDescriptor<List<Boolean>>() {}); } + + private interface Generic<FooT, BarT> {} + + private static <ActualFooT> Generic<ActualFooT, String> typeErasedGeneric() { + return new Generic<ActualFooT, String>() {}; + } + + private static <ActualFooT, ActualBarT> TypeDescriptor<ActualFooT> extractFooT( + Generic<ActualFooT, ActualBarT> instance) { + return TypeDescriptors.extractFromTypeParameters( + instance, + Generic.class, + new TypeDescriptors.TypeVariableExtractor< + Generic<ActualFooT, ActualBarT>, ActualFooT>() {}); + } + + private static <ActualFooT, ActualBarT> TypeDescriptor<ActualBarT> extractBarT( + 
Generic<ActualFooT, ActualBarT> instance) { + return TypeDescriptors.extractFromTypeParameters( + instance, + Generic.class, + new TypeDescriptors.TypeVariableExtractor< + Generic<ActualFooT, ActualBarT>, ActualBarT>() {}); + } + + private static <ActualFooT, ActualBarT> TypeDescriptor<KV<ActualFooT, ActualBarT>> extractKV( + Generic<ActualFooT, ActualBarT> instance) { + return TypeDescriptors.extractFromTypeParameters( + instance, + Generic.class, + new TypeDescriptors.TypeVariableExtractor< + Generic<ActualFooT, ActualBarT>, KV<ActualFooT, ActualBarT>>() {}); + } + + @Test + public void testTypeDescriptorsTypeParameterOf() throws Exception { + assertEquals(strings(), extractFooT(new Generic<String, Integer>() {})); + assertEquals(integers(), extractBarT(new Generic<String, Integer>() {})); + assertEquals(kvs(strings(), integers()), extractKV(new Generic<String, Integer>() {})); + } + + @Test + public void testTypeDescriptorsTypeParameterOfErased() throws Exception { + Generic<Integer, String> instance = TypeDescriptorsTest.typeErasedGeneric(); + assertNull(extractFooT(instance)); + assertEquals(strings(), extractBarT(instance)); + assertNull(extractKV(instance)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index c5c2462..ea4fc4e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -19,11 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigquery; import 
static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.Lists; import java.io.Serializable; -import java.lang.reflect.TypeVariable; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; /** @@ -157,17 +158,16 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab return destinationCoder; } // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. - // We must first use reflection to figure out what the type parameter is. - TypeDescriptor<?> superDescriptor = - TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class); - if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) { - throw new AssertionError( - "Couldn't find the DynamicDestinations superclass of " + this.getClass()); - } - TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT"); - @SuppressWarnings("unchecked") TypeDescriptor<DestinationT> descriptor = - (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable); + extractFromTypeParameters( + this, + DynamicDestinations.class, + new TypeDescriptors.TypeVariableExtractor< + DynamicDestinations<T, DestinationT>, DestinationT>() {}); + checkArgument( + descriptor != null, + "Unable to infer a coder for DestinationT, " + + "please specify it explicitly by overriding getDestinationCoder()"); return registry.getCoder(descriptor); } }
