http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index 87aa8c2..a7e64af 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -18,109 +18,350 @@ package org.apache.beam.runners.apex.translation.utils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import com.datatorrent.lib.util.KryoCloneUtils; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateInternalsTest; +import java.util.Arrays; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory; +import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.GroupingState; +import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.hamcrest.Matchers; -import org.junit.Ignore; +import org.joda.time.Instant; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.junit.runners.Suite; /** * Tests for {@link ApexStateInternals}. This is based on the tests for - * {@code StateInternalsTest}. + * {@code InMemoryStateInternals}. */ -@RunWith(Suite.class) [email protected]({ - ApexStateInternalsTest.StandardStateInternalsTests.class, - ApexStateInternalsTest.OtherTests.class -}) public class ApexStateInternalsTest { + private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static StateInternals newStateInternals() { - return new ApexStateInternals.ApexStateBackend() + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<CombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); + private static final StateTag<BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<WatermarkHoldState> + WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + + private ApexStateInternals<String> underTest; + + @Before + public void initStateInternals() { + underTest = new ApexStateInternals.ApexStateBackend() .newStateInternalsFactory(StringUtf8Coder.of()) - .stateInternalsForKey("dummyKey"); - } - - /** - * A standard StateInternals test. Ignore set and map tests. - */ - @RunWith(JUnit4.class) - public static class StandardStateInternalsTests extends StateInternalsTest { - @Override - protected StateInternals createStateInternals() { - return newStateInternals(); - } - - @Override - @Ignore - public void testSet() {} - - @Override - @Ignore - public void testSetIsEmpty() {} - - @Override - @Ignore - public void testMergeSetIntoSource() {} - - @Override - @Ignore - public void testMergeSetIntoNewNamespace() {} - - @Override - @Ignore - public void testMap() {} - - @Override - @Ignore - public void testSetReadable() {} - - @Override - @Ignore - public void testMapReadable() {} - } - - /** - * A specific test of ApexStateInternalsTest. - */ - @RunWith(JUnit4.class) - public static class OtherTests { - - private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns"); - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - - @Test - public void testSerialization() throws Exception { - ApexStateInternals.ApexStateInternalsFactory<String> sif = - new ApexStateInternals.ApexStateBackend(). - newStateInternalsFactory(StringUtf8Coder.of()); - ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy"); - - ValueState<String> value = keyAndState.state(NAMESPACE, STRING_VALUE_ADDR); - assertEquals(keyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value); - value.write("hello"); - - ApexStateInternals.ApexStateInternalsFactory<String> cloned; - assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif)); - ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy"); - - ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR); - assertThat(clonedValue.read(), Matchers.equalTo("hello")); - assertEquals(clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value); - } + .stateInternalsForKey((String) null); + } + + @Test + public void testBag() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + + assertThat(value.read(), Matchers.emptyIterable()); + value.add("hello"); + assertThat(value.read(), Matchers.containsInAnyOrder("hello")); + + value.add("world"); + assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); + + } + + @Test + public void testBagIsEmpty() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeBagIntoSource() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + + // Reading the merged bag gets both the contents + assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeBagIntoNewNamespace() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + + // Reading the merged bag gets both the contents + assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag1.read(), Matchers.emptyIterable()); + assertThat(bag2.read(), Matchers.emptyIterable()); } + + @Test + public void testCombiningValue() throws Exception { + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); + + assertThat(value.read(), Matchers.equalTo(0)); + value.add(2); + assertThat(value.read(), Matchers.equalTo(2)); + + value.add(3); + assertThat(value.read(), Matchers.equalTo(5)); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(0)); + assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); + } + + @Test + public void testCombiningIsEmpty() throws Exception { + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(5); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeCombiningValueIntoSource() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + assertThat(value1.read(), Matchers.equalTo(11)); + assertThat(value2.read(), Matchers.equalTo(10)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); + + assertThat(value1.read(), Matchers.equalTo(21)); + assertThat(value2.read(), Matchers.equalTo(0)); + } + + @Test + public void testMergeCombiningValueIntoNewNamespace() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value3 = + underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); + + // Merging clears the old values and updates the result value. + assertThat(value1.read(), Matchers.equalTo(0)); + assertThat(value2.read(), Matchers.equalTo(0)); + assertThat(value3.read(), Matchers.equalTo(21)); + } + + @Test + public void testWatermarkEarliestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(1000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); + } + + @Test + public void testWatermarkLatestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); + } + + @Test + public void testWatermarkEndOfWindowState() throws Exception { + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); + } + + @Test + public void testWatermarkStateIsEmpty() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(new Instant(1000)); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeEarliestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the merged value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); + + assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testMergeLatestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); + WatermarkHoldState value3 = + underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); + + // Merging clears the old values and updates the result value. + assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); + assertThat(value1.read(), Matchers.equalTo(null)); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testSerialization() throws Exception { + ApexStateInternalsFactory<String> sif = new ApexStateBackend(). + newStateInternalsFactory(StringUtf8Coder.of()); + ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy"); + + ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); + assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); + value.write("hello"); + + ApexStateInternalsFactory<String> cloned; + assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif)); + ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy"); + + ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); + assertThat(clonedValue.read(), Matchers.equalTo("hello")); + assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); + } + }
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index b85b5f5..67951e9 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -24,7 +24,7 @@ <parent> <artifactId>beam-runners-parent</artifactId> <groupId>org.apache.beam</groupId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java index 8fc99b9..aa24909 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation { @Deprecated public static <ElemT, ViewT> PCollectionView<ViewT> getView( AppliedPTransform< - PCollection<ElemT>, PCollection<ElemT>, - PTransform<PCollection<ElemT>, PCollection<ElemT>>> + PCollection<ElemT>, PCollectionView<ViewT>, + PTransform<PCollection<ElemT>, PCollectionView<ViewT>>> application) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java new file mode 100644 index 0000000..53a86b1 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * A tuple of an element and a restriction applied to processing it with a + * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. + */ +@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) +@AutoValue +public abstract class ElementAndRestriction<ElementT, RestrictionT> { + /** The element to process. */ + public abstract ElementT element(); + + /** The restriction applied to processing the element. */ + public abstract RestrictionT restriction(); + + /** Constructs the {@link ElementAndRestriction}. */ + public static <InputT, RestrictionT> ElementAndRestriction<InputT, RestrictionT> of( + InputT element, RestrictionT restriction) { + return new AutoValue_ElementAndRestriction<>(element, restriction); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java new file mode 100644 index 0000000..5ff0aae --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StructuredCoder; + +/** A {@link Coder} for {@link ElementAndRestriction}. */ +@Experimental(Experimental.Kind.SPLITTABLE_DO_FN) +public class ElementAndRestrictionCoder<ElementT, RestrictionT> + extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> { + private final Coder<ElementT> elementCoder; + private final Coder<RestrictionT> restrictionCoder; + + /** + * Creates an {@link ElementAndRestrictionCoder} from an element coder and a restriction coder. + */ + public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, RestrictionT> of( + Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) { + return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder); + } + + private ElementAndRestrictionCoder( + Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) { + this.elementCoder = elementCoder; + this.restrictionCoder = restrictionCoder; + } + + @Override + public void encode( + ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream) + throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null ElementAndRestriction"); + } + elementCoder.encode(value.element(), outStream); + restrictionCoder.encode(value.restriction(), outStream); + } + + @Override + public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream) + throws IOException { + ElementT key = elementCoder.decode(inStream); + RestrictionT value = restrictionCoder.decode(inStream); + return ElementAndRestriction.of(key, value); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return ImmutableList.of(elementCoder, restrictionCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + elementCoder.verifyDeterministic(); + restrictionCoder.verifyDeterministic(); + } + + public Coder<ElementT> getElementCoder() { + return elementCoder; + } + + public Coder<RestrictionT> getRestrictionCoder() { + return restrictionCoder; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java index 52526bb..968966f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.values.PCollection; @@ -48,21 +47,6 @@ public class PCollectionTranslation { .build(); } - public static PCollection<?> fromProto( - Pipeline pipeline, RunnerApi.PCollection pCollection, RunnerApi.Components components) - throws IOException { - return PCollection.createPrimitiveOutputInternal( - pipeline, - WindowingStrategyTranslation.fromProto( - components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), - components), - fromProto(pCollection.getIsBounded())) - .setCoder( - (Coder) - CoderTranslation.fromProto( - components.getCodersOrThrow(pCollection.getCoderId()), components)); - } - public static IsBounded isBounded(RunnerApi.PCollection pCollection) { return fromProto(pCollection.getIsBounded()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index 0d27241..bfe24a0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -17,14 +17,12 @@ */ package org.apache.beam.runners.core.construction; -import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN; - import com.google.common.base.MoreObjects; -import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.transforms.DoFn; @@ -52,34 +50,6 @@ public class PTransformMatchers { private PTransformMatchers() {} /** - * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the URN of the - * {@link PTransform} is equal to the URN provided ot this matcher. - */ - public static PTransformMatcher urnEqualTo(String urn) { - return new EqualUrnPTransformMatcher(urn); - } - - private static class EqualUrnPTransformMatcher implements PTransformMatcher { - private final String urn; - - private EqualUrnPTransformMatcher(String urn) { - this.urn = urn; - } - - @Override - public boolean matches(AppliedPTransform<?, ?, ?> application) { - return urn.equals(PTransformTranslation.urnForTransformOrNull(application.getTransform())); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("urn", urn) - .toString(); - } - } - - /** * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the * {@link PTransform} is equal to the {@link Class} provided ot this matcher. */ @@ -181,68 +151,6 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo} by URN if it has a splittable {@link - * DoFn}. - */ - public static PTransformMatcher splittableParDo() { - return new PTransformMatcher() { - @Override - public boolean matches(AppliedPTransform<?, ?, ?> application) { - if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(application.getTransform()))) { - - try { - return ParDoTranslation.isSplittable(application); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Transform with URN %s could not be translated", - PTransformTranslation.PAR_DO_TRANSFORM_URN), - e); - } - } - return false; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper("SplittableParDoMultiMatcher").toString(); - } - }; - } - - /** - * A {@link PTransformMatcher} that matches a {@link ParDo} transform by URN - * and whether it contains state or timers as specified by {@link ParDoTranslation}. - */ - public static PTransformMatcher stateOrTimerParDo() { - return new PTransformMatcher() { - @Override - public boolean matches(AppliedPTransform<?, ?, ?> application) { - if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(application.getTransform()))) { - - try { - return ParDoTranslation.usesStateOrTimers(application); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Transform with URN %s could not be translated", - PTransformTranslation.PAR_DO_TRANSFORM_URN), - e); - } - } - return false; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper("StateOrTimerParDoMatcher").toString(); - } - }; - } - - /** * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn} * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and * {@link DoFnSignature#usesTimers()}. @@ -360,18 +268,9 @@ public class PTransformMatchers { return new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { - if (WRITE_FILES_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(application.getTransform()))) { - try { - return WriteFilesTranslation.isRunnerDeterminedSharding( - (AppliedPTransform) application); - } catch (IOException exc) { - throw new RuntimeException( - String.format( - "Transform with URN %s failed to parse: %s", - WRITE_FILES_TRANSFORM_URN, application.getTransform()), - exc); - } + if (application.getTransform() instanceof WriteFiles) { + WriteFiles write = (WriteFiles) application.getTransform(); + return write.getSharding() == null && write.getNumShards() == null; } return false; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index bae7b05..32ecf43 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -179,12 +179,13 @@ public class PTransformTranslation { * Returns the URN for the transform if it is known, otherwise throws. */ public static String urnForTransform(PTransform<?, ?> transform) { - String urn = urnForTransformOrNull(transform); - if (urn == null) { + TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); + if (translator == null) { throw new IllegalStateException( String.format("No translator known for %s", transform.getClass().getName())); } - return urn; + + return translator.getUrn(transform); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 03f29ff..34e0d86 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; @@ -35,11 +34,9 @@ import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -75,10 +72,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; /** @@ -220,67 +215,11 @@ public class ParDoTranslation { return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn(); } - public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) throws IOException { - return getDoFn(getParDoPayload(application)); - } - public static TupleTag<?> getMainOutputTag(ParDoPayload payload) throws InvalidProtocolBufferException { return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag(); } - public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application) - throws IOException { - return getMainOutputTag(getParDoPayload(application)); - } - - public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application) - throws IOException { - - RunnerApi.PTransform protoTransform = - PTransformTranslation.toProto(application, SdkComponents.create()); - - ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); - TupleTag<?> mainOutputTag = getMainOutputTag(payload); - Set<String> outputTags = - Sets.difference( - protoTransform.getOutputsMap().keySet(), Collections.singleton(mainOutputTag.getId())); - - ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>(); - for (String outputTag : outputTags) { - additionalOutputTags.add(new TupleTag<>(outputTag)); - } - return TupleTagList.of(additionalOutputTags); - } - - public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application) - throws IOException { - - SdkComponents sdkComponents = SdkComponents.create(); - RunnerApi.PTransform parDoProto = - PTransformTranslation.toProto(application, sdkComponents); - ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class); - - List<PCollectionView<?>> views = new ArrayList<>(); - for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) { - String sideInputTag = sideInputEntry.getKey(); - RunnerApi.SideInput sideInput = sideInputEntry.getValue(); - PCollection<?> originalPCollection = - checkNotNull( - (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)), - "no input with tag %s", - sideInputTag); - views.add( - viewFromProto( - sideInput, - sideInputTag, - originalPCollection, - parDoProto, - sdkComponents.toComponents())); - } - return views; - } - public static RunnerApi.PCollection getMainInput( RunnerApi.PTransform ptransform, Components components) throws IOException { checkArgument( @@ -508,27 +447,15 @@ public class ParDoTranslation { return builder.build(); } - /** - * Create a {@link PCollectionView} from a side input spec and an already-deserialized {@link - * PCollection} that should be wired up. - */ - public static PCollectionView<?> viewFromProto( - SideInput sideInput, - String localName, - PCollection<?> pCollection, - RunnerApi.PTransform parDoTransform, - Components components) + public static PCollectionView<?> fromProto( + SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components) throws IOException { - checkArgument( - localName != null, - "%s.viewFromProto: localName must not be null", - ParDoTranslation.class.getSimpleName()); - TupleTag<?> tag = new TupleTag<>(localName); + TupleTag<?> tag = new TupleTag<>(id); WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn()); ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn()); RunnerApi.PCollection inputCollection = - components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(localName)); + components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id)); WindowingStrategy<?, ?> windowingStrategy = WindowingStrategyTranslation.fromProto( components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), @@ -548,7 +475,6 @@ public class ParDoTranslation { PCollectionView<?> view = new RunnerPCollectionView<>( - pCollection, (TupleTag<Iterable<WindowedValue<?>>>) tag, (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn, windowMappingFn, http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java index 85139e8..89e8784 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.core.construction; -import java.util.Map; -import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; @@ -28,7 +26,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -40,19 +37,16 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> private final WindowMappingFn<?> windowMappingFn; private final WindowingStrategy<?, ?> windowingStrategy; private final Coder<Iterable<WindowedValue<?>>> coder; - private final transient PCollection<?> pCollection; /** * Create a new {@link RunnerPCollectionView} from the provided components. */ RunnerPCollectionView( - PCollection<?> pCollection, TupleTag<Iterable<WindowedValue<?>>> tag, ViewFn<Iterable<WindowedValue<?>>, T> viewFn, WindowMappingFn<?> windowMappingFn, @Nullable WindowingStrategy<?, ?> windowingStrategy, @Nullable Coder<Iterable<WindowedValue<?>>> coder) { - this.pCollection = pCollection; this.tag = tag; this.viewFn = viewFn; this.windowMappingFn = windowMappingFn; @@ -60,9 +54,11 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> this.coder = coder; } + @Nullable @Override public PCollection<?> getPCollection() { - return pCollection; + throw new IllegalStateException( + String.format("Cannot call getPCollection on a %s", getClass().getSimpleName())); } @Override @@ -89,25 +85,4 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> public Coder<Iterable<WindowedValue<?>>> getCoderInternal() { return coder; } - - @Override - public Map<TupleTag<?>, PValue> expand() { - throw new UnsupportedOperationException(String.format( - "A %s cannot be expanded", RunnerPCollectionView.class.getSimpleName())); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof PCollectionView)) { - return false; - } - @SuppressWarnings("unchecked") - PCollectionView<?> otherView = (PCollectionView<?>) other; - return tag.equals(otherView.getTagInternal()); - } - - @Override - public int hashCode() { - return Objects.hash(tag); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index e71187b..665e39d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -18,16 +18,13 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; -import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -42,8 +39,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; @@ -59,7 +54,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; * <li>Explode windows, since splitting within each window has to happen independently * <li>Assign a unique key to each element/restriction pair * <li>Process the keyed element/restriction pairs in a runner-specific way with the splittable - * {@link DoFn}'s {@link DoFn.ProcessElement} method. + * {@link DoFn}'s {@link DoFn.ProcessElement} method. * </ol> * * <p>This transform is intended as a helper for internal use by runners when implementing {@code @@ -68,11 +63,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) public class SplittableParDo<InputT, OutputT, RestrictionT> extends PTransform<PCollection<InputT>, PCollectionTuple> { - - private final DoFn<InputT, OutputT> doFn; - private final List<PCollectionView<?>> sideInputs; - private final TupleTag<OutputT> mainOutputTag; - private final TupleTagList additionalOutputTags; + private final ParDo.MultiOutput<InputT, OutputT> parDo; public static final String SPLITTABLE_PROCESS_URN = "urn:beam:runners_core:transforms:splittable_process:v1"; @@ -83,97 +74,56 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> public static final String SPLITTABLE_GBKIKWI_URN = "urn:beam:runners_core:transforms:splittable_gbkikwi:v1"; - private SplittableParDo( - DoFn<InputT, OutputT> doFn, - TupleTag<OutputT> mainOutputTag, - List<PCollectionView<?>> sideInputs, - TupleTagList additionalOutputTags) { - checkArgument( - DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(), - "fn must be a splittable DoFn"); - this.doFn = doFn; - this.mainOutputTag = mainOutputTag; - this.sideInputs = sideInputs; - this.additionalOutputTags = additionalOutputTags; - } - /** - * Creates a {@link SplittableParDo} from an original Java {@link ParDo}. + * Creates the transform for the given original multi-output {@link ParDo}. * * @param parDo The splittable {@link ParDo} transform. */ - public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo( - ParDo.MultiOutput<InputT, OutputT> parDo) { - checkArgument(parDo != null, "parDo must not be null"); + public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) { + checkNotNull(parDo, "parDo must not be null"); + this.parDo = parDo; checkArgument( DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(), "fn must be a splittable DoFn"); - return new SplittableParDo( - parDo.getFn(), - parDo.getMainOutputTag(), - parDo.getSideInputs(), - parDo.getAdditionalOutputTags()); - } - - /** - * Creates the transform for a {@link ParDo}-compatible {@link AppliedPTransform}. - * - * <p>The input may generally be a deserialized transform so it may not actually be a {@link - * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields. - */ - public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) { - checkArgument(parDo != null, "parDo must not be null"); - - try { - return new SplittableParDo<>( - ParDoTranslation.getDoFn(parDo), - (TupleTag) ParDoTranslation.getMainOutputTag(parDo), - ParDoTranslation.getSideInputs(parDo), - ParDoTranslation.getAdditionalOutputTags(parDo)); - } catch (IOException exc) { - throw new RuntimeException(exc); - } } @Override public PCollectionTuple expand(PCollection<InputT> input) { + DoFn<InputT, OutputT> fn = parDo.getFn(); Coder<RestrictionT> restrictionCoder = - DoFnInvokers.invokerFor(doFn) + DoFnInvokers.invokerFor(fn) .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry()); - Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder); + Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder = + ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder); - PCollection<KV<String, KV<InputT, RestrictionT>>> keyedRestrictions = + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> keyedRestrictions = input .apply( "Pair with initial restriction", - ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(doFn))) + ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn))) .setCoder(splitCoder) - .apply( - "Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(doFn))) + .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn))) .setCoder(splitCoder) // ProcessFn requires all input elements to be in a single window and have a single // element per work item. This must precede the unique keying so each key has a single // associated element. - .apply("Explode windows", ParDo.of(new ExplodeWindowsFn<KV<InputT, RestrictionT>>())) + .apply( + "Explode windows", + ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>())) .apply( "Assign unique key", - WithKeys.of(new RandomUniqueKeyFn<KV<InputT, RestrictionT>>())); + WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>())); return keyedRestrictions.apply( "ProcessKeyedElements", new ProcessKeyedElements<>( - doFn, + fn, input.getCoder(), restrictionCoder, (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(), - sideInputs, - mainOutputTag, - additionalOutputTags)); - } - - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); + parDo.getSideInputs(), + parDo.getMainOutputTag(), + parDo.getAdditionalOutputTags())); } /** @@ -190,11 +140,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> /** * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement} - * method for a splittable {@link DoFn} on each {@link KV} of the input {@link PCollection} of - * {@link KV KVs} keyed with arbitrary but globally unique keys. + * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input {@link + * PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys. */ public static class ProcessKeyedElements<InputT, OutputT, RestrictionT> - extends RawPTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple> { + extends RawPTransform< + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> { private final DoFn<InputT, OutputT> fn; private final Coder<InputT> elementCoder; private final Coder<RestrictionT> restrictionCoder; @@ -257,7 +208,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } @Override - public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) { + public PCollectionTuple expand( + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> + input) { return createPrimitiveOutputFor( input, fn, mainOutputTag, additionalOutputTags, windowingStrategy); } @@ -283,11 +236,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return PCollectionViews.toAdditionalInputs(sideInputs); - } - - @Override public String getUrn() { return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN; } @@ -309,7 +257,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> * Pairs each input element with its initial restriction using the given splittable {@link DoFn}. */ private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT> - extends DoFn<InputT, KV<InputT, RestrictionT>> { + extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> { private DoFn<InputT, OutputT> fn; private transient DoFnInvoker<InputT, OutputT> invoker; @@ -325,7 +273,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> @ProcessElement public void processElement(ProcessContext context) { context.output( - KV.of( + ElementAndRestriction.of( context.element(), invoker.<RestrictionT>invokeGetInitialRestriction(context.element()))); } @@ -333,7 +281,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> /** Splits the restriction using the given {@link SplitRestriction} method. */ private static class SplitRestrictionFn<InputT, RestrictionT> - extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> { + extends DoFn< + ElementAndRestriction<InputT, RestrictionT>, + ElementAndRestriction<InputT, RestrictionT>> { private final DoFn<InputT, ?> splittableFn; private transient DoFnInvoker<InputT, ?> invoker; @@ -348,14 +298,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> @ProcessElement public void processElement(final ProcessContext c) { - final InputT element = c.element().getKey(); + final InputT element = c.element().element(); invoker.invokeSplitRestriction( element, - c.element().getValue(), + c.element().restriction(), new OutputReceiver<RestrictionT>() { @Override public void output(RestrictionT part) { - c.output(KV.of(element, part)); + c.output(ElementAndRestriction.of(element, part)); } }); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java index 515de57..90e6304 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.core.construction; -import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN; - import com.google.auto.service.AutoService; import com.google.protobuf.Any; import com.google.protobuf.ByteString; @@ -36,8 +33,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -62,48 +57,6 @@ public class TestStreamTranslation { return builder.build(); } - private static TestStream<?> fromProto( - RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components) - throws IOException { - - Coder<Object> coder = - (Coder<Object>) - CoderTranslation.fromProto( - components.getCodersOrThrow(testStreamPayload.getCoderId()), components); - - List<TestStream.Event<Object>> events = new ArrayList<>(); - - for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) { - events.add(fromProto(event, coder)); - } - return TestStream.fromRawEvents(coder, events); - } - - /** - * Converts an {@link AppliedPTransform}, which may be a rehydrated transform or an original - * {@link TestStream}, to a {@link TestStream}. - */ - public static <T> TestStream<T> getTestStream( - AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> application) - throws IOException { - // For robustness, we don't take this shortcut: - // if (application.getTransform() instanceof TestStream) { - // return application.getTransform() - // } - - SdkComponents sdkComponents = SdkComponents.create(); - RunnerApi.PTransform transformProto = PTransformTranslation.toProto(application, sdkComponents); - checkArgument( - TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()), - "Attempt to get %s from a transform with wrong URN %s", - TestStream.class.getSimpleName(), - transformProto.getSpec().getUrn()); - RunnerApi.TestStreamPayload testStreamPayload = - transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class); - - return (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents()); - } - static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder) throws IOException { switch (event.getType()) { @@ -177,7 +130,7 @@ public class TestStreamTranslation { static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> { @Override public String getUrn(TestStream<?> transform) { - return TEST_STREAM_TRANSFORM_URN; + return PTransformTranslation.TEST_STREAM_TRANSFORM_URN; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java deleted file mode 100644 index 2baf93a..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.ImmutableList; -import java.util.Collection; -import java.util.Map; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; - -/** Utilities for extracting subsets of inputs from an {@link AppliedPTransform}. */ -public class TransformInputs { - /** - * Gets all inputs of the {@link AppliedPTransform} that are not returned by {@link - * PTransform#getAdditionalInputs()}. - */ - public static Collection<PValue> nonAdditionalInputs(AppliedPTransform<?, ?, ?> application) { - ImmutableList.Builder<PValue> mainInputs = ImmutableList.builder(); - PTransform<?, ?> transform = application.getTransform(); - for (Map.Entry<TupleTag<?>, PValue> input : application.getInputs().entrySet()) { - if (!transform.getAdditionalInputs().containsKey(input.getKey())) { - mainInputs.add(input.getValue()); - } - } - checkArgument( - !mainInputs.build().isEmpty() || application.getInputs().isEmpty(), - "Expected at least one main input if any inputs exist"); - return mainInputs.build(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 1456a3f..718efe7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.WindowingStrategy; @@ -120,27 +119,6 @@ public class WindowingStrategyTranslation implements Serializable { } } - - public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) { - switch (proto) { - case FIRE_ALWAYS: - return OnTimeBehavior.FIRE_ALWAYS; - case FIRE_IF_NONEMPTY: - return OnTimeBehavior.FIRE_IF_NON_EMPTY; - case UNRECOGNIZED: - default: - // Whether or not it is proto that cannot recognize it (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this OutputTime means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.OnTimeBehavior.class.getCanonicalName(), - OnTimeBehavior.class.getCanonicalName(), - proto)); - } - } - public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { switch(timestampCombiner) { case EARLIEST: @@ -307,7 +285,6 @@ public class WindowingStrategyTranslation implements Serializable { .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) .setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger())) .setWindowFn(windowFnSpec) - .setAssignsToOneWindow(windowingStrategy.getWindowFn().assignsToOneWindow()) .setWindowCoderId( components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); @@ -346,15 +323,13 @@ public class WindowingStrategyTranslation implements Serializable { Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger()); ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); - OnTimeBehavior onTimeBehavior = fromProto(proto.getOnTimeBehavior()); return WindowingStrategy.of(windowFn) .withAllowedLateness(allowedLateness) .withMode(accumulationMode) .withTrigger(trigger) .withTimestampCombiner(timestampCombiner) - .withClosingBehavior(closingBehavior) - .withOnTimeBehavior(onTimeBehavior); + .withClosingBehavior(closingBehavior); } public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index b1d2da4..99b77ef 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -26,7 +26,6 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import java.io.IOException; -import java.io.Serializable; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; @@ -38,7 +37,6 @@ import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -53,45 +51,32 @@ public class WriteFilesTranslation { public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN = "urn:beam:file_based_sink:javasdk:0.1"; - public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN = - "urn:beam:file_based_sink_format_function:javasdk:0.1"; - @VisibleForTesting - static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) { + static WriteFilesPayload toProto(WriteFiles<?> transform) { return WriteFilesPayload.newBuilder() .setSink(toProto(transform.getSink())) - .setFormatFunction(toProto(transform.getFormatFunction())) .setWindowedWrites(transform.isWindowedWrites()) .setRunnerDeterminedSharding( transform.getNumShards() == null && transform.getSharding() == null) .build(); } - private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) { - return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink); - } - - private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) { - return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction); - } - - private static SdkFunctionSpec toProto(String urn, Serializable serializable) { + private static SdkFunctionSpec toProto(FileBasedSink<?> sink) { return SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() - .setUrn(urn) + .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN) .setParameter( Any.pack( BytesValue.newBuilder() .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(serializable))) + ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink))) .build()))) .build(); } @VisibleForTesting - static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException { + static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException { checkArgument( sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN), "Cannot extract %s instance from %s with URN %s", @@ -102,44 +87,16 @@ public class WriteFilesTranslation { byte[] serializedSink = sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); - return (FileBasedSink<?, ?>) + return (FileBasedSink<?>) SerializableUtils.deserializeFromByteArray( serializedSink, FileBasedSink.class.getSimpleName()); } - @VisibleForTesting - static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto( - SdkFunctionSpec sinkProto) throws IOException { - checkArgument( - sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN), - "Cannot extract %s instance from %s with URN %s", - SerializableFunction.class.getSimpleName(), - FunctionSpec.class.getSimpleName(), - sinkProto.getSpec().getUrn()); - - byte[] serializedFunction = - sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); - - return (SerializableFunction<InputT, OutputT>) - SerializableUtils.deserializeFromByteArray( - serializedFunction, FileBasedSink.class.getSimpleName()); - } - - public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink( - AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>> - transform) - throws IOException { - return (FileBasedSink<OutputT, DestinationT>) - sinkFromProto(getWriteFilesPayload(transform).getSink()); - } - - public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction( - AppliedPTransform< - PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>> + public static <T> FileBasedSink<T> getSink( + AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>> transform) throws IOException { - return formatFunctionFromProto( - getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction()); + return (FileBasedSink<T>) sinkFromProto(getWriteFilesPayload(transform).getSink()); } public static <T> boolean isWindowedWrites( @@ -167,15 +124,15 @@ public class WriteFilesTranslation { .unpack(WriteFilesPayload.class); } - static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> { + static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?>> { @Override - public String getUrn(WriteFiles<?, ?, ?> transform) { + public String getUrn(WriteFiles<?> transform) { return PTransformTranslation.WRITE_FILES_TRANSFORM_URN; } @Override public FunctionSpec translate( - AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) { + AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents components) { return FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) .setParameter(Any.pack(toProto(transform.getTransform()))) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java new file mode 100644 index 0000000..051cbaa --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.CoderUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +/** + * Tests for {@link ElementAndRestrictionCoder}. + */ +@RunWith(Parameterized.class) +public class ElementAndRestrictionCoderTest<K, V> { + private static class CoderAndData<T> { + Coder<T> coder; + List<T> data; + } + + private static class AnyCoderAndData { + private CoderAndData<?> coderAndData; + } + + private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) { + CoderAndData<T> coderAndData = new CoderAndData<>(); + coderAndData.coder = coder; + coderAndData.data = data; + AnyCoderAndData res = new AnyCoderAndData(); + res.coderAndData = coderAndData; + return res; + } + + private static final List<AnyCoderAndData> TEST_DATA = + Arrays.asList( + coderAndData( + VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)), + coderAndData( + BigEndianLongCoder.of(), + Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)), + coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")), + coderAndData( + ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), + Arrays.asList( + ElementAndRestriction.of("", -1), + ElementAndRestriction.of("hello", 0), + ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))), + coderAndData( + ListCoder.of(VarLongCoder.of()), + Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList()))); + + @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} valueCoder={2} value={3}") + public static Collection<Object[]> data() { + List<Object[]> parameters = new ArrayList<>(); + for (AnyCoderAndData keyCoderAndData : TEST_DATA) { + Coder keyCoder = keyCoderAndData.coderAndData.coder; + for (Object key : keyCoderAndData.coderAndData.data) { + for (AnyCoderAndData valueCoderAndData : TEST_DATA) { + Coder valueCoder = valueCoderAndData.coderAndData.coder; + for (Object value : valueCoderAndData.coderAndData.data) { + parameters.add(new Object[] {keyCoder, key, valueCoder, value}); + } + } + } + } + return parameters; + } + + @Parameter(0) + public Coder<K> keyCoder; + @Parameter(1) + public K key; + @Parameter(2) + public Coder<V> valueCoder; + @Parameter(3) + public V value; + + @Test + @SuppressWarnings("rawtypes") + public void testDecodeEncodeEqual() throws Exception { + CoderProperties.coderDecodeEncodeEqual( + ElementAndRestrictionCoder.of(keyCoder, valueCoder), + ElementAndRestriction.of(key, value)); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void encodeNullThrowsCoderException() throws Exception { + thrown.expect(CoderException.class); + thrown.expectMessage("cannot encode a null ElementAndRestriction"); + + CoderUtils.encodeToBase64( + ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), null); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java index 5c45487..3b94220 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java @@ -113,28 +113,6 @@ public class PCollectionTranslationTest { @Test public void testEncodeDecodeCycle() throws Exception { - // Encode - SdkComponents sdkComponents = SdkComponents.create(); - RunnerApi.PCollection protoCollection = - PCollectionTranslation.toProto(testCollection, sdkComponents); - RunnerApi.Components protoComponents = sdkComponents.toComponents(); - - // Decode - Pipeline pipeline = Pipeline.create(); - PCollection<?> decodedCollection = - PCollectionTranslation.fromProto(pipeline, protoCollection, protoComponents); - - // Verify - assertThat(decodedCollection.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder())); - assertThat( - decodedCollection.getWindowingStrategy(), - Matchers.<WindowingStrategy<?, ?>>equalTo( - testCollection.getWindowingStrategy().fixDefaults())); - assertThat(decodedCollection.isBounded(), equalTo(testCollection.isBounded())); - } - - @Test - public void testEncodeDecodeFields() throws Exception { SdkComponents sdkComponents = SdkComponents.create(); RunnerApi.PCollection protoCollection = PCollectionTranslation .toProto(testCollection, sdkComponents); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 99d3dd1..2497598 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -27,12 +27,9 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Collections; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.DefaultFilenamePolicy; -import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.LocalResources; @@ -56,7 +53,6 @@ import org.apache.beam.sdk.transforms.Materialization; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; @@ -99,14 +95,9 @@ public class PTransformMatchersTest implements Serializable { PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal( p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); - input.setName("dummy input"); - input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); - PCollection<Integer> output = PCollection.createPrimitiveOutputInternal( p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); - output.setName("dummy output"); - output.setCoder(VarIntCoder.of()); return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p); } @@ -281,18 +272,6 @@ public class PTransformMatchersTest implements Serializable { } @Test - public void parDoSplittable() { - AppliedPTransform<?, ?, ?> parDoApplication = - getAppliedTransform( - ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty())); - assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), is(true)); - - assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); - assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); - assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); - } - - @Test public void parDoMultiWithState() { AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform( @@ -305,19 +284,6 @@ public class PTransformMatchersTest implements Serializable { } @Test - public void parDoWithState() { - AppliedPTransform<?, ?, ?> statefulApplication = - getAppliedTransform( - ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty())); - assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), is(true)); - - AppliedPTransform<?, ?, ?> splittableApplication = - getAppliedTransform( - ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty())); - assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication), is(false)); - } - - @Test public void parDoMultiWithTimers() { AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform( @@ -539,32 +505,30 @@ public class PTransformMatchersTest implements Serializable { public void writeWithRunnerDeterminedSharding() { ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */); FilenamePolicy policy = - DefaultFilenamePolicy.fromStandardParameters( + DefaultFilenamePolicy.constructUsingStandardParameters( StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, "", false); - WriteFiles<Integer, Void, Integer> write = + WriteFiles<Integer> write = WriteFiles.to( - new FileBasedSink<Integer, Void>( - StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) { + new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) { @Override - public WriteOperation<Integer, Void> createWriteOperation() { + public WriteOperation<Integer> createWriteOperation() { return null; } - }, - SerializableFunctions.<Integer>identity()); + }); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)), is(true)); - WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3); + WriteFiles<Integer> withStaticSharding = write.withNumShards(3); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding() .matches(appliedWrite(withStaticSharding)), is(false)); - WriteFiles<Integer, Void, Integer> withCustomSharding = + WriteFiles<Integer> withCustomSharding = write.withSharding(Sum.integersGlobally().asSingletonView()); assertThat( PTransformMatchers.writeWithRunnerDeterminedSharding() @@ -572,8 +536,8 @@ public class PTransformMatchersTest implements Serializable { is(false)); } - private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, Integer> write) { - return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer, Void, Integer>>of( + private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) { + return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of( "WriteFiles", Collections.<TupleTag<?>, PValue>emptyMap(), Collections.<TupleTag<?>, PValue>emptyMap(),
