This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch FLINK-21661
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 552f867d394ad41b1d8134ad61df25f695d0e8ae
Author: Danny Cranmer <[email protected]>
AuthorDate: Wed Mar 24 08:48:58 2021 +0000

    [FLINK-21661][kinesis] Fix fetch interval for polling consumer (#15157)
---
 .../polling/AdaptivePollingRecordPublisher.java    | 42 ++++---------------
 .../publisher/polling/PollingRecordPublisher.java  | 48 ++++++++++++++++++----
 .../polling/PollingRecordPublisherTest.java        | 29 ++++++++++++-
 3 files changed, 75 insertions(+), 44 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
index 282d11b..29a4bd5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
@@ -26,9 +26,9 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
 /**
- * An adaptive record publisher to add a dynamic loop delay and batch read 
size for {@link
- * PollingRecordPublisher}. Kinesis Streams have quotas on the transactions 
per second, and
- * throughout. This class attempts to balance quotas and mitigate back off 
errors.
+ * An adaptive record publisher to add a dynamic batch read size for {@link 
PollingRecordPublisher}.
+ * Kinesis Streams have quotas on the transactions per second, and throughout. 
This class attempts
+ * to balance quotas and mitigate back off errors.
  */
 @Internal
 public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
@@ -44,8 +44,6 @@ public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
 
     private int maxNumberOfRecordsPerFetch;
 
-    private final long fetchIntervalMillis;
-
     private final PollingRecordPublisherMetricsReporter metricsReporter;
 
     AdaptivePollingRecordPublisher(
@@ -64,7 +62,6 @@ public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
                 maxNumberOfRecordsPerFetch,
                 fetchIntervalMillis);
         this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
-        this.fetchIntervalMillis = fetchIntervalMillis;
         this.metricsReporter = metricsReporter;
     }
 
@@ -81,42 +78,19 @@ public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
                         },
                         maxNumberOfRecordsPerFetch);
 
-        long adjustmentEndTimeNanos =
-                adjustRunLoopFrequency(processingStartTimeNanos, 
System.nanoTime());
-        long runLoopTimeNanos = adjustmentEndTimeNanos - 
processingStartTimeNanos;
+        long endTimeNanos = System.nanoTime();
+        long runLoopTimeNanos = endTimeNanos - processingStartTimeNanos;
+
         maxNumberOfRecordsPerFetch =
                 adaptRecordsToRead(
                         runLoopTimeNanos,
                         lastRecordBatchSize,
                         lastRecordBatchSizeInBytes,
                         maxNumberOfRecordsPerFetch);
-        processingStartTimeNanos = adjustmentEndTimeNanos;
-        metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
 
-        return result;
-    }
+        processingStartTimeNanos = endTimeNanos;
 
-    /**
-     * Adjusts loop timing to match target frequency if specified.
-     *
-     * @param processingStartTimeNanos The start time of the run loop "work"
-     * @param processingEndTimeNanos The end time of the run loop "work"
-     * @return The System.nanoTime() after the sleep (if any)
-     * @throws InterruptedException
-     */
-    private long adjustRunLoopFrequency(long processingStartTimeNanos, long 
processingEndTimeNanos)
-            throws InterruptedException {
-        long endTimeNanos = processingEndTimeNanos;
-        if (fetchIntervalMillis != 0) {
-            long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
-            long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos 
/ 1_000_000);
-            if (sleepTimeMillis > 0) {
-                Thread.sleep(sleepTimeMillis);
-                endTimeNanos = System.nanoTime();
-                metricsReporter.setSleepTimeMillis(sleepTimeMillis);
-            }
-        }
-        return endTimeNanos;
+        return result;
     }
 
     /**
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
index 574196e..70e48a4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
@@ -60,7 +60,9 @@ public class PollingRecordPublisher implements RecordPublisher {
 
     private final int maxNumberOfRecordsPerFetch;
 
-    private final long expiredIteratorBackoffMillis;
+    private final long fetchIntervalMillis;
+
+    private long processingStartTimeNanos = System.nanoTime();
 
     /**
      * A Polling implementation of {@link RecordPublisher} that polls kinesis 
for records. The
@@ -71,8 +73,7 @@ public class PollingRecordPublisher implements RecordPublisher {
      * @param metricsReporter a metric reporter used to output metrics
      * @param kinesisProxy the proxy used to communicate with kinesis
      * @param maxNumberOfRecordsPerFetch the maximum number of records to 
retrieve per batch
-     * @param expiredIteratorBackoffMillis the duration to sleep in the event 
of an {@link
-     *     ExpiredIteratorException}
+     * @param fetchIntervalMillis the target interval between each GetRecords 
invocation
      */
     PollingRecordPublisher(
             final StartingPosition startingPosition,
@@ -80,16 +81,16 @@ public class PollingRecordPublisher implements RecordPublisher {
             final PollingRecordPublisherMetricsReporter metricsReporter,
             final KinesisProxyInterface kinesisProxy,
             final int maxNumberOfRecordsPerFetch,
-            final long expiredIteratorBackoffMillis)
+            final long fetchIntervalMillis)
             throws InterruptedException {
         this.nextStartingPosition = 
Preconditions.checkNotNull(startingPosition);
         this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
         this.metricsReporter = Preconditions.checkNotNull(metricsReporter);
         this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
         this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
-        this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
+        this.fetchIntervalMillis = fetchIntervalMillis;
 
-        Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0);
+        Preconditions.checkArgument(fetchIntervalMillis >= 0);
         Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0);
 
         this.nextShardItr = getShardIterator();
@@ -118,6 +119,14 @@ public class PollingRecordPublisher implements RecordPublisher {
 
         nextStartingPosition = getNextStartingPosition(latestSequenceNumber);
         nextShardItr = result.getNextShardIterator();
+
+        long adjustmentEndTimeNanos =
+                adjustRunLoopFrequency(processingStartTimeNanos, 
System.nanoTime());
+        long runLoopTimeNanos = adjustmentEndTimeNanos - 
processingStartTimeNanos;
+
+        processingStartTimeNanos = adjustmentEndTimeNanos;
+        metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
+
         return nextShardItr == null ? COMPLETE : INCOMPLETE;
     }
 
@@ -168,8 +177,8 @@ public class PollingRecordPublisher implements RecordPublisher {
 
                 // sleep for the fetch interval before the next getRecords 
attempt with the
                 // refreshed iterator
-                if (expiredIteratorBackoffMillis != 0) {
-                    Thread.sleep(expiredIteratorBackoffMillis);
+                if (fetchIntervalMillis != 0) {
+                    Thread.sleep(fetchIntervalMillis);
                 }
             }
         }
@@ -188,4 +197,27 @@ public class PollingRecordPublisher implements RecordPublisher {
                 nextStartingPosition.getShardIteratorType().toString(),
                 nextStartingPosition.getStartingMarker());
     }
+
+    /**
+     * Adjusts loop timing to match target frequency if specified.
+     *
+     * @param processingStartTimeNanos The start time of the run loop "work"
+     * @param processingEndTimeNanos The end time of the run loop "work"
+     * @return The System.nanoTime() after the sleep (if any)
+     * @throws InterruptedException
+     */
+    private long adjustRunLoopFrequency(long processingStartTimeNanos, long 
processingEndTimeNanos)
+            throws InterruptedException {
+        long endTimeNanos = processingEndTimeNanos;
+        if (fetchIntervalMillis != 0) {
+            long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+            long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos 
/ 1_000_000);
+            if (sleepTimeMillis > 0) {
+                Thread.sleep(sleepTimeMillis);
+                endTimeNanos = System.nanoTime();
+                metricsReporter.setSleepTimeMillis(sleepTimeMillis);
+            }
+        }
+        return endTimeNanos;
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
index 052a922..ad93f9c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -34,6 +34,7 @@ import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
 import static 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalMatchers.geq;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -43,6 +44,8 @@ import static org.mockito.Mockito.verify;
 /** Tests for {@link PollingRecordPublisher}. */
 public class PollingRecordPublisherTest {
 
+    private static final long FETCH_INTERVAL_MILLIS = 500L;
+
     @Rule public ExpectedException thrown = ExpectedException.none();
 
     @Test
@@ -59,6 +62,21 @@ public class PollingRecordPublisherTest {
     }
 
     @Test
+    public void testRunEmitsRunLoopTimeNanos() throws Exception {
+        PollingRecordPublisherMetricsReporter metricsReporter =
+                spy(new 
PollingRecordPublisherMetricsReporter(mock(MetricGroup.class)));
+
+        KinesisProxyInterface fakeKinesis = 
totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 5, 100);
+        PollingRecordPublisher recordPublisher =
+                createPollingRecordPublisher(fakeKinesis, metricsReporter);
+
+        recordPublisher.run(new TestConsumer());
+
+        // Expect that the run loop took at least FETCH_INTERVAL_MILLIS in 
nanos
+        verify(metricsReporter).setRunLoopTimeNanos(geq(FETCH_INTERVAL_MILLIS 
* 1_000_000));
+    }
+
+    @Test
     public void testRunReturnsCompleteWhenShardExpires() throws Exception {
         // There are 2 batches available in the stream
         KinesisProxyInterface fakeKinesis = 
totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 2, 100);
@@ -136,12 +154,19 @@ public class PollingRecordPublisherTest {
         PollingRecordPublisherMetricsReporter metricsReporter =
                 new 
PollingRecordPublisherMetricsReporter(mock(MetricGroup.class));
 
+        return createPollingRecordPublisher(kinesis, metricsReporter);
+    }
+
+    PollingRecordPublisher createPollingRecordPublisher(
+            final KinesisProxyInterface kinesis,
+            final PollingRecordPublisherMetricsReporter metricGroupReporter)
+            throws Exception {
         return new PollingRecordPublisher(
                 
StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
                 TestUtils.createDummyStreamShardHandle(),
-                metricsReporter,
+                metricGroupReporter,
                 kinesis,
                 10000,
-                500L);
+                FETCH_INTERVAL_MILLIS);
     }
 }

Reply via email to