Repository: beam Updated Branches: refs/heads/master d7e7af8eb -> 57535f35e
http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java new file mode 100644 index 0000000..a8c6a81 --- /dev/null +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageA; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageB; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageC; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageWithMap; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ProtoCoder}. 
+ */ +@RunWith(JUnit4.class) +public class ProtoCoderTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testFactoryMethodAgreement() throws Exception { + assertEquals(ProtoCoder.of(new TypeDescriptor<MessageA>() {}), ProtoCoder.of(MessageA.class)); + + assertEquals( + ProtoCoder.of(new TypeDescriptor<MessageA>() {}), + ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {})); + } + + @Test + public void testProviderCannotProvideCoder() throws Exception { + thrown.expect(CannotProvideCoderException.class); + thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message"); + + ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {}); + } + + @Test + public void testCoderEncodeDecodeEqual() throws Exception { + MessageA value = + MessageA.newBuilder() + .setField1("hello") + .addField2(MessageB.newBuilder().setField1(true).build()) + .addField2(MessageB.newBuilder().setField1(false).build()) + .build(); + CoderProperties.coderDecodeEncodeEqual(ProtoCoder.of(MessageA.class), value); + } + + @Test + public void testCoderEncodeDecodeEqualNestedContext() throws Exception { + MessageA value1 = + MessageA.newBuilder() + .setField1("hello") + .addField2(MessageB.newBuilder().setField1(true).build()) + .addField2(MessageB.newBuilder().setField1(false).build()) + .build(); + MessageA value2 = + MessageA.newBuilder() + .setField1("world") + .addField2(MessageB.newBuilder().setField1(false).build()) + .addField2(MessageB.newBuilder().setField1(true).build()) + .build(); + CoderProperties.coderDecodeEncodeEqual( + ListCoder.of(ProtoCoder.of(MessageA.class)), ImmutableList.of(value1, value2)); + } + + @Test + public void testCoderEncodeDecodeExtensionsEqual() throws Exception { + MessageC value = + MessageC.newBuilder() + .setExtension( + Proto2CoderTestMessages.field1, + MessageA.newBuilder() + .setField1("hello") + .addField2(MessageB.newBuilder().setField1(true).build()) + 
.build()) + .setExtension( + Proto2CoderTestMessages.field2, MessageB.newBuilder().setField1(false).build()) + .build(); + CoderProperties.coderDecodeEncodeEqual( + ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class), value); + } + + @Test + public void testCoderSerialization() throws Exception { + ProtoCoder<MessageA> coder = ProtoCoder.of(MessageA.class); + CoderProperties.coderSerializable(coder); + } + + @Test + public void testCoderExtensionsSerialization() throws Exception { + ProtoCoder<MessageC> coder = + ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class); + CoderProperties.coderSerializable(coder); + } + + @Test + public void testEncodingId() throws Exception { + Coder<MessageA> coderA = ProtoCoder.of(MessageA.class); + CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName() + "[]"); + + ProtoCoder<MessageC> coder = + ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class); + CoderProperties.coderHasEncodingId( + coder, + String.format("%s[%s]", MessageC.class.getName(), Proto2CoderTestMessages.class.getName())); + } + + @Test + public void encodeNullThrowsCoderException() throws Exception { + thrown.expect(CoderException.class); + thrown.expectMessage("cannot encode a null MessageA"); + + CoderUtils.encodeToBase64(ProtoCoder.of(MessageA.class), null); + } + + @Test + public void testDeterministicCoder() throws NonDeterministicException { + Coder<MessageA> coder = ProtoCoder.of(MessageA.class); + coder.verifyDeterministic(); + } + + @Test + public void testNonDeterministicCoder() throws NonDeterministicException { + thrown.expect(NonDeterministicException.class); + thrown.expectMessage(MessageWithMap.class.getName() + " transitively includes Map field"); + + Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class); + coder.verifyDeterministic(); + } + + @Test + public void testNonDeterministicProperty() throws CoderException { + MessageWithMap.Builder 
msg1B = MessageWithMap.newBuilder(); + MessageWithMap.Builder msg2B = MessageWithMap.newBuilder(); + + // Built in reverse order but with equal contents. + for (int i = 0; i < 10; ++i) { + msg1B.getMutableField1().put("key" + i, MessageA.getDefaultInstance()); + msg2B.getMutableField1().put("key" + (9 - i), MessageA.getDefaultInstance()); + } + + // Assert the messages are equal. + MessageWithMap msg1 = msg1B.build(); + MessageWithMap msg2 = msg2B.build(); + assertEquals(msg2, msg1); + + // Assert the encoded messages are not equal. + Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class); + assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtilTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtilTest.java new file mode 100644 index 0000000..b2e5c10 --- /dev/null +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtilTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.protobuf; + +import static org.apache.beam.sdk.extensions.protobuf.ProtobufUtil.checkProto2Syntax; +import static org.apache.beam.sdk.extensions.protobuf.ProtobufUtil.getRecursiveDescriptorsForClass; +import static org.apache.beam.sdk.extensions.protobuf.ProtobufUtil.verifyDeterministic; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.protobuf.Any; +import com.google.protobuf.Descriptors.GenericDescriptor; +import com.google.protobuf.Duration; +import com.google.protobuf.ExtensionRegistry; +import com.google.protobuf.Message; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageA; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageB; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageC; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.MessageWithMap; +import org.apache.beam.sdk.extensions.protobuf.Proto2CoderTestMessages.ReferencesMessageWithMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ProtobufUtil}. 
+ */ +@RunWith(JUnit4.class) +public class ProtobufUtilTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final Set<String> MESSAGE_A_ONLY = + ImmutableSet.of("proto2_coder_test_messages.MessageA"); + + private static final Set<String> MESSAGE_B_ONLY = + ImmutableSet.of("proto2_coder_test_messages.MessageB"); + + private static final Set<String> MESSAGE_C_ONLY = + ImmutableSet.of("proto2_coder_test_messages.MessageC"); + + // map fields are actually represented as a nested Message in generated Java code. + private static final Set<String> WITH_MAP_ONLY = + ImmutableSet.of( + "proto2_coder_test_messages.MessageWithMap", + "proto2_coder_test_messages.MessageWithMap.Field1Entry"); + + private static final Set<String> REFERS_MAP_ONLY = + ImmutableSet.of("proto2_coder_test_messages.ReferencesMessageWithMap"); + + // A references B, so the recursive descriptor set for A is A and B. + private static final Set<String> MESSAGE_A_ALL = Sets.union(MESSAGE_A_ONLY, MESSAGE_B_ONLY); + + // C, only with registered extensions, references A. + private static final Set<String> MESSAGE_C_EXT = Sets.union(MESSAGE_C_ONLY, MESSAGE_A_ALL); + + // MessageWithMap references A. + private static final Set<String> WITH_MAP_ALL = Sets.union(WITH_MAP_ONLY, MESSAGE_A_ALL); + + // ReferencesMessageWithMap references MessageWithMap. 
+ private static final Set<String> REFERS_MAP_ALL = Sets.union(REFERS_MAP_ONLY, WITH_MAP_ALL); + + @Test + public void testRecursiveDescriptorsMessageA() { + assertThat(getRecursiveDescriptorFullNames(MessageA.class), equalTo(MESSAGE_A_ALL)); + } + + @Test + public void testRecursiveDescriptorsMessageB() { + assertThat(getRecursiveDescriptorFullNames(MessageB.class), equalTo(MESSAGE_B_ONLY)); + } + + @Test + public void testRecursiveDescriptorsMessageC() { + assertThat(getRecursiveDescriptorFullNames(MessageC.class), equalTo(MESSAGE_C_ONLY)); + } + + @Test + public void testRecursiveDescriptorsMessageCWithExtensions() { + // With extensions, Message C has a reference to Message A and Message B. + ExtensionRegistry registry = ExtensionRegistry.newInstance(); + Proto2CoderTestMessages.registerAllExtensions(registry); + assertThat(getRecursiveDescriptorFullNames(MessageC.class, registry), equalTo(MESSAGE_C_EXT)); + } + + @Test + public void testRecursiveDescriptorsMessageWithMap() { + assertThat(getRecursiveDescriptorFullNames(MessageWithMap.class), equalTo(WITH_MAP_ALL)); + } + + @Test + public void testRecursiveDescriptorsReferencesMessageWithMap() { + assertThat( + getRecursiveDescriptorFullNames(ReferencesMessageWithMap.class), equalTo(REFERS_MAP_ALL)); + } + + @Test + public void testVerifyProto2() { + checkProto2Syntax(MessageA.class, ExtensionRegistry.getEmptyRegistry()); + checkProto2Syntax(MessageB.class, ExtensionRegistry.getEmptyRegistry()); + checkProto2Syntax(MessageC.class, ExtensionRegistry.getEmptyRegistry()); + checkProto2Syntax(MessageWithMap.class, ExtensionRegistry.getEmptyRegistry()); + checkProto2Syntax(ReferencesMessageWithMap.class, ExtensionRegistry.getEmptyRegistry()); + } + + @Test + public void testAnyIsNotProto2() { + // Any is a core Protocol Buffers type that uses proto3 syntax. 
+ thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(Any.class.getCanonicalName()); + thrown.expectMessage("in file " + Any.getDescriptor().getFile().getName()); + + checkProto2Syntax(Any.class, ExtensionRegistry.getEmptyRegistry()); + } + + @Test + public void testDurationIsNotProto2() { + // Duration is a core Protocol Buffers type that uses proto3 syntax. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(Duration.class.getCanonicalName()); + thrown.expectMessage("in file " + Duration.getDescriptor().getFile().getName()); + + checkProto2Syntax(Duration.class, ExtensionRegistry.getEmptyRegistry()); + } + + @Test + public void testDurationIsDeterministic() throws NonDeterministicException { + // Duration can be encoded deterministically. + verifyDeterministic(ProtoCoder.of(Duration.class)); + } + + @Test + public void testMessageWithMapIsNotDeterministic() throws NonDeterministicException { + String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName(); + thrown.expect(NonDeterministicException.class); + thrown.expectMessage(MessageWithMap.class.getName()); + thrown.expectMessage("transitively includes Map field " + mapFieldName); + thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName()); + + verifyDeterministic(ProtoCoder.of(MessageWithMap.class)); + } + + @Test + public void testMessageWithTransitiveMapIsNotDeterministic() throws NonDeterministicException { + String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName(); + thrown.expect(NonDeterministicException.class); + thrown.expectMessage(ReferencesMessageWithMap.class.getName()); + thrown.expectMessage("transitively includes Map field " + mapFieldName); + thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName()); + + verifyDeterministic(ProtoCoder.of(ReferencesMessageWithMap.class)); + } + + 
//////////////////////////////////////////////////////////////////////////////////////////// + + /** Helper used to test the recursive class traversal and print good error messages. */ + private static Set<String> getRecursiveDescriptorFullNames(Class<? extends Message> clazz) { + return getRecursiveDescriptorFullNames(clazz, ExtensionRegistry.getEmptyRegistry()); + } + + /** Helper used to test the recursive class traversal and print good error messages. */ + private static Set<String> getRecursiveDescriptorFullNames( + Class<? extends Message> clazz, ExtensionRegistry registry) { + Set<String> result = new HashSet<>(); + for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) { + result.add(d.getFullName()); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/test/proto/proto2_coder_test_messages.proto ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/test/proto/proto2_coder_test_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/proto2_coder_test_messages.proto new file mode 100644 index 0000000..137431a --- /dev/null +++ b/sdks/java/extensions/protobuf/src/test/proto/proto2_coder_test_messages.proto @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Protocol Buffer messages used for testing the ProtoCoder implementation. + */ + +syntax = "proto2"; + +package proto2_coder_test_messages; + +option java_package = "org.apache.beam.sdk.extensions.protobuf"; + +message MessageA { + optional string field1 = 1; + repeated MessageB field2 = 2; +} + +message MessageB { + optional bool field1 = 1; +} + +message MessageC { + extensions 100 to 105; +} + +extend MessageC { + optional MessageA field1 = 101; + optional MessageB field2 = 102; +} + +message MessageWithMap { + map<string, MessageA> field1 = 1; +} + +message ReferencesMessageWithMap { + repeated MessageWithMap field1 = 1; +}
