[ 
https://issues.apache.org/jira/browse/BEAM-4153?focusedWorklogId=94846&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94846
 ]

ASF GitHub Bot logged work on BEAM-4153:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/18 22:59
            Start Date: 24/Apr/18 22:59
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5181: [BEAM-4153] Use 
Generic types in Bundle
URL: https://github.com/apache/beam/pull/5181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
index 59d3043450a..fb0660e8e3a 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
@@ -24,7 +24,8 @@
  * An executor that is capable of processing some bundle of input over some 
executable stage or
  * step.
  */
-interface BundleProcessor<BundleT extends Bundle<?>, ExecutableT> {
+interface BundleProcessor<
+    CollectionT, BundleT extends Bundle<?, ? extends CollectionT>, 
ExecutableT> {
   /**
    * Execute the provided bundle using the provided Executable, calling back 
to the {@link
    * CompletionCallback} when execution completes.
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
index 07c3d22d37f..5522dac8627 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -19,10 +19,8 @@
 package org.apache.beam.runners.direct;
 
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.local.Bundle;
 import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -35,7 +33,7 @@
  * a part of at a later point.
  * @param <T> the type of elements contained within this bundle
  */
-interface CommittedBundle<T> extends Bundle<T> {
+interface CommittedBundle<T> extends Bundle<T, PCollection<T>> {
   /**
    * Returns the PCollection that the elements of this bundle belong to.
    */
@@ -43,7 +41,7 @@
   PCollection<T> getPCollection();
 
   /**
-   * Returns the key that was output in the most recent {@link GroupByKey} in 
the
+   * Returns the key that was output in the most recent {@code GroupByKey} in 
the
    * execution of this bundle.
    */
   StructuralKey<?> getKey();
@@ -54,11 +52,16 @@
    */
   Iterable<WindowedValue<T>> getElements();
 
-  @Override
+  /**
+   * Return the minimum timestamp among elements in this bundle.
+   *
+   * <p>This should be equivalent to iterating over all of the elements within 
a bundle and
+   * selecting the minimum timestamp from among them.
+   */
   Instant getMinimumTimestamp();
 
   /**
-   * Returns the processing time output watermark at the time the producing 
{@link PTransform}
+   * Returns the processing time output watermark at the time the producing 
{@code Executable}
    * committed this bundle. Downstream synchronized processing time watermarks 
cannot progress
    * past this point before consuming this bundle.
    *
@@ -67,7 +70,6 @@
    * timers that fired to produce this bundle.
    */
   Instant getSynchronizedProcessingOutputWatermark();
-
   /**
    * Return a new {@link CommittedBundle} that is like this one, except calls 
to
    * {@link #getElements()} will return the provided elements. This bundle is 
unchanged.
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index bfa65cd2d8e..027e661f5bb 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -77,7 +77,8 @@
   private final BundleFactory bundleFactory;
 
   /** The current processing time and event time watermarks and timers. */
-  private final WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> 
watermarkManager;
+  private final WatermarkManager<AppliedPTransform<?, ?, ?>, ? super 
PCollection<?>>
+      watermarkManager;
 
   /** Executes callbacks based on the progression of the watermark. */
   private final WatermarkCallbackExecutor callbackExecutor;
@@ -121,7 +122,7 @@ private EvaluationContext(
 
   public void initialize(
       Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> 
initialInputs) {
-    watermarkManager.initialize(initialInputs);
+    watermarkManager.initialize((Map) initialInputs);
   }
 
   /**
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index ab759287637..cce4d7e1947 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -44,6 +44,7 @@
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -55,7 +56,8 @@
  * EvaluationContext} to execute a {@link Pipeline}.
  */
 final class ExecutorServiceParallelExecutor
-    implements PipelineExecutor, BundleProcessor<CommittedBundle<?>, 
AppliedPTransform<?, ?, ?>> {
+    implements PipelineExecutor,
+        BundleProcessor<PCollection<?>, CommittedBundle<?>, 
AppliedPTransform<?, ?, ?>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
 
   private final int targetParallelism;
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index e3632697164..4547a69933c 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -52,7 +52,8 @@
   public static ExecutionDriver create(
       EvaluationContext context,
       DirectGraph graph,
-      BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> 
bundleProcessor,
+      BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, 
?, ?>>
+          bundleProcessor,
       PipelineMessageReceiver messageReceiver,
       Map<AppliedPTransform<?, ?, ?>, 
ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
     return new QuiescenceDriver(context, graph, bundleProcessor, 
messageReceiver, initialBundles);
@@ -60,7 +61,8 @@ public static ExecutionDriver create(
 
   private final EvaluationContext evaluationContext;
   private final DirectGraph graph;
-  private final BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, 
?>> bundleProcessor;
+  private final BundleProcessor<PCollection<?>, CommittedBundle<?>, 
AppliedPTransform<?, ?, ?>>
+      bundleProcessor;
   private final PipelineMessageReceiver pipelineMessageReceiver;
 
   private final CompletionCallback defaultCompletionCallback =
@@ -78,7 +80,8 @@ public static ExecutionDriver create(
   private QuiescenceDriver(
       EvaluationContext evaluationContext,
       DirectGraph graph,
-      BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> 
bundleProcessor,
+      BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, 
?, ?>>
+          bundleProcessor,
       PipelineMessageReceiver pipelineMessageReceiver,
       Map<AppliedPTransform<?, ?, ?>, 
ConcurrentLinkedQueue<CommittedBundle<?>>>
           pendingRootBundles) {
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 86e904655ce..64f242bf1ff 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -54,6 +54,7 @@
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.local.Bundle;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -61,7 +62,6 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Instant;
 
 /**
@@ -211,7 +211,7 @@ public static WatermarkUpdate fromTimestamps(Instant 
oldTime, Instant currentTim
    */
   @VisibleForTesting static class AppliedPTransformInputWatermark implements 
Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<CommittedBundle<?>> pendingElements;
+    private final SortedMultiset<Bundle<?, ?>> pendingElements;
 
     // This tracks only the quantity of timers at each timestamp, for quickly 
getting the cross-key
     // minimum
@@ -233,7 +233,7 @@ public AppliedPTransformInputWatermark(Collection<? extends 
Watermark> inputWate
       // be consumed without modifications.
       //
       // The same logic is applied for pending timers
-      Ordering<CommittedBundle<?>> pendingBundleComparator =
+      Ordering<Bundle<?, ?>> pendingBundleComparator =
           new 
BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements =
           TreeMultiset.create(pendingBundleComparator);
@@ -280,11 +280,11 @@ public synchronized WatermarkUpdate refresh() {
       return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
     }
 
-    private synchronized void addPending(CommittedBundle<?> newPending) {
+    private synchronized void addPending(Bundle<?, ?> newPending) {
       pendingElements.add(newPending);
     }
 
-    private synchronized void removePending(CommittedBundle<?> completed) {
+    private synchronized void removePending(Bundle<?, ?> completed) {
       pendingElements.remove(completed);
     }
 
@@ -446,7 +446,7 @@ public synchronized String toString() {
    */
   private static class SynchronizedProcessingTimeInputWatermark implements 
Watermark {
     private final Collection<? extends Watermark> inputWms;
-    private final Collection<CommittedBundle<?>> pendingBundles;
+    private final Collection<Bundle<?, ?>> pendingBundles;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> 
processingTimers;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> 
synchronizedProcessingTimers;
     private final Map<StructuralKey<?>, Table<StateNamespace, String, 
TimerData>> existingTimers;
@@ -494,7 +494,7 @@ public synchronized WatermarkUpdate refresh() {
       for (Watermark input : inputWms) {
         minTime = INSTANT_ORDERING.min(minTime, input.get());
       }
-      for (CommittedBundle<?> bundle : pendingBundles) {
+      for (Bundle<?, ?> bundle : pendingBundles) {
         // TODO: Track elements in the bundle by the processing time they were 
output instead of
         // entire bundles. Requried to support arbitrarily splitting and 
merging bundles between
         // steps
@@ -504,11 +504,11 @@ public synchronized WatermarkUpdate refresh() {
       return WatermarkUpdate.fromTimestamps(oldHold, minTime);
     }
 
-    public synchronized void addPending(CommittedBundle<?> bundle) {
+    public synchronized void addPending(Bundle<?, ?> bundle) {
       pendingBundles.add(bundle);
     }
 
-    public synchronized void removePending(CommittedBundle<?> bundle) {
+    public synchronized void removePending(Bundle<?, ?> bundle) {
       pendingBundles.remove(bundle);
     }
 
@@ -785,7 +785,7 @@ public Instant get() {
    * @param clock the clock to use to determine processing time
    * @param graph the graph representing this pipeline
    */
-  public static WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> create(
+  public static WatermarkManager<AppliedPTransform<?, ?, ?>, ? super 
PCollection<?>> create(
       Clock clock, DirectGraph graph) {
     return new WatermarkManager<>(clock, graph);
   }
@@ -879,13 +879,14 @@ public TransformWatermarks getWatermarks(ExecutableT 
executable) {
     return transformToWatermarks.get(executable);
   }
 
-  public void initialize(Map<ExecutableT, ? extends 
Iterable<CommittedBundle<?>>> initialBundles) {
+  public void initialize(
+      Map<ExecutableT, ? extends Iterable<Bundle<?,  CollectionT>>> 
initialBundles) {
     refreshLock.lock();
     try {
-      for (Map.Entry<ExecutableT, ? extends Iterable<CommittedBundle<?>>> 
rootEntry :
+      for (Map.Entry<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> 
rootEntry :
           initialBundles.entrySet()) {
         TransformWatermarks rootWms = 
transformToWatermarks.get(rootEntry.getKey());
-        for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
+        for (Bundle<?, ? extends CollectionT> initialBundle : 
rootEntry.getValue()) {
           rootWms.addPending(initialBundle);
         }
         pendingRefreshes.add(rootEntry.getKey());
@@ -920,11 +921,11 @@ public void initialize(Map<ExecutableT, ? extends 
Iterable<CommittedBundle<?>>>
    * @param earliestHold the earliest watermark hold in the executable's state.
    */
   public void updateWatermarks(
-      @Nullable CommittedBundle<?> completed,
+      @Nullable Bundle<?, ? extends CollectionT> completed,
       TimerUpdate timerUpdate,
       ExecutableT executable,
-      @Nullable CommittedBundle<?> unprocessedInputs,
-      Iterable<? extends CommittedBundle<?>> outputs,
+      @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
+      Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
       Instant earliestHold) {
     pendingUpdates.offer(
         PendingWatermarkUpdate.create(
@@ -961,16 +962,16 @@ private void applyAllPendingUpdates() {
   @GuardedBy("refreshLock")
   private void applyNUpdates(int numUpdates) {
     for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates 
<= 0); i++) {
-      PendingWatermarkUpdate<ExecutableT> pending = pendingUpdates.poll();
+      PendingWatermarkUpdate<ExecutableT, CollectionT> pending = 
pendingUpdates.poll();
       applyPendingUpdate(pending);
       pendingRefreshes.add(pending.getExecutable());
     }
   }
 
   /** Apply a {@link PendingWatermarkUpdate} to the {@link WatermarkManager}. 
*/
-  private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT> pending) 
{
+  private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT, 
CollectionT> pending) {
     ExecutableT executable = pending.getExecutable();
-    CommittedBundle<?> inputBundle = pending.getInputBundle();
+    Bundle<?, ? extends CollectionT> inputBundle = pending.getInputBundle();
 
     updatePending(
         inputBundle,
@@ -996,19 +997,19 @@ private void 
applyPendingUpdate(PendingWatermarkUpdate<ExecutableT> pending) {
    * watermark but the element it produced is not yet pending. This can cause 
the watermark to
    * erroneously advance.
    *
-   * <p>See {@link #updateWatermarks(CommittedBundle, TimerUpdate, Object, 
CommittedBundle,
+   * <p>See {@link #updateWatermarks(Bundle, TimerUpdate, Object, Bundle,
    * Iterable, Instant)} for information about the parameters of this method.
    */
   private void updatePending(
-      CommittedBundle<?> input,
+      Bundle<?, ? extends CollectionT> input,
       TimerUpdate timerUpdate,
       ExecutableT executable,
-      @Nullable CommittedBundle<?> unprocessedInputs,
-      Iterable<? extends CommittedBundle<?>> outputs) {
+      @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
+      Iterable<? extends Bundle<?, ? extends CollectionT>> outputs) {
     // Newly pending elements must be added before completed elements are 
removed, as the two
     // do not share a Mutex within this call and thus can be interleaved with 
external calls to
     // refresh.
-    for (CommittedBundle<?> bundle : outputs) {
+    for (Bundle<?, ? extends CollectionT> bundle : outputs) {
       for (ExecutableT consumer :
           // TODO: Remove this cast once CommittedBundle returns a CollectionT
           graph.getPerElementConsumers((CollectionT) bundle.getPCollection())) 
{
@@ -1271,12 +1272,12 @@ private void setEventTimeHold(Object key, Instant 
newHold) {
       outputWatermark.updateHold(key, newHold);
     }
 
-    private void removePending(CommittedBundle<?> bundle) {
+    private void removePending(Bundle<?, ?> bundle) {
       inputWatermark.removePending(bundle);
       synchronizedProcessingInputWatermark.removePending(bundle);
     }
 
-    private void addPending(CommittedBundle<?> bundle) {
+    private void addPending(Bundle<?, ?> bundle) {
       inputWatermark.addPending(bundle);
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
@@ -1525,10 +1526,10 @@ public String toString() {
     }
   }
 
-  private static class BundleByElementTimestampComparator extends 
Ordering<CommittedBundle<?>>
+  private static class BundleByElementTimestampComparator extends 
Ordering<Bundle<?, ?>>
       implements Serializable {
     @Override
-    public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
+    public int compare(Bundle<?, ?> o1, Bundle<?, ?> o2) {
       return ComparisonChain.start()
           .compare(o1.getMinimumTimestamp(), o2.getMinimumTimestamp())
           .result();
@@ -1536,28 +1537,29 @@ public int compare(CommittedBundle<?> o1, 
CommittedBundle<?> o2) {
   }
 
   @AutoValue
-  abstract static class PendingWatermarkUpdate<ExecutableT> {
+  abstract static class PendingWatermarkUpdate<ExecutableT, CollectionT> {
     abstract ExecutableT getExecutable();
 
     @Nullable
-    abstract CommittedBundle<?> getInputBundle();
+    abstract Bundle<?, ? extends CollectionT> getInputBundle();
 
     abstract TimerUpdate getTimerUpdate();
 
     @Nullable
-    abstract CommittedBundle<?> getUnprocessedInputs();
+    abstract Bundle<?, ? extends CollectionT> getUnprocessedInputs();
 
-    abstract Iterable<? extends CommittedBundle<?>> getOutputs();
+    abstract Iterable<? extends Bundle<?, ? extends CollectionT>> getOutputs();
 
     abstract Instant getEarliestHold();
 
-    public static <ExecutableT> PendingWatermarkUpdate<ExecutableT> create(
-        ExecutableT executable,
-        @Nullable CommittedBundle<?> inputBundle,
-        TimerUpdate timerUpdate,
-        @Nullable CommittedBundle<?> unprocessedInputs,
-        Iterable<? extends CommittedBundle<?>> outputs,
-        Instant earliestHold) {
+    public static <ExecutableT, CollectionT>
+        PendingWatermarkUpdate<ExecutableT, CollectionT> create(
+            ExecutableT executable,
+            @Nullable Bundle<?, ? extends CollectionT> inputBundle,
+            TimerUpdate timerUpdate,
+            @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
+            Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
+            Instant earliestHold) {
       return new AutoValue_WatermarkManager_PendingWatermarkUpdate<>(
           executable, inputBundle, timerUpdate, unprocessedInputs, outputs, 
earliestHold);
     }
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 03764c1a1ba..d105efe468b 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -65,7 +65,6 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -101,7 +100,7 @@
   private transient PCollection<Integer> intsToFlatten;
   private transient PCollection<Integer> flattened;
 
-  private transient WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> 
manager;
+  private transient WatermarkManager<AppliedPTransform<?, ?, ?>, ? super 
PCollection<?>> manager;
   private transient BundleFactory bundleFactory;
   private DirectGraph graph;
 
@@ -306,7 +305,7 @@ public void getWatermarkMultiIdenticalInput() {
 
     AppliedPTransform<?, ?, ?> theFlatten = graph.getProducer(multiConsumer);
 
-    WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> tstMgr =
+    WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> 
tstMgr =
         WatermarkManager.create(clock, graph);
     CommittedBundle<Void> root =
         bundleFactory
@@ -323,7 +322,7 @@ public void getWatermarkMultiIdenticalInput() {
         ImmutableMap.<AppliedPTransform<?, ?, ?>, 
Collection<CommittedBundle<?>>>builder()
             .put(graph.getProducer(created), Collections.singleton(root))
             .build();
-    tstMgr.initialize(initialInputs);
+    tstMgr.initialize((Map) initialInputs);
     tstMgr.updateWatermarks(
         root,
         TimerUpdate.empty(),
diff --git 
a/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java 
b/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java
index 959494b2cff..cab9e18128d 100644
--- a/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java
@@ -18,11 +18,24 @@
 
 package org.apache.beam.runners.local;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 
 /** An immutable collection of elements which are part of a {@code 
PCollection}. */
-public interface Bundle<T> extends Iterable<WindowedValue<T>> {
+public interface Bundle<T, CollectionT> extends Iterable<WindowedValue<T>> {
+  /**
+   * Returns the PCollection that the elements of this bundle belong to.
+   */
+  @Nullable
+  CollectionT getPCollection();
+
+  /**
+   * Returns the key that was output in the most recent {@code GroupByKey} in 
the
+   * execution of this bundle.
+   */
+  StructuralKey<?> getKey();
+
   /**
    * Return the minimum timestamp among elements in this bundle.
    *
@@ -30,4 +43,16 @@
    * selecting the minimum timestamp from among them.
    */
   Instant getMinimumTimestamp();
+
+  /**
+   * Returns the processing time output watermark at the time the producing 
{@code Executable}
+   * committed this bundle. Downstream synchronized processing time watermarks 
cannot progress
+   * past this point before consuming this bundle.
+   *
+   * <p>This value is no greater than the earliest incomplete processing time 
or synchronized
+   * processing time {@link TimerData timer} at the time this bundle was 
committed, including any
+   * timers that fired to produce this bundle.
+   */
+  Instant getSynchronizedProcessingOutputWatermark();
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94846)
    Time Spent: 50m  (was: 40m)

> Performance tests for spark runner fail
> ---------------------------------------
>
>                 Key: BEAM-4153
>                 URL: https://issues.apache.org/jira/browse/BEAM-4153
>             Project: Beam
>          Issue Type: Bug
>          Components: testing
>            Reporter: Etienne Chauchot
>            Assignee: Jason Kuster
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The error seems to be that one:
> BigQuery error in load operation: Error processing job
> 'apache-beam-testing:bqjob_r2527a0e444514f2b_00000162f128db2b_1': Invalid 
> schema
> update. Field timestamp has changed type from TIMESTAMP to FLOAT



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to