[ 
https://issues.apache.org/jira/browse/BEAM-5063?focusedWorklogId=134014&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134014
 ]

ASF GitHub Bot logged work on BEAM-5063:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Aug/18 15:56
            Start Date: 12/Aug/18 15:56
    Worklog Time Spent: 10m 
      Work Description: krzysztof-tr commented on a change in pull request #6178: [BEAM-5063] Fix Watermark does not progress for low traffic streams
URL: https://github.com/apache/beam/pull/6178#discussion_r209450092
 
 

 ##########
 File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisWatermark.java
 ##########
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import java.util.function.BooleanSupplier;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** Keeps track of current watermark using {@link MovingFunction}. */
+class KinesisWatermark {
+  /** Period of updates to determine watermark. */
+  private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
+
+  /** Period of samples to determine watermark. */
+  static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
+
+  /**
+   * Period after which watermark should be updated regardless of number of samples. It has to be
+   * longer than {@link KinesisWatermark#SAMPLE_PERIOD}, so that for most of the cases value
+   * returned from {@link MovingFunction#isSignificant()} is sufficient to decide about watermark
+   * update.
+   */
+  static final Duration UPDATE_THRESHOLD = SAMPLE_PERIOD.multipliedBy(2);
+
+  /** Constant representing the maximum Kinesis stream retention period. */
+  static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = Duration.standardDays(7);
+
+  /** Minimum number of unread messages required before considering updating watermark. */
+  static final int MIN_MESSAGES = 10;
+
+  /**
+   * Minimum number of SAMPLE_UPDATE periods over which unread messages should be spread before
+   * considering updating watermark.
+   */
+  private static final int MIN_SPREAD = 2;
+
+  private Instant lastWatermark = Instant.now().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
+  private Instant lastUpdate = new Instant(0L);
+  private final MovingFunction minReadTimestampMsSinceEpoch =
+      new MovingFunction(
+          SAMPLE_PERIOD.getMillis(),
+          SAMPLE_UPDATE.getMillis(),
+          MIN_SPREAD,
+          MIN_MESSAGES,
+          Min.ofLongs());
+
+  public Instant getCurrent(BooleanSupplier shardsUpToDate) {
+    Instant now = Instant.now();
+    Instant readMin = getMinReadTimestamp(now);
+    if (readMin == null) {
+      if (shardsUpToDate.getAsBoolean()) {
+        updateLastWatermark(now, now);
 
 Review comment:
   I think that makes sense. Shard iterators determine whether they are up to 
date based on the last call to get records from Kinesis, so new records may 
arrive with timestamps earlier than now and be classified as late 
records. Following the suggestion from the documentation ("this value should be as 
late as possible"), using `now.minus(SAMPLE_PERIOD)` looks like a good choice here.
   @pawel-kaczmarczyk do you have any thoughts on that?
   
   I've run our test pipelines with the suggested change to check whether it has 
any negative effect (I suspect it won't). If everything looks good tomorrow, 
I will update the pull request.
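
   A minimal sketch of the suggested change, to make the idea concrete. It is 
not the code from the pull request: the class and method names are 
hypothetical, it uses `java.time` instead of Joda-Time so that it is 
self-contained, and the rule that the watermark never moves backwards is an 
assumption about what `updateLastWatermark` does.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

/** Hypothetical, simplified stand-in for the "no samples" branch of KinesisWatermark. */
final class IdleWatermarkSketch {
  /** Same value as SAMPLE_PERIOD in the diff above. */
  static final Duration SAMPLE_PERIOD = Duration.ofMinutes(1);

  private Instant lastWatermark;

  IdleWatermarkSketch(Instant initialWatermark) {
    this.lastWatermark = initialWatermark;
  }

  /**
   * Called when no read timestamps are available. If all shards report being up to date,
   * advance the watermark to now minus SAMPLE_PERIOD rather than to now, leaving a margin
   * for records that were written after the last fetch from Kinesis.
   */
  Instant currentWhenNoSamples(Instant now, BooleanSupplier shardsUpToDate) {
    if (shardsUpToDate.getAsBoolean()) {
      Instant candidate = now.minus(SAMPLE_PERIOD);
      // Assumption: the watermark must never go backwards.
      if (candidate.isAfter(lastWatermark)) {
        lastWatermark = candidate;
      }
    }
    return lastWatermark;
  }
}
```

   With this shape, a record that arrives with a timestamp up to one minute 
behind `now` is still ahead of the watermark and is not treated as late.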

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134014)
    Time Spent: 1h 40m  (was: 1.5h)

> Watermark does not progress for low traffic streams
> ---------------------------------------------------
>
>                 Key: BEAM-5063
>                 URL: https://issues.apache.org/jira/browse/BEAM-5063
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.5.0
>            Reporter: Krzysztof Trubalski
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> We have a Dataflow job copying data from multiple Kinesis streams into 
> BigQuery. Recently we noticed that the watermark on one of the streams 
> frequently gets stuck even though data from that stream is still being processed 
> (it progresses only when the traffic increases or the Dataflow autoscaling feature 
> kicks in).
>   
>  Looking at the CloudWatch statistics for the affected stream, it has a 
> really low traffic rate: only ~1 event every few minutes. After 
> investigating and consulting Google's Dataflow team about the issue, it looks 
> like with such a small amount of data on the stream, the function calculating 
> the watermark in KinesisReader reports progress incorrectly.
>   
>  From my initial investigation, I suspect that the issue is related to the 
> usage of MovingFunction in KinesisReader. In the current implementation, it 
> covers a 1-minute period of samples. Because obtaining the min value flushes 
> stale values, when the traffic is very low the subsequent significance 
> check always returns false (it relies on the number of samples, and most 
> of them were flushed by the get() invocation).
>   
>   
>   
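
The behaviour described in the issue can be illustrated with a toy model. The 
sketch below is not Beam's MovingFunction: `WindowedSampleCounter` and its 
methods are hypothetical names, and it models only the sample-count part of 
the significance check (the spread condition is left out). The window length 
and minimum sample count are taken from SAMPLE_PERIOD and MIN_MESSAGES in the 
diff above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical toy model of a time-windowed sample counter; not Beam's MovingFunction. */
final class WindowedSampleCounter {
  private final long windowMillis;
  private final int minSamples;
  private final Deque<Long> sampleTimes = new ArrayDeque<>();

  WindowedSampleCounter(long windowMillis, int minSamples) {
    this.windowMillis = windowMillis;
    this.minSamples = minSamples;
  }

  /** Records one sample observed at the given time. */
  void add(long nowMillis) {
    sampleTimes.addLast(nowMillis);
  }

  /** Flushes samples that fell out of the window, then checks whether enough remain. */
  boolean isSignificant(long nowMillis) {
    while (!sampleTimes.isEmpty() && sampleTimes.peekFirst() <= nowMillis - windowMillis) {
      sampleTimes.removeFirst();
    }
    return sampleTimes.size() >= minSamples;
  }

  /** Feeds evenly spaced events and counts how many significance checks succeed. */
  static int countSignificantChecks(long eventIntervalMillis, int events) {
    // 1-minute window and 10 samples, mirroring SAMPLE_PERIOD and MIN_MESSAGES.
    WindowedSampleCounter counter = new WindowedSampleCounter(60_000L, 10);
    int significant = 0;
    for (int i = 0; i < events; i++) {
      long now = i * eventIntervalMillis;
      counter.add(now);
      if (counter.isSignificant(now)) {
        significant++;
      }
    }
    return significant;
  }
}
```

With one event every three minutes, each check finds at most one sample left 
in the window, so `countSignificantChecks(180_000L, 100)` never reports a 
significant result. With one event per second the check starts succeeding 
once ten samples have accumulated.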



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
