Repository: crunch Updated Branches: refs/heads/master 18028aab3 -> eac45b00a
CRUNCH-363: Fix protobufs to work with collection and union wrapper types. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/eac45b00 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/eac45b00 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/eac45b00 Branch: refs/heads/master Commit: eac45b00a90e1c248a4dedeade44a82b25347fec Parents: 18028aa Author: Josh Wills <[email protected]> Authored: Thu Mar 13 11:52:59 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Mar 13 11:55:55 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/CogroupIT.java | 69 ++ .../org/apache/crunch/lib/PersonProtos.java | 695 +++++++++++++++++++ crunch-core/src/it/resources/person.proto | 28 + .../apache/crunch/types/writable/Writables.java | 17 +- 4 files changed, 804 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java index 191c737..9be5b1e 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -23,12 +23,15 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import org.apache.crunch.*; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.PersonProtos.Person; +import org.apache.crunch.lib.PersonProtos.Person.Builder; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.test.Tests; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.PTypes; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.After; @@ -108,6 +111,26 @@ public class CogroupIT { runCogroupN(AvroTypeFamily.getInstance()); } + @Test + public void testCogroupProtosWritables() { + runCogroupProtos(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroupProtosAvro() { + runCogroupProtos(AvroTypeFamily.getInstance()); + } + + @Test + public void testCogroupProtosPairsWritables() { + runCogroupProtosPairs(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroupProtosPairsAvro() { + runCogroupProtosPairs(AvroTypeFamily.getInstance()); + } + public void runCogroup(PTypeFamily ptf) { PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); @@ -133,6 +156,32 @@ public class CogroupIT { assertThat(actual, is(expected)); } + public void runCogroupProtos(PTypeFamily ptf) { + PTableType<String, Person> tt = ptf.tableOf(ptf.strings(), PTypes.protos(Person.class, ptf)); + + PTable<String, Person> kv1 = lines1.parallelDo("kv1", new GenerateProto(), tt); + PTable<String, Person> kv2 = lines2.parallelDo("kv2", new GenerateProto(), tt); + + PTable<String, Pair<Collection<Person>, Collection<Person>>> cg = Cogroup.cogroup(kv1, kv2); + + Map<String, Pair<Collection<Person>, Collection<Person>>> result = cg.materializeToMap(); + + assertThat(result.size(), is(4)); + } + + public void runCogroupProtosPairs(PTypeFamily ptf) { + PTableType<String, Pair<String, Person>> tt = ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), PTypes.protos(Person.class, ptf))); + + PTable<String, Pair<String, Person>> kv1 = lines1.parallelDo("kv1", new GenerateProtoPairs(), tt); + PTable<String, Pair<String, Person>> kv2 = lines2.parallelDo("kv2", new GenerateProtoPairs(), tt); + + PTable<String, Pair<Collection<Pair<String, Person>>, Collection<Pair<String, Person>>>> cg = Cogroup.cogroup(kv1, kv2); + + Map<String, Pair<Collection<Pair<String, Person>>, Collection<Pair<String, Person>>>> result = cg.materializeToMap(); + + assertThat(result.size(), is(4)); + } + public void runCogroup3(PTypeFamily ptf) { PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); @@ -229,6 +278,26 @@ public class CogroupIT { } } + private static class GenerateProto extends DoFn<String, Pair<String, Person>> { + @Override + public void process(String input, Emitter<Pair<String, Person>> emitter) { + String[] fields = input.split(","); + String key = fields[0]; + Builder b = Person.newBuilder().setFirst("first"+key).setLast("last"+key); + emitter.emit(Pair.of(fields[0], b.build())); + } + } + + private static class GenerateProtoPairs extends DoFn<String, Pair<String, Pair<String, Person>>> { + @Override + public void process(String input, Emitter<Pair<String, Pair<String, Person>>> emitter) { + String[] fields = input.split(","); + String key = fields[0]; + Builder b = Person.newBuilder().setFirst("first"+key).setLast("last"+key); + emitter.emit(Pair.of(fields[0], Pair.of(fields[1], b.build()))); + } + } + private static Collection<String> coll(String... values) { return ImmutableSet.copyOf(values); } http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java new file mode 100644 index 0000000..c604861 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java @@ -0,0 +1,695 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: resources/person.proto + +package org.apache.crunch.lib; + +public final class PersonProtos { + private PersonProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface PersonOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional string first = 1; + /** + * <code>optional string first = 1;</code> + */ + boolean hasFirst(); + /** + * <code>optional string first = 1;</code> + */ + java.lang.String getFirst(); + /** + * <code>optional string first = 1;</code> + */ + com.google.protobuf.ByteString + getFirstBytes(); + + // optional string last = 2; + /** + * <code>optional string last = 2;</code> + */ + boolean hasLast(); + /** + * <code>optional string last = 2;</code> + */ + java.lang.String getLast(); + /** + * <code>optional string last = 2;</code> + */ + com.google.protobuf.ByteString + getLastBytes(); + } + /** + * Protobuf type {@code crunch.Person} + */ + public static final class Person extends + com.google.protobuf.GeneratedMessage + implements PersonOrBuilder { + // Use Person.newBuilder() to construct. + private Person(com.google.protobuf.GeneratedMessage.Builder<?> builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private Person(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final Person defaultInstance; + public static Person getDefaultInstance() { + return defaultInstance; + } + + public Person getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Person( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + first_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + last_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.crunch.lib.PersonProtos.Person.class, org.apache.crunch.lib.PersonProtos.Person.Builder.class); + } + + public static com.google.protobuf.Parser<Person> PARSER = + new com.google.protobuf.AbstractParser<Person>() { + public Person parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new Person(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser<Person> getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional string first = 1; + public static final int FIRST_FIELD_NUMBER = 1; + private java.lang.Object first_; + /** + * <code>optional string first = 1;</code> + */ + public boolean hasFirst() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional string first = 1;</code> + */ + public java.lang.String getFirst() { + java.lang.Object ref = first_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + first_ = s; + } + return s; + } + } + /** + * <code>optional string first = 1;</code> + */ + public com.google.protobuf.ByteString + getFirstBytes() { + java.lang.Object ref = first_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + first_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional string last = 2; + public static final int LAST_FIELD_NUMBER = 2; + private java.lang.Object last_; + /** + * <code>optional string last = 2;</code> + */ + public boolean hasLast() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional string last = 2;</code> + */ + public java.lang.String getLast() { + java.lang.Object ref = last_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + last_ = s; + } + return s; + } + } + /** + * <code>optional string last = 2;</code> + */ + public com.google.protobuf.ByteString + getLastBytes() { + java.lang.Object ref = last_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + last_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + first_ = ""; + last_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getFirstBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getLastBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getFirstBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getLastBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.crunch.lib.PersonProtos.Person parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.crunch.lib.PersonProtos.Person parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.crunch.lib.PersonProtos.Person prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code crunch.Person} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder<Builder> + implements org.apache.crunch.lib.PersonProtos.PersonOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.crunch.lib.PersonProtos.Person.class, org.apache.crunch.lib.PersonProtos.Person.Builder.class); + } + + // Construct using org.apache.crunch.lib.PersonProtos.Person.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + first_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + last_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor; + } + + public org.apache.crunch.lib.PersonProtos.Person getDefaultInstanceForType() { + return org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance(); + } + + public org.apache.crunch.lib.PersonProtos.Person build() { + org.apache.crunch.lib.PersonProtos.Person result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.crunch.lib.PersonProtos.Person buildPartial() { + org.apache.crunch.lib.PersonProtos.Person result = new org.apache.crunch.lib.PersonProtos.Person(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.first_ = first_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.last_ = last_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.crunch.lib.PersonProtos.Person) { + return mergeFrom((org.apache.crunch.lib.PersonProtos.Person)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.crunch.lib.PersonProtos.Person other) { + if (other == org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance()) return this; + if (other.hasFirst()) { + bitField0_ |= 0x00000001; + first_ = other.first_; + onChanged(); + } + if (other.hasLast()) { + bitField0_ |= 0x00000002; + last_ = other.last_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.crunch.lib.PersonProtos.Person parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.crunch.lib.PersonProtos.Person) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional string first = 1; + private java.lang.Object first_ = ""; + /** + * <code>optional string first = 1;</code> + */ + public boolean hasFirst() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>optional string first = 1;</code> + */ + public java.lang.String getFirst() { + java.lang.Object ref = first_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + first_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>optional string first = 1;</code> + */ + public com.google.protobuf.ByteString + getFirstBytes() { + java.lang.Object ref = first_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + first_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>optional string first = 1;</code> + */ + public Builder setFirst( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + first_ = value; + onChanged(); + return this; + } + /** + * <code>optional string first = 1;</code> + */ + public Builder clearFirst() { + bitField0_ = (bitField0_ & ~0x00000001); + first_ = getDefaultInstance().getFirst(); + onChanged(); + return this; + } + /** + * <code>optional string first = 1;</code> + */ + public Builder setFirstBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + first_ = value; + onChanged(); + return this; + } + + // optional string last = 2; + private java.lang.Object last_ = ""; + /** + * <code>optional string last = 2;</code> + */ + public boolean hasLast() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional string last = 2;</code> + */ + public java.lang.String getLast() { + java.lang.Object ref = last_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + last_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>optional string last = 2;</code> + */ + public com.google.protobuf.ByteString + getLastBytes() { + java.lang.Object ref = last_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + last_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>optional string last = 2;</code> + */ + public Builder setLast( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + last_ = value; + onChanged(); + return this; + } + /** + * <code>optional string last = 2;</code> + */ + public Builder clearLast() { + bitField0_ = (bitField0_ & ~0x00000002); + last_ = getDefaultInstance().getLast(); + onChanged(); + return this; + } + /** + * <code>optional string last = 2;</code> + */ + public Builder setLastBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + last_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:crunch.Person) + } + + static { + defaultInstance = new Person(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:crunch.Person) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_crunch_Person_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_crunch_Person_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\ngist.proto\022\006crunch\"%\n\006Person\022\r\n\005first\030" + + "\001 \001(\t\022\014\n\004last\030\002 \001(\tB\'\n\025org.apache.crunch" + + ".libB\014PersonProtosH\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_crunch_Person_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_crunch_Person_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_crunch_Person_descriptor, + new java.lang.String[] { "First", "Last", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/it/resources/person.proto ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/person.proto b/crunch-core/src/it/resources/person.proto new file mode 100644 index 0000000..b973234 --- /dev/null +++ b/crunch-core/src/it/resources/person.proto @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package crunch; + +option java_package = "org.apache.crunch.lib"; +option java_outer_classname = "PersonProtos"; + +option optimize_for = SPEED; + +message Person { + optional string first = 1; + optional string last = 2; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java index d087ca3..89464ac 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -359,6 +359,14 @@ public class Writables { return new WritableTableType((WritableType) key, (WritableType) value); } + private static BytesWritable asBytesWritable(Writable w) { + if (w instanceof BytesWritable) { + return (BytesWritable) w; + } else { + return new BytesWritable(WritableUtils.toByteArray(w)); + } + } + private static <W extends Writable> W create(Class<W> clazz, Writable writable) { if (clazz.equals(writable.getClass())) { return (W) writable; @@ -512,7 +520,7 @@ public class Writables { values[i] = w; written[i] = WRITABLE_CODES.inverse().get(w.getClass()); } else { - values[i] = new BytesWritable(WritableUtils.toByteArray(w)); + values[i] = asBytesWritable(w); written[i] = 1; // code for BytesWritable } } @@ -652,7 +660,7 @@ public class Writables { public UnionWritable map(Union input) { int index = input.getIndex(); Writable w = (Writable) fns.get(index).map(input.getValue()); - return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w))); + return new UnionWritable(index, asBytesWritable(w)); } } @@ -744,8 +752,7 @@ public class Writables { BytesWritable[] w = new BytesWritable[input.size()]; int index = 0; for (T in : input) { - Writable v = (Writable) mapFn.map(in); - w[index++] = new BytesWritable(WritableUtils.toByteArray(v)); + w[index++] = asBytesWritable((Writable) mapFn.map(in)); } arrayWritable.set(w); return arrayWritable; @@ -822,7 +829,7 @@ public class Writables { TextMapWritable tmw = new TextMapWritable(); for (Map.Entry<String, T> e : input.entrySet()) { Writable w = mapFn.map(e.getValue()); - tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w))); + tmw.put(new Text(e.getKey()), asBytesWritable(w)); } return tmw; }
