Updated Branches: refs/heads/apache-crunch-0.8 9a0882c03 -> 834260caa
CRUNCH-328: Add support for ExtensionRegistry in PTypes.protos with a SerializableSupplier Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/834260ca Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/834260ca Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/834260ca Branch: refs/heads/apache-crunch-0.8 Commit: 834260caaa41da8cb12eac891cc0eb238998739c Parents: 9a0882c Author: Josh Wills <[email protected]> Authored: Tue Jan 21 18:36:03 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Jan 22 11:22:58 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/types/PTypes.java | 50 +++++++++++++++++++- .../crunch/util/SerializableSupplier.java | 31 ++++++++++++ 2 files changed, 79 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/834260ca/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java index cbb9c7c..e701747 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java @@ -21,8 +21,10 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.UUID; +import com.google.protobuf.ExtensionRegistry; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.MapFn; +import org.apache.crunch.util.SerializableSupplier; import org.apache.hadoop.util.ReflectionUtils; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; @@ -41,27 +43,59 @@ import com.google.protobuf.Message; */ public class PTypes { + /** + * A PType for Java's {@link BigInteger} type. + */ public static PType<BigInteger> bigInt(PTypeFamily typeFamily) { return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes()); } + /** + * A PType for Java's {@link UUID} type. + */ public static PType<UUID> uuid(PTypeFamily ptf) { return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes()); } - + + /** + * Constructs a PType for reading a Java type from a JSON string using Jackson's {@link ObjectMapper}. + */ public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) { return typeFamily .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings()); } + /** + * Constructs a PType for the given protocol buffer. + */ public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) { return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes()); } + /** + * Constructs a PType for a protocol buffer, using the given {@code SerializableSupplier} to provide + * an {@link ExtensionRegistry} to use in reading the given protobuf. + */ + public static <T extends Message> PType<T> protos( + Class<T> clazz, + PTypeFamily typeFamily, + SerializableSupplier<ExtensionRegistry> supplier) { + return typeFamily.derived(clazz, + new ProtoInputMapFn<T>(clazz, supplier), + new ProtoOutputMapFn<T>(), + typeFamily.bytes()); + } + + /** + * Constructs a PType for a Thrift record. + */ public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) { return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes()); } + /** + * Constructs a PType for a Java {@code Enum} type. + */ public static <T extends Enum> PType<T> enums(Class<T> type, PTypeFamily typeFamily) { return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings()); } @@ -126,21 +160,33 @@ public class PTypes { private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> { private final Class<T> clazz; + private final SerializableSupplier<ExtensionRegistry> extensionSupplier; private transient T instance; + private transient ExtensionRegistry registry; ProtoInputMapFn(Class<T> clazz) { + this(clazz, null); + } + + ProtoInputMapFn(Class<T> clazz, SerializableSupplier<ExtensionRegistry> extensionSupplier) { this.clazz = clazz; + this.extensionSupplier = extensionSupplier; } @Override public void initialize() { this.instance = Protos.getDefaultInstance(clazz); + if (this.extensionSupplier != null) { + this.registry = extensionSupplier.get(); + } else { + this.registry = ExtensionRegistry.getEmptyRegistry(); + } } @Override public T map(ByteBuffer bb) { try { - return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build(); + return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit(), registry).build(); } catch (InvalidProtocolBufferException e) { throw new CrunchRuntimeException(e); } http://git-wip-us.apache.org/repos/asf/crunch/blob/834260ca/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java b/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java new file mode 100644 index 0000000..3642feb --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import com.google.common.base.Supplier; + +import java.io.Serializable; + +/** + * An extension of Guava's {@link Supplier} interface that indicates that an instance + * will also implement {@link Serializable}, which makes this object suitable for use + * with Crunch's DoFns when we need to construct an instance of a non-serializable + * type for use in processing. + */ +public interface SerializableSupplier<T> extends Supplier<T>, Serializable { +}
