This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch FLINK-21661 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 552f867d394ad41b1d8134ad61df25f695d0e8ae Author: Danny Cranmer <[email protected]> AuthorDate: Wed Mar 24 08:48:58 2021 +0000 [FLINK-21661][kinesis] Fix fetch interval for polling consumer (#15157) --- .../polling/AdaptivePollingRecordPublisher.java | 42 ++++--------------- .../publisher/polling/PollingRecordPublisher.java | 48 ++++++++++++++++++---- .../polling/PollingRecordPublisherTest.java | 29 ++++++++++++- 3 files changed, 75 insertions(+), 44 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java index 282d11b..29a4bd5 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java @@ -26,9 +26,9 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; /** - * An adaptive record publisher to add a dynamic loop delay and batch read size for {@link - * PollingRecordPublisher}. Kinesis Streams have quotas on the transactions per second, and - * throughout. This class attempts to balance quotas and mitigate back off errors. + * An adaptive record publisher to add a dynamic batch read size for {@link PollingRecordPublisher}. + * Kinesis Streams have quotas on the transactions per second, and throughout. This class attempts + * to balance quotas and mitigate back off errors. */ @Internal public class AdaptivePollingRecordPublisher extends PollingRecordPublisher { @@ -44,8 +44,6 @@ public class AdaptivePollingRecordPublisher extends PollingRecordPublisher { private int maxNumberOfRecordsPerFetch; - private final long fetchIntervalMillis; - private final PollingRecordPublisherMetricsReporter metricsReporter; AdaptivePollingRecordPublisher( @@ -64,7 +62,6 @@ public class AdaptivePollingRecordPublisher extends PollingRecordPublisher { maxNumberOfRecordsPerFetch, fetchIntervalMillis); this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch; - this.fetchIntervalMillis = fetchIntervalMillis; this.metricsReporter = metricsReporter; } @@ -81,42 +78,19 @@ public class AdaptivePollingRecordPublisher extends PollingRecordPublisher { }, maxNumberOfRecordsPerFetch); - long adjustmentEndTimeNanos = - adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime()); - long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + long endTimeNanos = System.nanoTime(); + long runLoopTimeNanos = endTimeNanos - processingStartTimeNanos; + maxNumberOfRecordsPerFetch = adaptRecordsToRead( runLoopTimeNanos, lastRecordBatchSize, lastRecordBatchSizeInBytes, maxNumberOfRecordsPerFetch); - processingStartTimeNanos = adjustmentEndTimeNanos; - metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos); - return result; - } + processingStartTimeNanos = endTimeNanos; - /** - * Adjusts loop timing to match target frequency if specified. - * - * @param processingStartTimeNanos The start time of the run loop "work" - * @param processingEndTimeNanos The end time of the run loop "work" - * @return The System.nanoTime() after the sleep (if any) - * @throws InterruptedException - */ - private long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) - throws InterruptedException { - long endTimeNanos = processingEndTimeNanos; - if (fetchIntervalMillis != 0) { - long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; - long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); - if (sleepTimeMillis > 0) { - Thread.sleep(sleepTimeMillis); - endTimeNanos = System.nanoTime(); - metricsReporter.setSleepTimeMillis(sleepTimeMillis); - } - } - return endTimeNanos; + return result; } /** diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java index 574196e..70e48a4 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java @@ -60,7 +60,9 @@ public class PollingRecordPublisher implements RecordPublisher { private final int maxNumberOfRecordsPerFetch; - private final long expiredIteratorBackoffMillis; + private final long fetchIntervalMillis; + + private long processingStartTimeNanos = System.nanoTime(); /** * A Polling implementation of {@link RecordPublisher} that polls kinesis for records. The @@ -71,8 +73,7 @@ public class PollingRecordPublisher implements RecordPublisher { * @param metricsReporter a metric reporter used to output metrics * @param kinesisProxy the proxy used to communicate with kinesis * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch - * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link - * ExpiredIteratorException} + * @param fetchIntervalMillis the target interval between each GetRecords invocation */ PollingRecordPublisher( final StartingPosition startingPosition, @@ -80,16 +81,16 @@ public class PollingRecordPublisher implements RecordPublisher { final PollingRecordPublisherMetricsReporter metricsReporter, final KinesisProxyInterface kinesisProxy, final int maxNumberOfRecordsPerFetch, - final long expiredIteratorBackoffMillis) + final long fetchIntervalMillis) throws InterruptedException { this.nextStartingPosition = Preconditions.checkNotNull(startingPosition); this.subscribedShard = Preconditions.checkNotNull(subscribedShard); this.metricsReporter = Preconditions.checkNotNull(metricsReporter); this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy); this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch; - this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis; + this.fetchIntervalMillis = fetchIntervalMillis; - Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0); + Preconditions.checkArgument(fetchIntervalMillis >= 0); Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0); this.nextShardItr = getShardIterator(); @@ -118,6 +119,14 @@ public class PollingRecordPublisher implements RecordPublisher { nextStartingPosition = getNextStartingPosition(latestSequenceNumber); nextShardItr = result.getNextShardIterator(); + + long adjustmentEndTimeNanos = + adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime()); + long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + + processingStartTimeNanos = adjustmentEndTimeNanos; + metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos); + return nextShardItr == null ? COMPLETE : INCOMPLETE; } @@ -168,8 +177,8 @@ public class PollingRecordPublisher implements RecordPublisher { // sleep for the fetch interval before the next getRecords attempt with the // refreshed iterator - if (expiredIteratorBackoffMillis != 0) { - Thread.sleep(expiredIteratorBackoffMillis); + if (fetchIntervalMillis != 0) { + Thread.sleep(fetchIntervalMillis); } } } @@ -188,4 +197,27 @@ public class PollingRecordPublisher implements RecordPublisher { nextStartingPosition.getShardIteratorType().toString(), nextStartingPosition.getStartingMarker()); } + + /** + * Adjusts loop timing to match target frequency if specified. + * + * @param processingStartTimeNanos The start time of the run loop "work" + * @param processingEndTimeNanos The end time of the run loop "work" + * @return The System.nanoTime() after the sleep (if any) + * @throws InterruptedException + */ + private long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) + throws InterruptedException { + long endTimeNanos = processingEndTimeNanos; + if (fetchIntervalMillis != 0) { + long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; + long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + endTimeNanos = System.nanoTime(); + metricsReporter.setSleepTimeMillis(sleepTimeMillis); + } + } + return endTimeNanos; + } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java index 052a922..ad93f9c 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java @@ -34,6 +34,7 @@ import static org.apache.flink.streaming.connectors.kinesis.internals.publisher. import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM; import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls; import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalMatchers.geq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -43,6 +44,8 @@ import static org.mockito.Mockito.verify; /** Tests for {@link PollingRecordPublisher}. */ public class PollingRecordPublisherTest { + private static final long FETCH_INTERVAL_MILLIS = 500L; + @Rule public ExpectedException thrown = ExpectedException.none(); @Test @@ -59,6 +62,21 @@ public class PollingRecordPublisherTest { } @Test + public void testRunEmitsRunLoopTimeNanos() throws Exception { + PollingRecordPublisherMetricsReporter metricsReporter = + spy(new PollingRecordPublisherMetricsReporter(mock(MetricGroup.class))); + + KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 5, 100); + PollingRecordPublisher recordPublisher = + createPollingRecordPublisher(fakeKinesis, metricsReporter); + + recordPublisher.run(new TestConsumer()); + + // Expect that the run loop took at least FETCH_INTERVAL_MILLIS in nanos + verify(metricsReporter).setRunLoopTimeNanos(geq(FETCH_INTERVAL_MILLIS * 1_000_000)); + } + + @Test public void testRunReturnsCompleteWhenShardExpires() throws Exception { // There are 2 batches available in the stream KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 2, 100); @@ -136,12 +154,19 @@ public class PollingRecordPublisherTest { PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(mock(MetricGroup.class)); + return createPollingRecordPublisher(kinesis, metricsReporter); + } + + PollingRecordPublisher createPollingRecordPublisher( + final KinesisProxyInterface kinesis, + final PollingRecordPublisherMetricsReporter metricGroupReporter) + throws Exception { return new PollingRecordPublisher( StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()), TestUtils.createDummyStreamShardHandle(), - metricsReporter, + metricGroupReporter, kinesis, 10000, - 500L); + FETCH_INTERVAL_MILLIS); } }
