Use Structural Value keys instead of User Values. This fixes problems with key lookup by basing key comparison entirely on structural equality.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3cc4fa4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3cc4fa4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3cc4fa4 Branch: refs/heads/master Commit: e3cc4fa4724625f49e3e6690e878a4713615b2e1 Parents: b9a8cbe Author: Thomas Groh <[email protected]> Authored: Wed Jun 1 14:28:18 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Jun 1 17:56:29 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/BundleFactory.java | 4 +- .../direct/ExecutorServiceParallelExecutor.java | 8 +- .../ImmutabilityCheckingBundleFactory.java | 4 +- .../direct/InMemoryWatermarkManager.java | 70 ++++++++------- .../runners/direct/InProcessBundleFactory.java | 25 +++--- .../direct/InProcessEvaluationContext.java | 10 +-- .../direct/InProcessExecutionContext.java | 4 +- ...InProcessGroupByKeyOnlyEvaluatorFactory.java | 6 +- .../runners/direct/InProcessPipelineRunner.java | 7 +- .../apache/beam/runners/direct/StepAndKey.java | 8 +- .../beam/runners/direct/StructuralKey.java | 77 ++++++++++++++++ .../direct/GroupByKeyEvaluatorFactoryTest.java | 27 ++++-- .../ImmutabilityCheckingBundleFactoryTest.java | 13 ++- .../direct/InMemoryWatermarkManagerTest.java | 95 +++++++++++--------- .../direct/InProcessBundleFactoryTest.java | 54 ++++++----- .../direct/InProcessEvaluationContextTest.java | 52 ++++++----- ...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 33 ++++--- .../direct/InProcessPipelineRunnerTest.java | 60 +++++++++++++ .../direct/InProcessTimerInternalsTest.java | 3 +- .../direct/ParDoInProcessEvaluatorTest.java | 2 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 29 +++--- .../direct/ParDoSingleEvaluatorFactoryTest.java | 35 +++++--- .../beam/runners/direct/StructuralKeyTest.java | 81 +++++++++++++++++ 23 files changed, 500 insertions(+), 207 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java index fea4841..a0511df 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java @@ -44,6 +44,6 @@ public interface BundleFactory { * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle * belong to the {@code output} {@link PCollection}. */ - public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, Object key, PCollection<T> output); + public <K, T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 70a8035..a627125 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -396,17 +396,19 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { private boolean fireTimers() throws Exception 
{ try { boolean firedTimers = false; - for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers : + for (Map.Entry< + AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> transformTimers : evaluationContext.extractFiredTimers().entrySet()) { AppliedPTransform<?, ?, ?> transform = transformTimers.getKey(); - for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) { + for (Map.Entry<StructuralKey<?>, FiredTimers> keyTimers : + transformTimers.getValue().entrySet()) { for (TimeDomain domain : TimeDomain.values()) { Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain); if (delivery.isEmpty()) { continue; } KeyedWorkItem<Object, Object> work = - KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); + KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery); @SuppressWarnings({"unchecked", "rawtypes"}) CommittedBundle<?> bundle = evaluationContext http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index 3b38211..92a57dd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -72,8 +72,8 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory { } @Override - public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, Object key, PCollection<T> output) { + public <K, T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, StructuralKey<K> key, 
PCollection<T> output) { return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java index f8cf343..95095fa 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java @@ -207,7 +207,7 @@ public class InMemoryWatermarkManager { private static class AppliedPTransformInputWatermark implements Watermark { private final Collection<? extends Watermark> inputWatermarks; private final SortedMultiset<WindowedValue<?>> pendingElements; - private final Map<Object, NavigableSet<TimerData>> objectTimers; + private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers; private AtomicReference<Instant> currentWatermark; @@ -286,7 +286,7 @@ public class InMemoryWatermarkManager { // We don't keep references to timers that have been fired and delivered via #getFiredTimers() } - private synchronized Map<Object, List<TimerData>> extractFiredEventTimeTimers() { + private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() { return extractFiredTimers(currentWatermark.get(), objectTimers); } @@ -384,8 +384,8 @@ public class InMemoryWatermarkManager { private static class SynchronizedProcessingTimeInputWatermark implements Watermark { private final Collection<? 
extends Watermark> inputWms; private final Collection<CommittedBundle<?>> pendingBundles; - private final Map<Object, NavigableSet<TimerData>> processingTimers; - private final Map<Object, NavigableSet<TimerData>> synchronizedProcessingTimers; + private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers; + private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers; private final PriorityQueue<TimerData> pendingTimers; @@ -490,9 +490,9 @@ public class InMemoryWatermarkManager { } } - private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers( + private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers( TimeDomain domain, Instant firingTime) { - Map<Object, List<TimerData>> firedTimers; + Map<StructuralKey<?>, List<TimerData>> firedTimers; switch (domain) { case PROCESSING_TIME: firedTimers = extractFiredTimers(firingTime, processingTimers); @@ -509,13 +509,14 @@ public class InMemoryWatermarkManager { + " and gave a non-processing time domain " + domain); } - for (Map.Entry<Object, ? extends Collection<TimerData>> firedTimer : firedTimers.entrySet()) { + for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer : + firedTimers.entrySet()) { pendingTimers.addAll(firedTimer.getValue()); } return firedTimers; } - private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) { + private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) { NavigableSet<TimerData> processingQueue = processingTimers.get(key); if (processingQueue == null) { processingQueue = new TreeSet<>(); @@ -647,11 +648,12 @@ public class InMemoryWatermarkManager { * * The result collection retains ordering of timers (from earliest to latest). 
*/ - private static Map<Object, List<TimerData>> extractFiredTimers( - Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) { - Map<Object, List<TimerData>> result = new HashMap<>(); - Set<Object> emptyKeys = new HashSet<>(); - for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : objectTimers.entrySet()) { + private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers( + Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) { + Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>(); + Set<StructuralKey<?>> emptyKeys = new HashSet<>(); + for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers : + objectTimers.entrySet()) { NavigableSet<TimerData> timers = pendingTimers.getValue(); if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) { ArrayList<TimerData> keyFiredTimers = new ArrayList<>(); @@ -923,11 +925,12 @@ public class InMemoryWatermarkManager { * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the * pending timers will be removed from this {@link InMemoryWatermarkManager}. 
*/ - public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> allTimers = new HashMap<>(); + public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() { + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>(); for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry : transformToWatermarks.entrySet()) { - Map<Object, FiredTimers> keyFiredTimers = watermarksEntry.getValue().extractFiredTimers(); + Map<StructuralKey<?>, FiredTimers> keyFiredTimers = + watermarksEntry.getValue().extractFiredTimers(); if (!keyFiredTimers.isEmpty()) { allTimers.put(watermarksEntry.getKey(), keyFiredTimers); } @@ -1130,10 +1133,11 @@ public class InMemoryWatermarkManager { return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN); } - private Map<Object, FiredTimers> extractFiredTimers() { - Map<Object, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers(); - Map<Object, List<TimerData>> processingTimers; - Map<Object, List<TimerData>> synchronizedTimers; + private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() { + Map<StructuralKey<?>, List<TimerData>> eventTimeTimers = + inputWatermark.extractFiredEventTimeTimers(); + Map<StructuralKey<?>, List<TimerData>> processingTimers; + Map<StructuralKey<?>, List<TimerData>> synchronizedTimers; if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -1145,11 +1149,11 @@ public class InMemoryWatermarkManager { synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime()); } - Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers 
= new HashMap<>(); + Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>(); groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers); - Map<Object, FiredTimers> keyFiredTimers = new HashMap<>(); - for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers : + Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>(); + for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers : groupedTimers.entrySet()) { keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue())); } @@ -1158,10 +1162,10 @@ public class InMemoryWatermarkManager { @SafeVarargs private final void groupFiredTimers( - Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate, - Map<Object, List<TimerData>>... timersToGroup) { - for (Map<Object, List<TimerData>> subGroup : timersToGroup) { - for (Map.Entry<Object, List<TimerData>> newTimers : subGroup.entrySet()) { + Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate, + Map<StructuralKey<?>, List<TimerData>>... timersToGroup) { + for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) { + for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) { Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey()); if (grouped == null) { grouped = new HashMap<>(); @@ -1196,7 +1200,7 @@ public class InMemoryWatermarkManager { * the input to the executed step. */ public static class TimerUpdate { - private final Object key; + private final StructuralKey<?> key; private final Iterable<? extends TimerData> completedTimers; private final Iterable<? extends TimerData> setTimers; @@ -1217,7 +1221,7 @@ public class InMemoryWatermarkManager { * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the * set and deleted timers to be added to it. 
*/ - public static TimerUpdateBuilder builder(Object key) { + public static TimerUpdateBuilder builder(StructuralKey<?> key) { return new TimerUpdateBuilder(key); } @@ -1225,12 +1229,12 @@ public class InMemoryWatermarkManager { * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers. */ public static final class TimerUpdateBuilder { - private final Object key; + private final StructuralKey<?> key; private final Collection<TimerData> completedTimers; private final Collection<TimerData> setTimers; private final Collection<TimerData> deletedTimers; - private TimerUpdateBuilder(Object key) { + private TimerUpdateBuilder(StructuralKey<?> key) { this.key = key; this.completedTimers = new HashSet<>(); this.setTimers = new HashSet<>(); @@ -1280,7 +1284,7 @@ public class InMemoryWatermarkManager { } private TimerUpdate( - Object key, + StructuralKey<?> key, Iterable<? extends TimerData> completedTimers, Iterable<? extends TimerData> setTimers, Iterable<? extends TimerData> deletedTimers) { @@ -1291,7 +1295,7 @@ public class InMemoryWatermarkManager { } @VisibleForTesting - Object getKey() { + StructuralKey<?> getKey() { return key; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java index bc9b04c..52bc575 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import 
org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -29,8 +30,6 @@ import com.google.common.collect.ImmutableList; import org.joda.time.Instant; -import javax.annotation.Nullable; - /** * A factory that produces bundles that perform no additional validation. */ @@ -43,7 +42,7 @@ class InProcessBundleFactory implements BundleFactory { @Override public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { - return InProcessBundle.create(output, null); + return InProcessBundle.create(output, StructuralKey.of(null, VoidCoder.of())); } @Override @@ -52,8 +51,8 @@ class InProcessBundleFactory implements BundleFactory { } @Override - public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, @Nullable Object key, PCollection<T> output) { + public <K, T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) { return InProcessBundle.create(output, key); } @@ -62,18 +61,18 @@ class InProcessBundleFactory implements BundleFactory { */ private static final class InProcessBundle<T> implements UncommittedBundle<T> { private final PCollection<T> pcollection; - @Nullable private final Object key; + private final StructuralKey<?> key; private boolean committed = false; private ImmutableList.Builder<WindowedValue<T>> elements; /** * Create a new {@link InProcessBundle} for the specified {@link PCollection}. 
*/ - public static <T> InProcessBundle<T> create(PCollection<T> pcollection, @Nullable Object key) { - return new InProcessBundle<T>(pcollection, key); + public static <T> InProcessBundle<T> create(PCollection<T> pcollection, StructuralKey<?> key) { + return new InProcessBundle<>(pcollection, key); } - private InProcessBundle(PCollection<T> pcollection, Object key) { + private InProcessBundle(PCollection<T> pcollection, StructuralKey<?> key) { this.pcollection = pcollection; this.key = key; this.elements = ImmutableList.builder(); @@ -108,7 +107,7 @@ class InProcessBundleFactory implements BundleFactory { private static class CommittedInProcessBundle<T> implements CommittedBundle<T> { public CommittedInProcessBundle( PCollection<T> pcollection, - Object key, + StructuralKey<?> key, Iterable<WindowedValue<T>> committedElements, Instant synchronizedCompletionTime) { this.pcollection = pcollection; @@ -118,13 +117,13 @@ class InProcessBundleFactory implements BundleFactory { } private final PCollection<T> pcollection; - private final Object key; + /** The structural value key of the Bundle, as specified by the coder that created it. 
*/ + private final StructuralKey<?> key; private final Iterable<WindowedValue<T>> committedElements; private final Instant synchronizedCompletionTime; @Override - @Nullable - public Object getKey() { + public StructuralKey<?> getKey() { return key; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index 732a279..981a842 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -233,8 +233,8 @@ class InProcessEvaluationContext { * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. */ - public <T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, Object key, PCollection<T> output) { + public <K, T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) { return bundleFactory.createKeyedBundle(input, key, output); } @@ -302,7 +302,7 @@ class InProcessEvaluationContext { * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. */ public InProcessExecutionContext getExecutionContext( - AppliedPTransform<?, ?, ?> application, Object key) { + AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) { StepAndKey stepAndKey = StepAndKey.of(application, key); return new InProcessExecutionContext( options.getClock(), @@ -372,9 +372,9 @@ class InProcessEvaluationContext { * <p>This is a destructive operation. 
Timers will only appear in the result of this method once * for each time they are set. */ - public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() { + public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() { forceRefresh(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired = watermarkManager.extractFiredTimers(); return fired; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java index 44d8bd9..4f10b3a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java @@ -33,11 +33,11 @@ import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; class InProcessExecutionContext extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> { private final Clock clock; - private final Object key; + private final StructuralKey<?> key; private final CopyOnAccessInMemoryStateInternals<Object> existingState; private final TransformWatermarks watermarks; - public InProcessExecutionContext(Clock clock, Object key, + public InProcessExecutionContext(Clock clock, StructuralKey<?> key, CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) { this.clock = clock; this.key = key; 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java index 79db5b6..a10d496 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java @@ -147,8 +147,10 @@ class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFacto K key = groupedEntry.getKey().key; KeyedWorkItem<K, V> groupedKv = KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue()); - UncommittedBundle<KeyedWorkItem<K, V>> bundle = - evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput()); + UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle( + inputBundle, + StructuralKey.of(key, keyCoder), + application.getOutput()); bundle.add(WindowedValue.valueInGlobalWindow(groupedKv)); resultBuilder.addOutput(bundle); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java index 5a04af4..8847c58 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java @@ -60,8 +60,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; -import javax.annotation.Nullable; - /** * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded * {@link PCollection PCollections}. @@ -130,11 +128,10 @@ public class InProcessPipelineRunner PCollection<T> getPCollection(); /** - * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the + * Returns the key that was output in the most recent {@link GroupByKey} in the * execution of this bundle. */ - @Nullable - Object getKey(); + StructuralKey<?> getKey(); /** * Returns an {@link Iterable} containing all of the elements that have been added to this http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java index 1c7cf6c..18fe04f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java @@ -29,16 +29,16 @@ import java.util.Objects; */ final class StepAndKey { private final AppliedPTransform<?, ?, ?> step; - private final Object key; + private final StructuralKey<?> key; /** * Create a new {@link StepAndKey} with the provided step and key. 
*/ - public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) { + public static StepAndKey of(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) { return new StepAndKey(step, key); } - private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) { + private StepAndKey(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) { this.step = step; this.key = key; } @@ -47,7 +47,7 @@ final class StepAndKey { public String toString() { return MoreObjects.toStringHelper(StepAndKey.class) .add("step", step.getFullName()) - .add("key", key) + .add("key", key.getKey()) .toString(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java new file mode 100644 index 0000000..249ccfe --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CoderUtils; + +/** + * A (Key, Coder) pair that uses the structural value of the key (as provided by + * {@link Coder#structuralValue(Object)}) to perform equality and hashing. + */ +class StructuralKey<K> { + /** + * Create a new Structural Key of the provided key that can be encoded by the provided coder. + */ + public static <K> StructuralKey<K> of(K key, Coder<K> coder) { + try { + return new StructuralKey<>(coder, key); + } catch (Exception e) { + throw new IllegalArgumentException( + "Could not encode a key with its provided coder " + coder.getClass().getSimpleName(), e); + } + } + + private final Coder<K> coder; + private final Object structuralValue; // sole basis of equals() and hashCode() + private final byte[] encoded; // encoding of the key captured at construction time + + private StructuralKey(Coder<K> coder, K key) throws Exception { + this.coder = coder; + this.structuralValue = coder.structuralValue(key); + this.encoded = CoderUtils.encodeToByteArray(coder, key); + } + + public K getKey() { // decodes a fresh key instance from the stored bytes on every call + try { + return CoderUtils.decodeFromByteArray(coder, encoded); + } catch (CoderException e) { // bytes came from this coder, so it failed to round-trip + throw new IllegalArgumentException( + "Could not decode Key with coder of type " + coder.getClass().getSimpleName(), e); + } + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof StructuralKey) { + StructuralKey<?> that = (StructuralKey<?>) other; + return structuralValue.equals(that.structuralValue); + } + return false; + } + + @Override + public int hashCode() { + return structuralValue.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 92f845c..a4f900c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -26,6 +26,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; @@ -72,17 +73,27 @@ public class GroupByKeyEvaluatorFactoryTest { CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle = bundleFactory.createRootBundle(kvs).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - + StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of()); UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = - bundleFactory.createKeyedBundle(null, "foo", groupedKvs); + bundleFactory.createKeyedBundle(null, fooKey, groupedKvs); + + StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of()); UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = - bundleFactory.createKeyedBundle(null, "bar", groupedKvs); - UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = - bundleFactory.createKeyedBundle(null, "baz", groupedKvs); + bundleFactory.createKeyedBundle(null, barKey, groupedKvs); - when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); - 
when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle); - when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle); + StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of()); + UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = + bundleFactory.createKeyedBundle(null, bazKey, groupedKvs); + + when(evaluationContext.createKeyedBundle(inputBundle, + fooKey, + groupedKvs)).thenReturn(fooBundle); + when(evaluationContext.createKeyedBundle(inputBundle, + barKey, + groupedKvs)).thenReturn(barBundle); + when(evaluationContext.createKeyedBundle(inputBundle, + bazKey, + groupedKvs)).thenReturn(bazBundle); // The input to a GroupByKey is assumed to be a KvCoder @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index 557ebff..2e7847d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; 
@@ -75,7 +76,9 @@ public class ImmutabilityCheckingBundleFactoryTest { @Test public void noMutationKeyedBundleSucceeds() { CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); + UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, + StructuralKey.of("mykey", StringUtf8Coder.of()), + transformed); WindowedValue<byte[]> windowedArray = WindowedValue.of( @@ -121,7 +124,9 @@ public class ImmutabilityCheckingBundleFactoryTest { @Test public void mutationBeforeAddKeyedBundleSucceeds() { CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); + UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, + StructuralKey.of("mykey", StringUtf8Coder.of()), + transformed); byte[] array = new byte[] {4, 8, 12}; array[0] = Byte.MAX_VALUE; @@ -172,7 +177,9 @@ public class ImmutabilityCheckingBundleFactoryTest { @Test public void mutationAfterAddKeyedBundleThrows() { CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); - UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); + UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, + StructuralKey.of("mykey", StringUtf8Coder.of()), + transformed); byte[] array = new byte[] {4, 8, 12}; WindowedValue<byte[]> windowedArray = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java index 7f202fb..af08d02 
100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java @@ -31,6 +31,9 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.Timer import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -397,16 +400,18 @@ public class InMemoryWatermarkManagerTest implements Serializable { */ @Test public void updateWatermarkWithKeyedWatermarkHolds() { - CommittedBundle<Integer> firstKeyBundle = - bundleFactory.createKeyedBundle(null, "Odd", createdInts) - .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L))) - .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L))) - .commit(clock.now()); + CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle(null, + StructuralKey.of("Odd", StringUtf8Coder.of()), + createdInts) + .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L))) + .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L))) + .commit(clock.now()); - CommittedBundle<Integer> secondKeyBundle = - bundleFactory.createKeyedBundle(null, "Even", createdInts) - .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) - .commit(clock.now()); + CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle(null, + StructuralKey.of("Even", StringUtf8Coder.of()), + createdInts) + 
.add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) + .commit(clock.now()); manager.updateWatermarks(null, TimerUpdate.empty(), @@ -435,8 +440,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L)))); - CommittedBundle<Integer> fauxFirstKeyTimerBundle = - bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); + CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null, + StructuralKey.of("Odd", StringUtf8Coder.of()), + createdInts).commit(clock.now()); manager.updateWatermarks(fauxFirstKeyTimerBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), @@ -447,8 +453,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); - CommittedBundle<Integer> fauxSecondKeyTimerBundle = - bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); + CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null, + StructuralKey.of("Even", StringUtf8Coder.of()), + createdInts).commit(clock.now()); manager.updateWatermarks(fauxSecondKeyTimerBundle, TimerUpdate.empty(), result(filtered.getProducingTransformInternal(), @@ -846,13 +853,14 @@ public class InMemoryWatermarkManagerTest implements Serializable { Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime(); Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime(); + StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of()); CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8); TimerData pastTimer = TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME); TimerData futureTimer = 
TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME); TimerUpdate timers = - TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build(); + TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks(createdBundle, timers, result(filtered.getProducingTransformInternal(), @@ -872,11 +880,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(earlierThan(initialFilteredDoubledWm))); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firedTimers = manager.extractFiredTimers(); assertThat( firedTimers.get(filtered.getProducingTransformInternal()) - .get("key") + .get(key) .getTimers(TimeDomain.PROCESSING_TIME), contains(pastTimer)); // Our timer has fired, but has not been completed, so it holds our synchronized processing WM @@ -885,14 +893,14 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> filteredTimerBundle = bundleFactory - .createKeyedBundle(null, "key", filtered) + .createKeyedBundle(null, key, filtered) .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); CommittedBundle<Integer> filteredTimerResult = - bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo) + bundleFactory.createKeyedBundle(null, key, filteredTimesTwo) .commit(filteredWms.getSynchronizedProcessingOutputTime()); // Complete the processing time timer manager.updateWatermarks(filteredTimerBundle, - TimerUpdate.builder("key") + TimerUpdate.builder(key) .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(), result(filtered.getProducingTransformInternal(), filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), @@ -988,7 +996,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { TimerData upstreamProcessingTimer = 
TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME); manager.updateWatermarks(created, - TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(), + TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())) + .setTimer(upstreamProcessingTimer) + .build(), result(filtered.getProducingTransformInternal(), created.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>singleton(filteredBundle)), @@ -1009,7 +1019,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12); manager.updateWatermarks(otherCreated, - TimerUpdate.builder("key") + TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())) .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(), result(filtered.getProducingTransformInternal(), otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()), @@ -1032,8 +1042,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); - CommittedBundle<Integer> filteredBundle = - bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold); + CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle(created, + StructuralKey.of("key", StringUtf8Coder.of()), + filtered).commit(upstreamHold); manager.updateWatermarks( created, TimerUpdate.empty(), @@ -1053,7 +1064,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void extractFiredTimersReturnsFiredEventTimeTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced assertThat(initialTimers.entrySet(), emptyIterable()); @@ -1074,7 +1085,7 @@ public class 
InMemoryWatermarkManagerTest implements Serializable { TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME); TimerData lastTimer = TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME); - Object key = new Object(); + StructuralKey<byte[]> key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) .setTimer(earliestTimer) @@ -1090,11 +1101,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers = manager.extractFiredTimers(); assertThat( firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<Object, FiredTimers> firstFilteredTimers = + Map<StructuralKey<?>, FiredTimers> firstFilteredTimers = firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); assertThat(firstFilteredTimers.get(key), not(nullValue())); FiredTimers firstFired = firstFilteredTimers.get(key); @@ -1107,11 +1118,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<Object, FiredTimers> secondFilteredTimers = + Map<StructuralKey<?>, FiredTimers> secondFilteredTimers = secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); assertThat(secondFilteredTimers.get(key), not(nullValue())); FiredTimers secondFired = secondFilteredTimers.get(key); @@ 
-1121,7 +1132,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void extractFiredTimersReturnsFiredProcessingTimeTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced assertThat(initialTimers.entrySet(), emptyIterable()); @@ -1141,7 +1152,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME); TimerData lastTimer = TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME); - Object key = new Object(); + StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of()); TimerUpdate update = TimerUpdate.builder(key) .setTimer(lastTimer) @@ -1158,11 +1169,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers = manager.extractFiredTimers(); assertThat( firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<Object, FiredTimers> firstFilteredTimers = + Map<StructuralKey<?>, FiredTimers> firstFilteredTimers = firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); assertThat(firstFilteredTimers.get(key), not(nullValue())); FiredTimers firstFired = firstFilteredTimers.get(key); @@ -1176,11 +1187,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<Object, FiredTimers> secondFilteredTimers = + Map<StructuralKey<?>, FiredTimers> secondFilteredTimers = secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); assertThat(secondFilteredTimers.get(key), not(nullValue())); FiredTimers secondFired = secondFilteredTimers.get(key); @@ -1190,7 +1201,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced assertThat(initialTimers.entrySet(), emptyIterable()); @@ -1210,7 +1221,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - Object key = new Object(); + StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) .setTimer(lastTimer) @@ -1227,11 +1238,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers = manager.extractFiredTimers(); assertThat( firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<Object, FiredTimers> firstFilteredTimers = + Map<StructuralKey<?>, FiredTimers> firstFilteredTimers = 
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); assertThat(firstFilteredTimers.get(key), not(nullValue())); FiredTimers firstFired = firstFilteredTimers.get(key); @@ -1246,11 +1257,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<Object, FiredTimers> secondFilteredTimers = + Map<StructuralKey<?>, FiredTimers> secondFilteredTimers = secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); assertThat(secondFilteredTimers.get(key), not(nullValue())); FiredTimers secondFired = secondFilteredTimers.get(key); @@ -1271,7 +1282,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME); TimerUpdate update = - TimerUpdate.builder("foo") + TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of())) .withCompletedTimers(ImmutableList.of(completedOne, completedTwo)) .setTimer(set) .deletedTimer(deleted) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java index 1809dc6..abe2a19 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java +++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java @@ -19,11 +19,15 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.WithKeys; @@ -69,34 +73,38 @@ public class InProcessBundleFactoryTest { } @Test - public void createRootBundleShouldCreateWithNullKey() { + public void createRootBundleShouldCreateWithEmptyKey() { PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection); CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); - assertThat(bundle.getKey(), nullValue()); + assertThat(bundle.getKey(), + Matchers.<StructuralKey<?>>equalTo(StructuralKey.of(null, VoidCoder.of()))); } - private void createKeyedBundle(Object key) { + private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception { PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); + StructuralKey skey = StructuralKey.of(key, coder); UncommittedBundle<Integer> inFlightBundle = - bundleFactory.createKeyedBundle(null, key, pcollection); + bundleFactory.createKeyedBundle(null, skey, pcollection); CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); - assertThat(bundle.getKey(), 
equalTo(key)); + assertThat(bundle.getKey(), equalTo(skey)); } @Test - public void keyedWithNullKeyShouldCreateKeyedBundle() { - createKeyedBundle(null); + public void keyedWithNullKeyShouldCreateKeyedBundle() throws Exception { + createKeyedBundle(VoidCoder.of(), null); } @Test - public void keyedWithKeyShouldCreateKeyedBundle() { - createKeyedBundle(new Object()); + public void keyedWithKeyShouldCreateKeyedBundle() throws Exception { + createKeyedBundle(StringUtf8Coder.of(), "foo"); + createKeyedBundle(VarIntCoder.of(), 1234); + createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99}); } private <T> CommittedBundle<T> @@ -154,7 +162,7 @@ public class InProcessBundleFactoryTest { assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement)); assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue)); - assertThat(withed.getKey(), equalTo(committed.getKey())); + assertThat(withed.getKey(), Matchers.<StructuralKey<?>>equalTo(committed.getKey())); assertThat(withed.getPCollection(), equalTo(committed.getPCollection())); assertThat( withed.getSynchronizedProcessingOutputWatermark(), @@ -203,21 +211,21 @@ public class InProcessBundleFactoryTest { @Test public void createBundleKeyedResultPropagatesKey() { CommittedBundle<KV<String, Integer>> newBundle = - bundleFactory - .createBundle( - bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), - downstream) - .commit(Instant.now()); - assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); + bundleFactory.createBundle( + bundleFactory.createKeyedBundle( + null, + StructuralKey.of("foo", StringUtf8Coder.of()), + created).commit(Instant.now()), + downstream).commit(Instant.now()); + assertThat(newBundle.getKey().getKey(), Matchers.<Object>equalTo("foo")); } @Test public void createKeyedBundleKeyed() { - CommittedBundle<KV<String, Integer>> keyedBundle = - bundleFactory - .createKeyedBundle( - 
bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) - .commit(Instant.now()); - assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); + CommittedBundle<KV<String, Integer>> keyedBundle = bundleFactory.createKeyedBundle( + bundleFactory.createRootBundle(created).commit(Instant.now()), + StructuralKey.of("foo", StringUtf8Coder.of()), + downstream).commit(Instant.now()); + assertThat(keyedBundle.getKey().getKey(), Matchers.<Object>equalTo("foo")); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java index 10b8721..18db400 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java @@ -31,6 +31,8 @@ import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepCon import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -159,16 +161,16 @@ public class InProcessEvaluationContextTest { @Test public void getExecutionContextSameStepSameKeyState() { InProcessExecutionContext fooContext = - 
context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + context.getExecutionContext(created.getProducingTransformInternal(), + StructuralKey.of("foo", StringUtf8Coder.of())); StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); - context.handleResult( - InProcessBundleFactory.create() - .createKeyedBundle(null, "foo", created) + context.handleResult(InProcessBundleFactory.create() + .createKeyedBundle(null, StructuralKey.of("foo", StringUtf8Coder.of()), created) .commit(Instant.now()), ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(created.getProducingTransformInternal()) @@ -176,7 +178,8 @@ public class InProcessEvaluationContextTest { .build()); InProcessExecutionContext secondFooContext = - context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + context.getExecutionContext(created.getProducingTransformInternal(), + StructuralKey.of("foo", StringUtf8Coder.of())); assertThat( secondFooContext .getOrCreateStepContext("s1", "s1") @@ -190,7 +193,8 @@ public class InProcessEvaluationContextTest { @Test public void getExecutionContextDifferentKeysIndependentState() { InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + context.getExecutionContext(created.getProducingTransformInternal(), + StructuralKey.of("foo", StringUtf8Coder.of())); StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -201,7 +205,8 @@ public class InProcessEvaluationContextTest { .add(1); InProcessExecutionContext barContext = - context.getExecutionContext(created.getProducingTransformInternal(), "bar"); + context.getExecutionContext(created.getProducingTransformInternal(), + StructuralKey.of("bar", StringUtf8Coder.of())); assertThat(barContext, 
not(equalTo(fooContext))); assertThat( barContext @@ -214,7 +219,7 @@ public class InProcessEvaluationContextTest { @Test public void getExecutionContextDifferentStepsIndependentState() { - String myKey = "foo"; + StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of()); InProcessExecutionContext fooContext = context.getExecutionContext(created.getProducingTransformInternal(), myKey); @@ -269,7 +274,7 @@ public class InProcessEvaluationContextTest { @Test public void handleResultStoresState() { - String myKey = "foo"; + StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of()); InProcessExecutionContext fooContext = context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); @@ -359,7 +364,7 @@ public class InProcessEvaluationContextTest { .build(); context.handleResult(null, ImmutableList.<TimerData>of(), holdResult); - String key = "foo"; + StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of()); TimerData toFire = TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); InProcessTransformResult timerResult = @@ -383,11 +388,12 @@ public class InProcessEvaluationContextTest { // Should cause the downstream timer to fire context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult); - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers(); + Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired = + context.extractFiredTimers(); assertThat( fired, Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal())); - Map<Object, FiredTimers> downstreamFired = + Map<StructuralKey<?>, FiredTimers> downstreamFired = fired.get(downstream.getProducingTransformInternal()); assertThat(downstreamFired, Matchers.<Object>hasKey(key)); @@ -402,23 +408,27 @@ public class InProcessEvaluationContextTest { @Test public void createBundleKeyedResultPropagatesKey() { + StructuralKey<String> key 
= StructuralKey.of("foo", StringUtf8Coder.of()); CommittedBundle<KV<String, Integer>> newBundle = context .createBundle( - bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), - downstream) - .commit(Instant.now()); - assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); + bundleFactory.createKeyedBundle( + null, key, + created).commit(Instant.now()), + downstream).commit(Instant.now()); + assertThat(newBundle.getKey(), Matchers.<StructuralKey<?>>equalTo(key)); } @Test public void createKeyedBundleKeyed() { + StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of()); CommittedBundle<KV<String, Integer>> keyedBundle = - context - .createKeyedBundle( - bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) - .commit(Instant.now()); - assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); + context.createKeyedBundle( + bundleFactory.createRootBundle(created).commit(Instant.now()), + key, + downstream).commit(Instant.now()); + assertThat(keyedBundle.getKey(), + Matchers.<StructuralKey<?>>equalTo(key)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java index 1172a4d..28a3cf6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java @@ -26,6 +26,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; @@ -73,16 +74,28 @@ public class InProcessGroupByKeyOnlyEvaluatorFactoryTest { bundleFactory.createRootBundle(kvs).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = - bundleFactory.createKeyedBundle(null, "foo", groupedKvs); - UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = - bundleFactory.createKeyedBundle(null, "bar", groupedKvs); - UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = - bundleFactory.createKeyedBundle(null, "baz", groupedKvs); - - when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); - when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle); - when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle); + StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of()); + UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = bundleFactory.createKeyedBundle( + null, fooKey, + groupedKvs); + StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of()); + UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = bundleFactory.createKeyedBundle( + null, barKey, + groupedKvs); + StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of()); + UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = bundleFactory.createKeyedBundle( + null, bazKey, + groupedKvs); + + when(evaluationContext.createKeyedBundle(inputBundle, + fooKey, + 
groupedKvs)).thenReturn(fooBundle); + when(evaluationContext.createKeyedBundle(inputBundle, + barKey, + groupedKvs)).thenReturn(barBundle); + when(evaluationContext.createKeyedBundle(inputBundle, + bazKey, + groupedKvs)).thenReturn(bazBundle); // The input to a GroupByKey is assumed to be a KvCoder @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java index 5a92ce3..9314f5e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java @@ -18,23 +18,34 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.fail; import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import 
org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.TypeDescriptor; + +import com.google.common.collect.ImmutableMap; import com.fasterxml.jackson.annotation.JsonValue; + import org.junit.Rule; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -43,6 +54,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.Serializable; +import java.util.Map; /** * Tests for basic {@link InProcessPipelineRunner} functionality. @@ -79,6 +91,54 @@ public class InProcessPipelineRunnerTest implements Serializable { result.awaitCompletion(); } + @Test(timeout = 5000L) + public void byteArrayCountShouldSucceed() { + Pipeline p = getPipeline(); + + SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() { + @Override + public byte[] apply(Integer input) { + try { + return CoderUtils.encodeToByteArray(VarIntCoder.of(), input); + } catch (CoderException e) { + fail("Unexpected Coder Exception " + e); + throw new AssertionError("Unreachable"); + } + } + }; + TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() { + }; + PCollection<byte[]> foos = + p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td)); + PCollection<byte[]> msync = + p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td)); + PCollection<byte[]> bytes = + PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections()); + PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement()); + PCollection<KV<Integer, Long>> countsBackToString = + counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() { + @Override + public KV<Integer, Long> apply(KV<byte[], Long> input) { + try { + return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), 
input.getKey()), + input.getValue()); + } catch (CoderException e) { + fail("Unexpected Coder Exception " + e); + throw new AssertionError("Unreachable"); + } + } + })); + + Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L) + .put(2, 2L) + .put(3, 1L) + .put(-2, 1L) + .put(-8, 1L) + .put(-16, 1L) + .build(); + PAssert.thatMap(countsBackToString).isEqualTo(expected); + } + @Test public void transformDisplayDataExceptionShouldFail() { DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java index 34a8980..3e01f44 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.state.StateNamespaces; @@ -55,7 +56,7 @@ public class InProcessTimerInternalsTest { MockitoAnnotations.initMocks(this); clock = MockClock.fromInstant(new Instant(0)); - timerUpdateBuilder = TimerUpdate.builder(1234); + timerUpdateBuilder = TimerUpdate.builder(StructuralKey.of(1234, 
VarIntCoder.of())); internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java index ca15d9c..1127ed2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java @@ -154,7 +154,7 @@ public class ParDoInProcessEvaluatorTest { when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); when( evaluationContext.getExecutionContext( - Mockito.any(AppliedPTransform.class), Mockito.any(Object.class))) + Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class))) .thenReturn(executionContext); when(evaluationContext.createCounterSet()).thenReturn(new CounterSet()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index cecfe01..a6f31c0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -114,8 +114,8 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable { InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) - .thenReturn(executionContext); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -199,8 +199,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) - .thenReturn(executionContext); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -287,10 +287,12 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, elementOutput)) .thenReturn(elementOutputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, "myKey", null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) - .thenReturn(executionContext); + InProcessExecutionContext executionContext = new InProcessExecutionContext(null, + StructuralKey.of("myKey", StringUtf8Coder.of()), + null, + null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -397,10 +399,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { 
when(evaluationContext.createBundle(inputBundle, elementOutput)) .thenReturn(elementOutputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, "myKey", null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) - .thenReturn(executionContext); + InProcessExecutionContext executionContext = new InProcessExecutionContext(null, + StructuralKey.of("myKey", StringUtf8Coder.of()), + null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -419,7 +422,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { assertThat( result.getTimerUpdate(), equalTo( - TimerUpdate.builder("myKey") + TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) .setTimer(addedTimer) .setTimer(addedTimer) .setTimer(addedTimer) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index 236ad17..a1480e5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -90,8 +90,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = new 
InProcessExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null)) - .thenReturn(executionContext); + when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -142,8 +142,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null)) - .thenReturn(executionContext); + when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -204,9 +204,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, "myKey", null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + InProcessExecutionContext executionContext = new InProcessExecutionContext(null, + StructuralKey.of("myKey", StringUtf8Coder.of()), + null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())) .thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -292,6 +294,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { }); PCollection<KV<String, Integer>> 
mainOutput = input.apply(pardo); + StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of()); CommittedBundle<String> inputBundle = bundleFactory.createRootBundle(input).commit(Instant.now()); @@ -301,9 +304,12 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, "myKey", null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) + InProcessExecutionContext executionContext = new InProcessExecutionContext(null, + key, + null, + null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())) .thenReturn(executionContext); CounterSet counters = new CounterSet(); when(evaluationContext.createCounterSet()).thenReturn(counters); @@ -316,9 +322,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); InProcessTransformResult result = evaluator.finishBundle(); - assertThat( - result.getTimerUpdate(), - equalTo( - TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build())); + assertThat(result.getTimerUpdate(), + equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) + .setTimer(addedTimer) + .deletedTimer(deletedTimer) + .build())); } }
