Repository: incubator-beam Updated Branches: refs/heads/master 4f91c2eae -> cd0b6ec7d
Move tests from the sdk folder to the sdks/java/core folder Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd0b6ec7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd0b6ec7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd0b6ec7 Branch: refs/heads/master Commit: cd0b6ec7de2f335bbd7b8462cba049e9f0aca38a Parents: 4f91c2e Author: Kenneth Knowles <[email protected]> Authored: Mon Mar 28 14:45:37 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Mar 28 14:45:37 2016 -0700 ---------------------------------------------------------------------- .../EncodabilityEnforcementFactoryTest.java | 260 ------------------- .../ImmutabilityEnforcementFactoryTest.java | 130 ---------- .../EncodabilityEnforcementFactoryTest.java | 260 +++++++++++++++++++ .../ImmutabilityEnforcementFactoryTest.java | 130 ++++++++++ 4 files changed, 390 insertions(+), 390 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java deleted file mode 100644 index dcc9775..0000000 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.isA; - -import com.google.cloud.dataflow.sdk.coders.AtomicCoder; -import com.google.cloud.dataflow.sdk.coders.CoderException; -import com.google.cloud.dataflow.sdk.coders.VarIntCoder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; - -/** - * Tests for {@link EncodabilityEnforcementFactory}. - */ -public class EncodabilityEnforcementFactoryTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create(); - - @Test - public void encodeFailsThrows() { - TestPipeline p = TestPipeline.create(); - PCollection<Record> unencodable = - p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); - - WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); - CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); - ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); - - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(CoderException.class)); - thrown.expectMessage("Encode not allowed"); - enforcement.beforeElement(record); - } - - @Test - public void decodeFailsThrows() { - TestPipeline p = TestPipeline.create(); - PCollection<Record> unencodable = - p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); - WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); - - CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); - ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); - - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(CoderException.class)); - thrown.expectMessage("Decode not allowed"); - enforcement.beforeElement(record); - } - - @Test - public void consistentWithEqualsStructuralValueNotEqualThrows() { - TestPipeline p = TestPipeline.create(); - PCollection<Record> unencodable = - p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); - - WindowedValue<Record> record = - WindowedValue.<Record>valueInGlobalWindow( - new Record() { - @Override - public String toString() { - return "OriginalRecord"; - } - }); - - CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); - ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); - - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalArgumentException.class)); - thrown.expectMessage("does not maintain structural value equality"); - thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName()); - thrown.expectMessage("OriginalRecord"); - enforcement.beforeElement(record); - } - - @Test - public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() { - TestPipeline p = TestPipeline.create(); - PCollection<Record> unencodable = - p.apply( - Create.of(new Record()) - .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); - - WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record()); - - CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); - ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); - - enforcement.beforeElement(record); - enforcement.afterElement(record); - enforcement.afterFinish( - input, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - @Test - public void structurallyEqualResultsSucceeds() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of())); - AppliedPTransform<?, ?, ?> consumer = - unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal(); - - WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1); - - CommittedBundle<Integer> input = - InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now()); - ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer); - - enforcement.beforeElement(value); - enforcement.afterElement(value); - enforcement.afterFinish( - input, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - private static class Record {} - private static class RecordNoEncodeCoder extends AtomicCoder<Record> { - - @Override - public void encode( - Record value, - OutputStream outStream, - com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException { - throw new CoderException("Encode not allowed"); - } - - @Override - public Record decode( - InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException { - return null; - } - } - - private static class RecordNoDecodeCoder extends AtomicCoder<Record> { - @Override - public void encode( - Record value, - OutputStream outStream, - com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException {} - - @Override - public Record decode( - InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException { - throw new CoderException("Decode not allowed"); - } - } - - private static class RecordStructuralValueCoder extends AtomicCoder<Record> { - @Override - public void encode( - Record value, - OutputStream outStream, - com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException {} - - @Override - public Record decode( - InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException { - return new Record() { - @Override - public String toString() { - return "DecodedRecord"; - } - }; - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - @Override - public Object structuralValue(Record value) { - return value; - } - } - - private static class RecordNotConsistentWithEqualsStructuralValueCoder - extends AtomicCoder<Record> { - @Override - public void encode( - Record value, - OutputStream outStream, - com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException {} - - @Override - public Record decode( - InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) - throws CoderException, IOException { - return new Record() { - @Override - public String toString() { - return "DecodedRecord"; - } - }; - } - - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(Record value) { - return value; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java deleted file mode 100644 index 87e12ce..0000000 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.isA; - -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.util.IllegalMutationException; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Collections; - -/** - * Tests for {@link ImmutabilityEnforcementFactory}. - */ -@RunWith(JUnit4.class) -public class ImmutabilityEnforcementFactoryTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - private transient ImmutabilityEnforcementFactory factory; - private transient PCollection<byte[]> pcollection; - private transient AppliedPTransform<?, ?, ?> consumer; - - @Before - public void setup() { - factory = new ImmutabilityEnforcementFactory(); - TestPipeline p = TestPipeline.create(); - pcollection = - p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) - .apply( - ParDo.of( - new DoFn<byte[], byte[]>() { - @Override - public void processElement(DoFn<byte[], byte[]>.ProcessContext c) - throws Exception { - c.element()[0] = 'b'; - } - })); - consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal(); - } - - @Test - public void unchangedSucceeds() { - WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); - CommittedBundle<byte[]> elements = - InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); - - ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); - enforcement.beforeElement(element); - enforcement.afterElement(element); - enforcement.afterFinish( - elements, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - @Test - public void mutatedDuringProcessElementThrows() { - WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); - CommittedBundle<byte[]> elements = - InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); - - ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); - enforcement.beforeElement(element); - element.getValue()[0] = 'f'; - thrown.equals(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage(consumer.getFullName()); - thrown.expectMessage("illegaly mutated"); - thrown.expectMessage("Input values must not be mutated"); - enforcement.afterElement(element); - enforcement.afterFinish( - elements, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } - - @Test - public void mutatedAfterProcessElementFails() { - - WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); - CommittedBundle<byte[]> elements = - InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); - - ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); - enforcement.beforeElement(element); - enforcement.afterElement(element); - - element.getValue()[0] = 'f'; - thrown.equals(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage(consumer.getFullName()); - thrown.expectMessage("illegaly mutated"); - thrown.expectMessage("Input values must not be mutated"); - enforcement.afterFinish( - elements, - StepTransformResult.withoutHold(consumer).build(), - Collections.<CommittedBundle<?>>emptyList()); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java new file mode 100644 index 0000000..dcc9775 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.isA; + +import com.google.cloud.dataflow.sdk.coders.AtomicCoder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; + +/** + * Tests for {@link EncodabilityEnforcementFactory}. + */ +public class EncodabilityEnforcementFactoryTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create(); + + @Test + public void encodeFailsThrows() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> unencodable = + p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder())); + AppliedPTransform<?, ?, ?> consumer = + unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); + + WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); + CommittedBundle<Record> input = + InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(CoderException.class)); + thrown.expectMessage("Encode not allowed"); + enforcement.beforeElement(record); + } + + @Test + public void decodeFailsThrows() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> unencodable = + p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder())); + AppliedPTransform<?, ?, ?> consumer = + unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); + WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); + + CommittedBundle<Record> input = + InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(CoderException.class)); + thrown.expectMessage("Decode not allowed"); + enforcement.beforeElement(record); + } + + @Test + public void consistentWithEqualsStructuralValueNotEqualThrows() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> unencodable = + p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder())); + AppliedPTransform<?, ?, ?> consumer = + unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); + + WindowedValue<Record> record = + WindowedValue.<Record>valueInGlobalWindow( + new Record() { + @Override + public String toString() { + return "OriginalRecord"; + } + }); + + CommittedBundle<Record> input = + InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalArgumentException.class)); + thrown.expectMessage("does not maintain structural value equality"); + thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName()); + thrown.expectMessage("OriginalRecord"); + enforcement.beforeElement(record); + } + + @Test + public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() { + TestPipeline p = TestPipeline.create(); + PCollection<Record> unencodable = + p.apply( + Create.of(new Record()) + .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder())); + AppliedPTransform<?, ?, ?> consumer = + unencodable.apply(Count.<Record>globally()).getProducingTransformInternal(); + + WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record()); + + CommittedBundle<Record> input = + InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); + + enforcement.beforeElement(record); + enforcement.afterElement(record); + enforcement.afterFinish( + input, + StepTransformResult.withoutHold(consumer).build(), + Collections.<CommittedBundle<?>>emptyList()); + } + + @Test + public void structurallyEqualResultsSucceeds() { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of())); + AppliedPTransform<?, ?, ?> consumer = + unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal(); + + WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1); + + CommittedBundle<Integer> input = + InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now()); + ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer); + + enforcement.beforeElement(value); + enforcement.afterElement(value); + enforcement.afterFinish( + input, + StepTransformResult.withoutHold(consumer).build(), + Collections.<CommittedBundle<?>>emptyList()); + } + + private static class Record {} + private static class RecordNoEncodeCoder extends AtomicCoder<Record> { + + @Override + public void encode( + Record value, + OutputStream outStream, + com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException { + throw new CoderException("Encode not allowed"); + } + + @Override + public Record decode( + InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException { + return null; + } + } + + private static class RecordNoDecodeCoder extends AtomicCoder<Record> { + @Override + public void encode( + Record value, + OutputStream outStream, + com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException {} + + @Override + public Record decode( + InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException { + throw new CoderException("Decode not allowed"); + } + } + + private static class RecordStructuralValueCoder extends AtomicCoder<Record> { + @Override + public void encode( + Record value, + OutputStream outStream, + com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException {} + + @Override + public Record decode( + InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException { + return new Record() { + @Override + public String toString() { + return "DecodedRecord"; + } + }; + } + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public Object structuralValue(Record value) { + return value; + } + } + + private static class RecordNotConsistentWithEqualsStructuralValueCoder + extends AtomicCoder<Record> { + @Override + public void encode( + Record value, + OutputStream outStream, + com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException {} + + @Override + public Record decode( + InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException { + return new Record() { + @Override + public String toString() { + return "DecodedRecord"; + } + }; + } + + @Override + public boolean consistentWithEquals() { + return false; + } + + @Override + public Object structuralValue(Record value) { + return value; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java new file mode 100644 index 0000000..87e12ce --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.isA; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.Collections; + +/** + * Tests for {@link ImmutabilityEnforcementFactory}. + */ +@RunWith(JUnit4.class) +public class ImmutabilityEnforcementFactoryTest implements Serializable { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + private transient ImmutabilityEnforcementFactory factory; + private transient PCollection<byte[]> pcollection; + private transient AppliedPTransform<?, ?, ?> consumer; + + @Before + public void setup() { + factory = new ImmutabilityEnforcementFactory(); + TestPipeline p = TestPipeline.create(); + pcollection = + p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) + .apply( + ParDo.of( + new DoFn<byte[], byte[]>() { + @Override + public void processElement(DoFn<byte[], byte[]>.ProcessContext c) + throws Exception { + c.element()[0] = 'b'; + } + })); + consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal(); + } + + @Test + public void unchangedSucceeds() { + WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); + CommittedBundle<byte[]> elements = + InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); + + ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); + enforcement.beforeElement(element); + enforcement.afterElement(element); + enforcement.afterFinish( + elements, + StepTransformResult.withoutHold(consumer).build(), + Collections.<CommittedBundle<?>>emptyList()); + } + + @Test + public void mutatedDuringProcessElementThrows() { + WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); + CommittedBundle<byte[]> elements = + InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); + + ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); + enforcement.beforeElement(element); + element.getValue()[0] = 'f'; + thrown.equals(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectMessage(consumer.getFullName()); + thrown.expectMessage("illegaly mutated"); + thrown.expectMessage("Input values must not be mutated"); + enforcement.afterElement(element); + enforcement.afterFinish( + elements, + StepTransformResult.withoutHold(consumer).build(), + Collections.<CommittedBundle<?>>emptyList()); + } + + @Test + public void mutatedAfterProcessElementFails() { + + WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); + CommittedBundle<byte[]> elements = + InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); + + ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); + enforcement.beforeElement(element); + enforcement.afterElement(element); + + element.getValue()[0] = 'f'; + thrown.equals(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectMessage(consumer.getFullName()); + thrown.expectMessage("illegaly mutated"); + thrown.expectMessage("Input values must not be mutated"); + enforcement.afterFinish( + elements, + StepTransformResult.withoutHold(consumer).build(), + Collections.<CommittedBundle<?>>emptyList()); + } +} +
