[ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=103493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103493
 ]

ASF GitHub Bot logged work on BEAM-3776:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/May/18 17:35
            Start Date: 18/May/18 17:35
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 987291c261a..37101ca90f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1450,6 +1450,13 @@
         <scope>test</scope>
       </dependency>
 
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-simple</artifactId>
+        <version>${slf4j.version}</version>
+        <scope>test</scope>
+      </dependency>
+
       <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-core</artifactId>
diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 11d606372ea..3d77c40c103 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -46,6 +46,6 @@ dependencies {
   shadowTest library.java.hamcrest_core
   shadowTest library.java.mockito_core
   shadowTest library.java.junit
-  shadowTest library.java.slf4j_jdk14
+  shadowTest library.java.slf4j_simple
   shadowTest library.java.jackson_dataformat_yaml
 }
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index f5215ca3239..6e0aa03d259 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -100,6 +100,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
@@ -141,7 +146,7 @@
 
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
+      <artifactId>slf4j-simple</artifactId>
       <scope>test</scope>
     </dependency>
 
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 8285c7245d2..78fc60e8ea4 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -29,9 +27,7 @@
 import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
 
 /**
  * Helpers for merging state.
@@ -212,98 +208,7 @@
     result.addAccum(merged);
   }
 
-  /**
-   * Prefetch all watermark state for {@code address} across all merging 
windows in
-   * {@code context}.
-   */
-  public static <K, W extends BoundedWindow> void prefetchWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<WatermarkHoldState> address) {
-    Map<W, WatermarkHoldState> map = 
context.accessInEachMergingWindow(address);
-    WatermarkHoldState result = context.access(address);
-    if (map.isEmpty()) {
-      // Nothing to prefetch.
-      return;
-    }
-    if (map.size() == 1 && map.values().contains(result)
-        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
-      // Nothing to change.
-      return;
-    }
-    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
-      // No need to read existing holds.
-      return;
-    }
-    // Prefetch.
-    for (WatermarkHoldState source : map.values()) {
-      prefetchRead(source);
-    }
-  }
-
   private static void prefetchRead(ReadableState<?> source) {
     source.readLater();
   }
-
-  /**
-   * Merge all watermark state in {@code address} across all merging windows 
in {@code context},
-   * where the final merge result window is {@code mergeResult}.
-   */
-  public static <K, W extends BoundedWindow> void mergeWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<WatermarkHoldState> address,
-      W mergeResult) {
-    mergeWatermarks(
-        context.accessInEachMergingWindow(address).values(), 
context.access(address), mergeResult);
-  }
-
-  /**
-   * Merge all watermark state in {@code sources} (which must include {@code 
result} if non-empty)
-   * into {@code result}, where the final merge result window is {@code 
mergeResult}.
-   */
-  public static <W extends BoundedWindow> void mergeWatermarks(
-      Collection<WatermarkHoldState> sources, WatermarkHoldState result,
-      W resultWindow) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    if (sources.size() == 1 && sources.contains(result)
-        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
-      // Nothing to merge.
-      return;
-    }
-    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
-      // Clear sources.
-      for (WatermarkHoldState source : sources) {
-        source.clear();
-      }
-      // Update directly from window-derived hold.
-      Instant hold =
-          result.getTimestampCombiner().assign(resultWindow, 
BoundedWindow.TIMESTAMP_MIN_VALUE);
-      checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
-      result.add(hold);
-    } else {
-      // Prefetch.
-      List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
-      for (WatermarkHoldState source : sources) {
-        futures.add(source);
-      }
-      // Read.
-      List<Instant> outputTimesToMerge = new ArrayList<>(sources.size());
-      for (ReadableState<Instant> future : futures) {
-        Instant sourceOutputTime = future.read();
-        if (sourceOutputTime != null) {
-          outputTimesToMerge.add(sourceOutputTime);
-        }
-      }
-      // Clear sources.
-      for (WatermarkHoldState source : sources) {
-        source.clear();
-      }
-      if (!outputTimesToMerge.isEmpty()) {
-        // Merge and update.
-        result.add(result.getTimestampCombiner().merge(resultWindow, 
outputTimesToMerge));
-      }
-    }
-  }
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index c90887ca953..0562ab1dd4b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -21,6 +21,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
@@ -97,7 +99,7 @@ public WatermarkHold(TimerInternals timerInternals, 
WindowingStrategy<?, W> wind
    */
   @Nullable
   public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
-    Instant hold = addElementHold(context);
+    Instant hold = addElementHold(context.timestamp(), context);
     if (hold == null) {
       hold = addGarbageCollectionHold(context, false /*paneIsEmpty*/);
     }
@@ -142,11 +144,11 @@ private Instant shift(Instant timestamp, W window) {
    * any downstream computation when it is eventually emitted.
    */
   @Nullable
-  private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext 
context) {
+  private Instant addElementHold(Instant timestamp, ReduceFn<?, ?, ?, 
W>.Context context) {
     // Give the window function a chance to move the hold timestamp forward to 
encourage progress.
     // (A later hold implies less impediment to the output watermark making 
progress, which in
     // turn encourages end-of-window triggers to fire earlier in following 
computations.)
-    Instant elementHold = shift(context.timestamp(), context.window());
+    Instant elementHold = shift(timestamp, context.window());
 
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
@@ -247,8 +249,26 @@ private Instant addGarbageCollectionHold(
   /**
    * Prefetch watermark holds in preparation for merging.
    */
-  public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
-    StateMerging.prefetchWatermarks(state, elementHoldTag);
+  public void prefetchOnMerge(MergingStateAccessor<?, W> context) {
+    Map<W, WatermarkHoldState> map = 
context.accessInEachMergingWindow(elementHoldTag);
+    WatermarkHoldState result = context.access(elementHoldTag);
+    if (map.isEmpty()) {
+      // Nothing to prefetch.
+      return;
+    }
+    if (map.size() == 1 && map.values().contains(result) && 
result.getTimestampCombiner()
+        .dependsOnlyOnEarliestTimestamp()) {
+      // Nothing to merge if our source and sink is the same.
+      return;
+    }
+    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
+      // No need to read existing holds since we will just clear.
+      return;
+    }
+    // Prefetch.
+    for (WatermarkHoldState source : map.values()) {
+      source.readLater();
+    }
   }
 
   /**
@@ -261,7 +281,49 @@ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext 
context) {
             + "outputWatermark:{}",
         context.key(), context.window(), 
timerInternals.currentInputWatermarkTime(),
         timerInternals.currentOutputWatermarkTime());
-    StateMerging.mergeWatermarks(context.state(), elementHoldTag, 
context.window());
+    Collection<WatermarkHoldState> sources =
+        context.state().accessInEachMergingWindow(elementHoldTag).values();
+    WatermarkHoldState result = context.state().access(elementHoldTag);
+    if (sources.isEmpty()) {
+      // No element holds to merge.
+    } else if (sources.size() == 1 && sources.contains(result) && 
result.getTimestampCombiner()
+        .dependsOnlyOnEarliestTimestamp()) {
+      // Nothing to merge if our source and sink is the same.
+    } else if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
+      // Clear sources.
+      for (WatermarkHoldState source : sources) {
+        source.clear();
+      }
+      // Update directly from window-derived hold.
+      addElementHold(BoundedWindow.TIMESTAMP_MIN_VALUE, context);
+    } else {
+      // Prefetch.
+      for (WatermarkHoldState source : sources) {
+        source.readLater();
+      }
+      // Read and merge.
+      Instant mergedHold = null;
+      for (ReadableState<Instant> source : sources) {
+        Instant sourceOutputTime = source.read();
+        if (sourceOutputTime != null) {
+          if (mergedHold == null) {
+            mergedHold = sourceOutputTime;
+          } else {
+            mergedHold = result.getTimestampCombiner().merge(
+                context.window(), mergedHold, sourceOutputTime);
+          }
+        }
+      }
+      // Clear sources.
+      for (WatermarkHoldState source : sources) {
+        source.clear();
+      }
+      // Write merged value if there was one.
+      if (mergedHold != null) {
+        result.add(mergedHold);
+      }
+    }
+
     // If we had a cheap way to determine if we have an element hold then we 
could
     // avoid adding an unnecessary end-of-window or garbage collection hold.
     // Simply reading the above merged watermark would impose an additional 
read for the
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 0a3c0d6c5da..25a80bfead0 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -25,6 +25,7 @@
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -38,7 +39,11 @@
 import static org.mockito.Mockito.withSettings;
 
 import com.google.common.collect.Iterables;
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
@@ -88,6 +93,8 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests for {@link ReduceFnRunner}. These tests instantiate a full "stack" of
@@ -97,6 +104,8 @@
  */
 @RunWith(JUnit4.class)
 public class ReduceFnRunnerTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReduceFnRunnerTest.class);
+
   @Mock private SideInputReader mockSideInputReader;
   private TriggerStateMachine mockTriggerStateMachine;
   private PCollectionView<Integer> mockView;
@@ -129,6 +138,23 @@ private void injectElement(ReduceFnTester<Integer, ?, 
IntervalWindow> tester, in
     tester.injectElements(TimestampedValue.of(element, new Instant(element)));
   }
 
+  private void injectElements(
+      ReduceFnTester<Integer, ?, IntervalWindow> tester, Iterable<Integer> 
values)
+      throws Exception {
+    doNothing().when(mockTriggerStateMachine).onElement(anyElementContext());
+    List<TimestampedValue<Integer>> timestampedValues = new LinkedList<>();
+    for (int value : values) {
+      timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
+    }
+    tester.injectElements(timestampedValues);
+  }
+
+  private void injectElements(
+      ReduceFnTester<Integer, ?, IntervalWindow> tester, Integer... values)
+      throws Exception {
+    injectElements(tester, Arrays.asList(values));
+  }
+
   private void triggerShouldFinish(TriggerStateMachine mockTrigger) throws 
Exception {
     doAnswer(
             invocation -> {
@@ -873,6 +899,198 @@ public void testWatermarkHoldAndLateData() throws 
Exception {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testWatermarkHoldForLateNewWindow() throws Exception {
+    Duration allowedLateness = Duration.standardMinutes(1);
+    Duration gapDuration = Duration.millis(10);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            
.withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+    tester.advanceInputWatermark(new Instant(40));
+    injectElements(tester, 1);
+    assertThat(tester.getWatermarkHold(), nullValue());
+    injectElements(tester, 10);
+    assertThat(tester.getWatermarkHold(), nullValue());
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowMerged() throws Exception {
+    Duration allowedLateness = Duration.standardMinutes(1);
+    Duration gapDuration = Duration.millis(10);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            
.withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+    tester.advanceInputWatermark(new Instant(24));
+    injectElements(tester, 1);
+    assertThat(tester.getWatermarkHold(), nullValue());
+    injectElements(tester, 14);
+    assertThat(tester.getWatermarkHold(), nullValue());
+    injectElements(tester, 6, 16);
+    // There should now be a watermark hold since the window has extended past 
the input watermark.
+    // The hold should be for the end of the window (last element + 
gapDuration - 1).
+    assertEquals(tester.getWatermarkHold(), new Instant(25));
+    injectElements(tester, 6, 21);
+    // The hold should be extended with the window.
+    assertEquals(tester.getWatermarkHold(), new Instant(30));
+    // Advancing the watermark should remove the hold.
+    tester.advanceInputWatermark(new Instant(31));
+    assertThat(tester.getWatermarkHold(), nullValue());
+    // Late elements added to the window should not generate a hold.
+    injectElements(tester, 0);
+    assertThat(tester.getWatermarkHold(), nullValue());
+    // Generate a new window that is ontime.
+    injectElements(tester, 32, 40);
+    assertEquals(tester.getWatermarkHold(), new Instant(49));
+    // Join the closed window with the new window.
+    injectElements(tester, 24);
+    assertEquals(tester.getWatermarkHold(), new Instant(49));
+    tester.advanceInputWatermark(new Instant(50));
+    assertThat(tester.getWatermarkHold(), nullValue());
+  }
+
+  @Test
+  public void testMergingLateWatermarkHolds() throws Exception {
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    Duration gapDuration = Duration.millis(10);
+    Duration allowedLateness = Duration.standardMinutes(100);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            
.withLateFirings(AfterPane.elementCountAtLeast(10))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    tester.advanceInputWatermark(new Instant(20));
+    // Add two late elements that cause a window to merge.
+    injectElements(tester, Arrays.asList(3));
+    assertThat(tester.getWatermarkHold(), nullValue());
+    injectElements(tester, Arrays.asList(4));
+    Instant endOfWindow = new Instant(4).plus(gapDuration);
+    // We expect a GC hold to be one less than the end of window plus the 
allowed lateness.
+    Instant expectedGcHold = endOfWindow.plus(allowedLateness).minus(1);
+    assertEquals(
+        expectedGcHold,
+        tester.getWatermarkHold());
+    tester.advanceInputWatermark(new Instant(1000));
+    assertEquals(
+        expectedGcHold,
+        tester.getWatermarkHold());
+  }
+
+  @Test
+  public void testMergingWatermarkHoldAndLateDataFuzz() throws Exception {
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    // Test handling of late data. Specifically, ensure the watermark hold is 
correct.
+    Duration allowedLateness = Duration.standardMinutes(100);
+    long seed = ThreadLocalRandom.current().nextLong();
+    LOG.info("Random seed: {}", seed);
+    Random r = new Random(seed);
+
+    Duration gapDuration = Duration.millis(10 + r.nextInt(40));
+    LOG.info("Gap duration {}", gapDuration);
+
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            
.withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(true);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    // All on time data, verify watermark hold.
+    List<Integer> times = new LinkedList<>();
+
+    int numTs = 3 + r.nextInt(100);
+    int maxTs = 1 + r.nextInt(400);
+    LOG.info("Num ts {}", numTs);
+    LOG.info("Max ts {}", maxTs);
+    for (int i = numTs; i >= 0; --i) {
+      times.add(r.nextInt(maxTs));
+    }
+    LOG.info("Times: {}", times);
+
+    int split = 0;
+    long watermark = 0;
+    while (split < times.size()) {
+      int nextSplit = split + r.nextInt(times.size());
+      if (nextSplit > times.size()) {
+        nextSplit = times.size();
+      }
+      LOG.info("nextSplit {}", nextSplit);
+      injectElements(tester, times.subList(split, nextSplit));
+      if (r.nextInt(3) == 0) {
+        int nextWatermark = r.nextInt((int) (maxTs + gapDuration.getMillis()));
+        if (nextWatermark > watermark) {
+          Boolean enabled = r.nextBoolean();
+          LOG.info("nextWatermark {} {}", nextWatermark, enabled);
+          watermark = nextWatermark;
+          tester.setAutoAdvanceOutputWatermark(enabled);
+          tester.advanceInputWatermark(new Instant(watermark));
+        }
+      }
+      split = nextSplit;
+      Instant hold = tester.getWatermarkHold();
+      if (hold != null) {
+        assertThat(hold, greaterThanOrEqualTo(new Instant(watermark)));
+        assertThat(watermark, lessThan((maxTs + gapDuration.getMillis())));
+      }
+    }
+    tester.setAutoAdvanceOutputWatermark(true);
+    watermark = gapDuration.getMillis() + maxTs;
+    tester.advanceInputWatermark(new Instant(watermark));
+    LOG.info("Output {}", tester.extractOutput());
+    if (tester.getWatermarkHold() != null) {
+      assertThat(
+          tester.getWatermarkHold(),
+          equalTo(new Instant(watermark).plus(allowedLateness)));
+    }
+    // Nothing dropped.
+    long droppedElements =
+        container
+            .getCounter(
+                MetricName.named(ReduceFnRunner.class, 
ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+            .getCumulative()
+            .longValue();
+    assertEquals(0, droppedElements);
+  }
+
   /** Make sure that if data comes in too late to make it on time, the hold is 
the GC time. */
   @Test
   public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
@@ -2037,7 +2255,7 @@ public void setGarbageCollectionHoldOnLateElements() 
throws Exception {
 
     tester.advanceInputWatermark(new Instant(109));
     tester.advanceOutputWatermark(new Instant(109));
-    tester.injectElements(TimestampedValue.of(2,  new Instant(2)));
+    tester.injectElements(TimestampedValue.of(2, new Instant(2)));
     // We should have set a garbage collection hold for the final pane.
     Instant hold = tester.getWatermarkHold();
     assertEquals(new Instant(109), hold);
@@ -2100,7 +2318,9 @@ public Integer extractOutput(Integer accumulator, Context 
c) {
    * A {@link PipelineOptions} to test combining with context.
    */
   public interface TestOptions extends PipelineOptions {
+
     int getValue();
+
     void setValue(int value);
   }
 }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 7ad19c924b0..0ccf6b9f037 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -533,13 +533,15 @@ public void advanceSynchronizedProcessingTime(
    */
   @SafeVarargs
   public final void injectElements(TimestampedValue<InputT>... values) throws 
Exception {
+    injectElements(Arrays.asList(values));
+  }
+
+  public final void injectElements(List<TimestampedValue<InputT>> values) 
throws Exception {
     for (TimestampedValue<InputT> value : values) {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
 
-    Iterable<WindowedValue<InputT>> inputs =
-        Arrays.asList(values)
-            .stream()
+    Iterable<WindowedValue<InputT>> inputs = values.stream()
             .map(
                 input -> {
                   try {
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
index eb438bac3ce..79c972df997 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
@@ -543,48 +543,6 @@ public void testWatermarkStateIsEmpty() throws Exception {
     assertThat(readFuture.read(), Matchers.is(true));
   }
 
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, 
WINDOW_1);
-
-    assertThat(value1.read(), equalTo(new Instant(2000)));
-    assertThat(value2.read(), equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, 
WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), equalTo(new Instant(5000)));
-    assertThat(value1.read(), equalTo(null));
-    assertThat(value2.read(), equalTo(null));
-  }
-
   @Test
   public void testSetReadable() throws Exception {
     SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 3409d276049..c62c9a75087 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -84,14 +84,6 @@ public void testWatermarkEndOfWindowState() {}
   @Ignore
   public void testWatermarkStateIsEmpty() {}
 
-  @Override
-  @Ignore
-  public void testMergeEarliestWatermarkIntoSource() {}
-
-  @Override
-  @Ignore
-  public void testMergeLatestWatermarkIntoSource() {}
-
   @Override
   @Ignore
   public void testSetReadable() {}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
index aed14f3de0c..e3492f152fa 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
@@ -130,14 +130,6 @@ public void testWatermarkEndOfWindowState() {}
     @Ignore
     public void testWatermarkStateIsEmpty() {}
 
-    @Override
-    @Ignore
-    public void testMergeEarliestWatermarkIntoSource() {}
-
-    @Override
-    @Ignore
-    public void testMergeLatestWatermarkIntoSource() {}
-
     @Override
     @Ignore
     public void testSetReadable() {}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
index 667b5ba3954..256d44ce54f 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
@@ -112,14 +112,6 @@ public void testWatermarkEndOfWindowState() {}
   @Ignore
   public void testWatermarkStateIsEmpty() {}
 
-  @Override
-  @Ignore
-  public void testMergeEarliestWatermarkIntoSource() {}
-
-  @Override
-  @Ignore
-  public void testMergeLatestWatermarkIntoSource() {}
-
   @Override
   @Ignore
   public void testSetReadable() {}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index fc5575cb161..4736d82cf4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -200,6 +200,11 @@ public long getEstimatedSizeBytes(PipelineOptions options) 
throws Exception {
       return null;
     }
 
+    @Override
+    public boolean requiresDeduping() {
+      return true;
+    }
+
     @Override
     public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 103493)
    Time Spent: 6h  (was: 5h 50m)

> StateMerging.mergeWatermarks sets a late watermark hold for late merging 
> windows that depend only on the window
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3776
>                 URL: https://issues.apache.org/jira/browse/BEAM-3776
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Sam Whittle
>            Assignee: Sam Whittle
>            Priority: Critical
>          Time Spent: 6h
>  Remaining Estimate: 0h
>
> WatermarkHold.addElementHold and WatermarkHold.addGarbageCollectionHold take 
> care not to add holds that would be before the input watermark.
> However, WatermarkHold.onMerge calls StateMerging.mergeWatermarks, which, if 
> the timestamp combiner depends only on the window, sets a hold for the end of 
> the window regardless of the input watermark.
> Thus if you have a WindowingStrategy such as:
> WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
>  .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
>  .withTrigger(
>  Repeatedly.forever(
>  AfterWatermark.pastEndOfWindow()
>  .withLateFirings(AfterPane.elementCountAtLeast(10))))
>  .withAllowedLateness(allowedLateness))
> and you merge windows that are late, you might end up holding the watermark 
> until the allowedLateness has passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to