[ 
https://issues.apache.org/jira/browse/BEAM-5063?focusedWorklogId=134284&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134284
 ]

ASF GitHub Bot logged work on BEAM-5063:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Aug/18 20:35
            Start Date: 13/Aug/18 20:35
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #6178: [BEAM-5063] Fix Watermark does not progress for low traffic streams
URL: https://github.com/apache/beam/pull/6178
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
index 260cf321fee..ff710b2a48b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
@@ -69,25 +69,21 @@ public MovingFunction(
     numSamples = new int[n];
     Arrays.fill(numSamples, 0);
     currentMsSinceEpoch = -1;
-    currentIndex = -1;
+    currentIndex = 0;
   }
 
   /** Flush stale values. */
   private void flush(long nowMsSinceEpoch) {
     checkArgument(nowMsSinceEpoch >= 0, "Only positive timestamps supported");
-    if (currentIndex < 0) {
-      currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % 
sampleUpdateMs);
-      currentIndex = 0;
-    }
     checkArgument(nowMsSinceEpoch >= currentMsSinceEpoch, "Attempting to move 
backwards");
     int newBuckets =
         Math.min((int) ((nowMsSinceEpoch - currentMsSinceEpoch) / 
sampleUpdateMs), buckets.length);
+    currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs);
     while (newBuckets > 0) {
       currentIndex = (currentIndex + 1) % buckets.length;
       buckets[currentIndex] = function.identity();
       numSamples[currentIndex] = 0;
       newBuckets--;
-      currentMsSinceEpoch += sampleUpdateMs;
     }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
index fbf00319b49..32ca5514a99 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
@@ -107,4 +107,12 @@ public void jumpingSum() {
     assertEquals(1, f.get(SAMPLE_PERIOD + 3 * SAMPLE_UPDATE));
     assertEquals(0, f.get(SAMPLE_PERIOD * 2));
   }
+
+  @Test
+  public void properlyFlushStaleValues() {
+    MovingFunction f = newFunc();
+    f.add(0, 1);
+    f.add(SAMPLE_PERIOD * 3, 1);
+    assertEquals(1, f.get(SAMPLE_PERIOD * 3));
+  }
 }
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index b9167a7e05a..664416075d8 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -22,8 +22,6 @@
 import java.io.IOException;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.util.MovingFunction;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -36,34 +34,16 @@
 class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisReader.class);
-  /** Period of samples to determine watermark. */
-  private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
-
-  /** Period of updates to determine watermark. */
-  private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
-
-  /** Constant representing the maximum Kinesis stream retention period. */
-  static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = 
Duration.standardDays(7);
-
-  /** Minimum number of unread messages required before considering updating 
watermark. */
-  static final int MIN_WATERMARK_MESSAGES = 10;
-
-  /**
-   * Minimum number of SAMPLE_UPDATE periods over which unread messages should 
be spread before
-   * considering updating watermark.
-   */
-  private static final int MIN_WATERMARK_SPREAD = 2;
 
   private final SimplifiedKinesisClient kinesis;
   private final KinesisSource source;
   private final CheckpointGenerator initialCheckpointGenerator;
+  private final KinesisWatermark watermark;
+  private final Duration upToDateThreshold;
+  private final Duration backlogBytesCheckThreshold;
   private CustomOptional<KinesisRecord> currentRecord = 
CustomOptional.absent();
-  private MovingFunction minReadTimestampMsSinceEpoch;
-  private Instant lastWatermark = 
Instant.now().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
   private long lastBacklogBytes;
   private Instant backlogBytesLastCheckTime = new Instant(0L);
-  private Duration upToDateThreshold;
-  private Duration backlogBytesCheckThreshold;
   private ShardReadersPool shardReadersPool;
 
   KinesisReader(
@@ -75,6 +55,7 @@
         kinesis,
         initialCheckpointGenerator,
         source,
+        new KinesisWatermark(),
         upToDateThreshold,
         Duration.standardSeconds(30));
   }
@@ -83,19 +64,14 @@
       SimplifiedKinesisClient kinesis,
       CheckpointGenerator initialCheckpointGenerator,
       KinesisSource source,
+      KinesisWatermark watermark,
       Duration upToDateThreshold,
       Duration backlogBytesCheckThreshold) {
     this.kinesis = checkNotNull(kinesis, "kinesis");
     this.initialCheckpointGenerator =
         checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+    this.watermark = watermark;
     this.source = source;
-    this.minReadTimestampMsSinceEpoch =
-        new MovingFunction(
-            SAMPLE_PERIOD.getMillis(),
-            SAMPLE_UPDATE.getMillis(),
-            MIN_WATERMARK_SPREAD,
-            MIN_WATERMARK_MESSAGES,
-            Min.ofLongs());
     this.upToDateThreshold = upToDateThreshold;
     this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
   }
@@ -121,8 +97,7 @@ public boolean advance() throws IOException {
     currentRecord = shardReadersPool.nextRecord();
     if (currentRecord.isPresent()) {
       Instant approximateArrivalTimestamp = 
currentRecord.get().getApproximateArrivalTimestamp();
-      minReadTimestampMsSinceEpoch.add(
-          Instant.now().getMillis(), approximateArrivalTimestamp.getMillis());
+      watermark.update(approximateArrivalTimestamp);
       return true;
     }
     return false;
@@ -156,17 +131,7 @@ public void close() throws IOException {
 
   @Override
   public Instant getWatermark() {
-    Instant now = Instant.now();
-    long readMin = minReadTimestampMsSinceEpoch.get(now.getMillis());
-    if (readMin == Long.MAX_VALUE && shardReadersPool.allShardsUpToDate()) {
-      lastWatermark = now;
-    } else if (minReadTimestampMsSinceEpoch.isSignificant()) {
-      Instant minReadTime = new Instant(readMin);
-      if (minReadTime.isAfter(lastWatermark)) {
-        lastWatermark = minReadTime;
-      }
-    }
-    return lastWatermark;
+    return watermark.getCurrent(shardReadersPool::allShardsUpToDate);
   }
 
   @Override
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisWatermark.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisWatermark.java
new file mode 100644
index 00000000000..104063cb1dc
--- /dev/null
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisWatermark.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import java.util.function.BooleanSupplier;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Keeps track of current watermark using {@link MovingFunction}. If the 
pipeline is up to date with
+ * the processing, watermark would be around 'now - {@link 
KinesisWatermark#SAMPLE_PERIOD}' for a
+ * stream with steady traffic, and around 'now - {@link 
KinesisWatermark#UPDATE_THRESHOLD}' for a
+ * stream with low traffic.
+ */
+class KinesisWatermark {
+  /** Period of updates to determine watermark. */
+  private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
+
+  /** Period of samples to determine watermark. */
+  static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
+
+  /**
+   * Period after which watermark should be updated regardless of number of 
samples. It has to be
+   * longer than {@link KinesisWatermark#SAMPLE_PERIOD}, so that for most of 
the cases value
+   * returned from {@link MovingFunction#isSignificant()} is sufficient to 
decide about watermark
+   * update.
+   */
+  static final Duration UPDATE_THRESHOLD = SAMPLE_PERIOD.multipliedBy(2);
+
+  /** Constant representing the maximum Kinesis stream retention period. */
+  static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = 
Duration.standardDays(7);
+
+  /** Minimum number of unread messages required before considering updating 
watermark. */
+  static final int MIN_MESSAGES = 10;
+
+  /**
+   * Minimum number of SAMPLE_UPDATE periods over which unread messages should 
be spread before
+   * considering updating watermark.
+   */
+  private static final int MIN_SPREAD = 2;
+
+  private Instant lastWatermark = 
Instant.now().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
+  private Instant lastUpdate = new Instant(0L);
+  private final MovingFunction minReadTimestampMsSinceEpoch =
+      new MovingFunction(
+          SAMPLE_PERIOD.getMillis(),
+          SAMPLE_UPDATE.getMillis(),
+          MIN_SPREAD,
+          MIN_MESSAGES,
+          Min.ofLongs());
+
+  public Instant getCurrent(BooleanSupplier shardsUpToDate) {
+    Instant now = Instant.now();
+    Instant readMin = getMinReadTimestamp(now);
+    if (readMin == null) {
+      if (shardsUpToDate.getAsBoolean()) {
+        updateLastWatermark(now.minus(SAMPLE_PERIOD), now);
+      }
+    } else if (shouldUpdate(now)) {
+      updateLastWatermark(readMin, now);
+    }
+    return lastWatermark;
+  }
+
+  public void update(Instant recordArrivalTime) {
+    minReadTimestampMsSinceEpoch.add(Instant.now().getMillis(), 
recordArrivalTime.getMillis());
+  }
+
+  private Instant getMinReadTimestamp(Instant now) {
+    long readMin = minReadTimestampMsSinceEpoch.get(now.getMillis());
+    if (readMin == Min.ofLongs().identity()) {
+      return null;
+    } else {
+      return new Instant(readMin);
+    }
+  }
+
+  /**
+   * In case of streams with low traffic, {@link MovingFunction} could never 
get enough samples in
+   * {@link KinesisWatermark#SAMPLE_PERIOD} to move watermark. To prevent this 
situation, we need to
+   * check if watermark is stale (it was not updated during {@link
+   * KinesisWatermark#UPDATE_THRESHOLD}) and force its update if it is.
+   *
+   * @param now - current timestamp
+   * @return should the watermark be updated
+   */
+  private boolean shouldUpdate(Instant now) {
+    boolean hasEnoughSamples = minReadTimestampMsSinceEpoch.isSignificant();
+    boolean isStale = lastUpdate.isBefore(now.minus(UPDATE_THRESHOLD));
+    return hasEnoughSamples || isStale;
+  }
+
+  private void updateLastWatermark(Instant newWatermark, Instant now) {
+    if (newWatermark.isAfter(lastWatermark)) {
+      lastWatermark = newWatermark;
+      lastUpdate = now;
+    }
+  }
+}
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 84b6cabf11a..078f90ce33a 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -22,22 +22,20 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.DateTimeUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.mockito.stubbing.OngoingStubbing;
 
 /** Tests {@link KinesisReader}. */
 @RunWith(MockitoJUnitRunner.class)
@@ -49,11 +47,12 @@
   @Mock private KinesisRecord a, b, c, d;
   @Mock private KinesisSource kinesisSource;
   @Mock private ShardReadersPool shardReadersPool;
+  @Spy private KinesisWatermark watermark = new KinesisWatermark();
 
   private KinesisReader reader;
 
   @Before
-  public void setUp() throws IOException, TransientKinesisException {
+  public void setUp() throws TransientKinesisException {
     when(generator.generate(kinesis))
         .thenReturn(new KinesisReaderCheckpoint(asList(firstCheckpoint, 
secondCheckpoint)));
     when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.absent());
@@ -65,14 +64,14 @@ public void setUp() throws IOException, 
TransientKinesisException {
     reader = createReader(Duration.ZERO);
   }
 
-  private KinesisReader createReader(Duration backlogBytesCheckThreshold)
-      throws TransientKinesisException {
-    KinesisReader kinesisReader =
-        spy(
-            new KinesisReader(
-                kinesis, generator, kinesisSource, Duration.ZERO, 
backlogBytesCheckThreshold));
-    doReturn(shardReadersPool).when(kinesisReader).createShardReadersPool();
-    return kinesisReader;
+  private KinesisReader createReader(Duration backlogBytesCheckThreshold) {
+    return new KinesisReader(
+        kinesis, generator, kinesisSource, watermark, Duration.ZERO, 
backlogBytesCheckThreshold) {
+      @Override
+      ShardReadersPool createShardReadersPool() {
+        return shardReadersPool;
+      }
+    };
   }
 
   @Test
@@ -87,7 +86,7 @@ public void throwsNoSuchElementExceptionIfNoData() throws 
IOException {
   }
 
   @Test
-  public void startReturnsTrueIfSomeDataAvailable() throws IOException, 
TransientKinesisException {
+  public void startReturnsTrueIfSomeDataAvailable() throws IOException {
     when(shardReadersPool.nextRecord())
         .thenReturn(CustomOptional.of(a))
         .thenReturn(CustomOptional.absent());
@@ -96,7 +95,7 @@ public void startReturnsTrueIfSomeDataAvailable() throws 
IOException, TransientK
   }
 
   @Test
-  public void readsThroughAllDataAvailable() throws IOException, 
TransientKinesisException {
+  public void readsThroughAllDataAvailable() throws IOException {
     when(shardReadersPool.nextRecord())
         .thenReturn(CustomOptional.of(c))
         .thenReturn(CustomOptional.absent())
@@ -120,89 +119,33 @@ public void readsThroughAllDataAvailable() throws 
IOException, TransientKinesisE
   }
 
   @Test
-  public void watermarkDoesNotChangeWhenToFewSampleRecords()
-      throws IOException, TransientKinesisException {
-    try {
-      Instant now = Instant.now();
-      DateTimeUtils.setCurrentMillisFixed(now.getMillis());
-      Instant recordsStartTimestamp = now.minus(Duration.standardHours(1));
-      final long timestampMs = recordsStartTimestamp.getMillis();
-      Duration safetyPeriod = Duration.standardMinutes(1);
-      Instant minKinesisWatermark = 
now.minus(KinesisReader.MAX_KINESIS_STREAM_RETENTION_PERIOD);
-
-      prepareRecordsWithArrivalTimestamps(timestampMs, 1, 
KinesisReader.MIN_WATERMARK_MESSAGES / 2);
-
-      for (boolean more = reader.start(); more; more = reader.advance()) {
-        assertThat(reader.getWatermark())
-            .isBetween(
-                minKinesisWatermark.minus(safetyPeriod), 
minKinesisWatermark.plus(safetyPeriod));
-      }
-    } finally {
-      DateTimeUtils.setCurrentMillisSystem();
-    }
-  }
+  public void doesNotUpdateWatermarkWhenRecordsNotAvailable() throws 
IOException {
+    boolean advanced = reader.start();
 
-  @Test
-  public void watermarkAdvancesWhenEnoughRecordsReadRecently()
-      throws IOException, TransientKinesisException {
-    try {
-      Instant now = Instant.now();
-      DateTimeUtils.setCurrentMillisFixed(now.getMillis());
-      Instant recordsStartTimestamp = now.minus(Duration.standardHours(1));
-      long timestampMs = recordsStartTimestamp.getMillis();
-      Duration safetyPeriod = Duration.standardMinutes(1);
-      Instant minKinesisWatermark = 
now.minus(KinesisReader.MAX_KINESIS_STREAM_RETENTION_PERIOD);
-
-      prepareRecordsWithArrivalTimestamps(timestampMs, 1, 
KinesisReader.MIN_WATERMARK_MESSAGES);
-
-      int recordsNeededForWatermarkAdvancing = 
KinesisReader.MIN_WATERMARK_MESSAGES;
-      for (boolean more = reader.start(); more; more = reader.advance()) {
-        if (--recordsNeededForWatermarkAdvancing > 0) {
-          assertThat(reader.getWatermark())
-              .isBetween(
-                  minKinesisWatermark.minus(safetyPeriod), 
minKinesisWatermark.plus(safetyPeriod));
-        } else {
-          assertThat(reader.getWatermark()).isEqualTo(new 
Instant(timestampMs));
-        }
-      }
-    } finally {
-      DateTimeUtils.setCurrentMillisSystem();
-    }
+    assertThat(advanced).isFalse();
+    verify(watermark, never()).update(any());
   }
 
   @Test
-  public void watermarkMonotonicallyIncreases() throws IOException, 
TransientKinesisException {
-    long timestampMs = 1000L;
-
-    prepareRecordsWithArrivalTimestamps(timestampMs, -1, 
KinesisReader.MIN_WATERMARK_MESSAGES * 2);
+  public void updatesWatermarkWhenRecordsAvailable() throws IOException {
+    when(shardReadersPool.nextRecord())
+        .thenReturn(CustomOptional.of(c))
+        .thenReturn(CustomOptional.absent());
+    boolean advanced = reader.start();
 
-    Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    for (boolean more = reader.start(); more; more = reader.advance()) {
-      Instant currentWatermark = reader.getWatermark();
-      assertThat(currentWatermark).isGreaterThanOrEqualTo(lastWatermark);
-      lastWatermark = currentWatermark;
-    }
-    assertThat(reader.advance()).isFalse();
+    assertThat(advanced).isTrue();
+    verify(watermark).update(c.getApproximateArrivalTimestamp());
   }
 
-  private void prepareRecordsWithArrivalTimestamps(
-      long initialTimestampMs, int increment, int count) throws 
TransientKinesisException {
-    long timestampMs = initialTimestampMs;
-    KinesisRecord firstRecord = 
prepareRecordMockWithArrivalTimestamp(timestampMs);
-    OngoingStubbing<CustomOptional<KinesisRecord>> shardReadersPoolStubbing =
-        
when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.of(firstRecord));
-    for (int i = 0; i < count; i++) {
-      timestampMs += increment;
-      KinesisRecord record = 
prepareRecordMockWithArrivalTimestamp(timestampMs);
-      shardReadersPoolStubbing = 
shardReadersPoolStubbing.thenReturn(CustomOptional.of(record));
-    }
-    shardReadersPoolStubbing.thenReturn(CustomOptional.absent());
-  }
+  @Test
+  public void returnsCurrentWatermark() throws IOException {
+    Instant expectedWatermark = new Instant(123456L);
+    doReturn(expectedWatermark).when(watermark).getCurrent(any());
+
+    reader.start();
+    Instant currentWatermark = reader.getWatermark();
 
-  private KinesisRecord prepareRecordMockWithArrivalTimestamp(long 
timestampMs) {
-    KinesisRecord record = mock(KinesisRecord.class);
-    when(record.getApproximateArrivalTimestamp()).thenReturn(new 
Instant(timestampMs));
-    return record;
+    assertThat(currentWatermark).isEqualTo(expectedWatermark);
   }
 
   @Test
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisWatermarkTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisWatermarkTest.java
new file mode 100644
index 00000000000..be6d5f378e1
--- /dev/null
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisWatermarkTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.function.BooleanSupplier;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/** Tests {@link KinesisWatermark}. */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisWatermarkTest {
+  private static final BooleanSupplier SHARDS_UP_TO_DATE = () -> true;
+  private static final BooleanSupplier SHARDS_NOT_UP_TO_DATE = () -> false;
+  private static final BooleanSupplier SHARDS_IRRELEVANT =
+      () -> {
+        throw new AssertionError("Shard status should not be queried");
+      };
+  private final Instant now = Instant.now();
+  private KinesisWatermark watermark;
+
+  @Before
+  public void setUp() {
+    setCurrentTimeTo(now);
+    watermark = new KinesisWatermark();
+  }
+
+  @After
+  public void tearDown() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
+  @Test
+  public void watermarkStartsAtSamplePeriodBehindNowIfShardsUpToDate() {
+    assertThat(watermark.getCurrent(SHARDS_UP_TO_DATE))
+        .isEqualTo(now.minus(KinesisWatermark.SAMPLE_PERIOD));
+  }
+
+  @Test
+  public void watermarkStartsWithMinIfShardsNotUpToDate() {
+    Instant minKinesisWatermark = 
now.minus(KinesisWatermark.MAX_KINESIS_STREAM_RETENTION_PERIOD);
+
+    
assertThat(watermark.getCurrent(SHARDS_NOT_UP_TO_DATE)).isEqualTo(minKinesisWatermark);
+  }
+
+  @Test
+  public void watermarkIsUpdatedToFirstRecordTimestamp() {
+    Instant firstTimestamp = now.minus(Duration.standardHours(1));
+
+    watermark.update(firstTimestamp);
+
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(firstTimestamp);
+  }
+
+  @Test
+  public void 
watermarkIsUpdatedToRecentRecordTimestampIfItIsOlderThanUpdateThreshold() {
+    Instant firstTimestamp = now.minus(Duration.standardHours(1));
+    watermark.update(firstTimestamp);
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(firstTimestamp);
+
+    Instant timeAfterWatermarkUpdateThreshold =
+        now.plus(KinesisWatermark.UPDATE_THRESHOLD.plus(Duration.millis(1)));
+    setCurrentTimeTo(timeAfterWatermarkUpdateThreshold);
+    Instant nextTimestamp = 
timeAfterWatermarkUpdateThreshold.plus(Duration.millis(1));
+    watermark.update(nextTimestamp);
+
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(nextTimestamp);
+  }
+
+  @Test
+  public void watermarkDoesNotChangeWhenTooFewSampleRecordsInSamplePeriod() {
+    Instant firstTimestamp = now.minus(Duration.standardHours(1));
+    watermark.update(firstTimestamp);
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(firstTimestamp);
+
+    setCurrentTimeTo(now.plus(KinesisWatermark.SAMPLE_PERIOD));
+    watermark.update(firstTimestamp);
+    for (int i = 1; i <= KinesisWatermark.MIN_MESSAGES / 2; ++i) {
+      Instant plus = firstTimestamp.plus(Duration.millis(i));
+      watermark.update(plus);
+    }
+
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(firstTimestamp);
+  }
+
+  @Test
+  public void watermarkAdvancesWhenEnoughRecordsReadRecently() {
+    Instant firstTimestamp = now.minus(Duration.standardHours(1));
+    watermark.update(firstTimestamp);
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(firstTimestamp);
+
+    Instant newTimestamp = firstTimestamp.plus(Duration.millis(1));
+    setCurrentTimeTo(now.plus(KinesisWatermark.SAMPLE_PERIOD));
+
+    for (int i = 0; i < KinesisWatermark.MIN_MESSAGES - 1; ++i) {
+      watermark.update(newTimestamp.plus(Duration.millis(i)));
+      
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(firstTimestamp);
+    }
+
+    
watermark.update(newTimestamp.plus(Duration.millis(KinesisWatermark.MIN_MESSAGES
 - 1)));
+    
assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(newTimestamp);
+  }
+
+  @Test
+  public void watermarkDoesNotGoBackward() {
+    watermark.update(now);
+    for (int i = 0; i <= KinesisWatermark.MIN_MESSAGES * 2; ++i) {
+      watermark.update(now.minus(Duration.millis(i)));
+      assertThat(watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(now);
+    }
+  }
+
+  private static void setCurrentTimeTo(Instant time) {
+    DateTimeUtils.setCurrentMillisFixed(time.getMillis());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134284)
    Time Spent: 3h  (was: 2h 50m)

> Watermark does not progress for low traffic streams
> ---------------------------------------------------
>
>                 Key: BEAM-5063
>                 URL: https://issues.apache.org/jira/browse/BEAM-5063
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.5.0
>            Reporter: Krzysztof Trubalski
>            Assignee: Krzysztof Trubalski
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> We have a Dataflow job copying data from multiple Kinesis streams into
> BigQuery. Recently we have noticed that the watermark on one of the streams
> frequently gets stuck even though data from that stream is still being
> processed (it progresses only when the traffic increases or the Dataflow
> autoscaling feature kicks in).
>   
>  The CloudWatch statistics for the affected stream show a really low
> traffic rate - only ~1 event every few minutes. After investigating and
> consulting Google's Dataflow team about the issue, it looks like, with such
> a small amount of data on the stream, the function calculating the watermark
> in KinesisReader reports progress incorrectly.
>   
>  From my initial investigation, I suspect that the issue might be related to
> the usage of MovingFunction in KinesisReader. In the current implementation,
> it covers a 1-minute period of samples. Because obtaining the min value (via
> get()) flushes stale values, when the traffic is very low the subsequent
> significance check (isSignificant()) always returns false: it relies on the
> number of samples, and most of them have already been flushed by the
> preceding get() invocation.
>   
>   
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to