Repository: incubator-beam Updated Branches: refs/heads/master ad45d5f75 -> fdec569f3
Remove isKeyed property of InProcess Bundles The property of keyedness belongs to a PCollection. A BundleFactory propogates the key as far as possible, but does not track if a bundle is keyed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48126dc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48126dc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48126dc5 Branch: refs/heads/master Commit: 48126dc5713d6302bd40cf5c441ddb8a5bff88c9 Parents: 442435e Author: Thomas Groh <[email protected]> Authored: Mon Apr 18 12:59:24 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Wed Apr 20 11:19:03 2016 -0700 ---------------------------------------------------------------------- .../inprocess/InProcessBundleFactory.java | 38 +++++--------------- .../inprocess/InProcessPipelineRunner.java | 6 ---- .../inprocess/InProcessBundleFactoryTest.java | 11 ------ 3 files changed, 9 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48126dc5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java index 2d02401..e39d02e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java @@ -43,20 +43,18 @@ class InProcessBundleFactory implements BundleFactory { @Override public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { - return InProcessBundle.unkeyed(output); + return InProcessBundle.create(output, null); } @Override public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { - return input.isKeyed() - ? InProcessBundle.keyed(output, input.getKey()) - : InProcessBundle.unkeyed(output); + return InProcessBundle.create(output, input.getKey()); } @Override public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, Object key, PCollection<T> output) { - return InProcessBundle.keyed(output, key); + CommittedBundle<?> input, @Nullable Object key, PCollection<T> output) { + return InProcessBundle.create(output, key); } /** @@ -64,32 +62,19 @@ class InProcessBundleFactory implements BundleFactory { */ private static final class InProcessBundle<T> implements UncommittedBundle<T> { private final PCollection<T> pcollection; - private final boolean keyed; - private final Object key; + @Nullable private final Object key; private boolean committed = false; private ImmutableList.Builder<WindowedValue<T>> elements; /** - * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key. + * Create a new {@link InProcessBundle} for the specified {@link PCollection}. */ - public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) { - return new InProcessBundle<T>(pcollection, false, null); + public static <T> InProcessBundle<T> create(PCollection<T> pcollection, @Nullable Object key) { + return new InProcessBundle<T>(pcollection, key); } - /** - * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified - * key. - * - * <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more - * information. - */ - public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) { - return new InProcessBundle<T>(pcollection, true, key); - } - - private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) { + private InProcessBundle(PCollection<T> pcollection, Object key) { this.pcollection = pcollection; - this.keyed = keyed; this.key = key; this.elements = ImmutableList.builder(); } @@ -123,11 +108,6 @@ class InProcessBundleFactory implements BundleFactory { } @Override - public boolean isKeyed() { - return keyed; - } - - @Override public Iterable<WindowedValue<T>> getElements() { return committedElements; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48126dc5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java index a1a2567..6cc35fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -135,12 +135,6 @@ public class InProcessPipelineRunner PCollection<T> getPCollection(); /** - * Returns whether this bundle is keyed. A bundle that is part of a {@link PCollection} that - * occurs after a {@link GroupByKey} is keyed by the result of the last {@link GroupByKey}. - */ - boolean isKeyed(); - - /** * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the * execution of this bundle. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48126dc5/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java index 50122aa..9adb6f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; @@ -75,7 +74,6 @@ public class InProcessBundleFactoryTest { CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); - assertThat(bundle.isKeyed(), is(false)); assertThat(bundle.getKey(), nullValue()); } @@ -86,7 +84,6 @@ public class InProcessBundleFactoryTest { bundleFactory.createKeyedBundle(null, key, pcollection); CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); - assertThat(bundle.isKeyed(), is(true)); assertThat(bundle.getKey(), equalTo(key)); } @@ -165,7 +162,6 @@ public class InProcessBundleFactoryTest { bundleFactory .createBundle(bundleFactory.createRootBundle(created).commit(Instant.now()), downstream) .commit(Instant.now()); - assertThat(newBundle.isKeyed(), is(false)); } @Test @@ -176,23 +172,16 @@ public class InProcessBundleFactoryTest { bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), downstream) .commit(Instant.now()); - assertThat(newBundle.isKeyed(), is(true)); assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); } @Test - public void createRootBundleUnkeyed() { - assertThat(bundleFactory.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false)); - } - - @Test public void createKeyedBundleKeyed() { CommittedBundle<KV<String, Integer>> keyedBundle = bundleFactory .createKeyedBundle( bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) .commit(Instant.now()); - assertThat(keyedBundle.isKeyed(), is(true)); assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); } }
