asfgit closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard
watermark support
URL: https://github.com/apache/flink/pull/6980
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..3c5e3c7303f 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,22 @@
* A custom assigner implementation can be set via {@link
#setShardAssigner(KinesisShardAssigner)} to optimize the
* hash function or use static overrides to limit skew.
*
+ * <p>In order for the consumer to emit watermarks, a timestamp assigner needs
to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto
watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * <p>Watermarks can only advance when all shards of a subtask continuously
deliver records. To
+ * prevent an inactive or closed shard from blocking watermark progress, the idle
timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards
won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from
all shards.
+ *
+ * <p>Note that re-sharding of the Kinesis stream while an application (that
relies on
+ * the Kinesis records for watermarking) is running can lead to incorrect late
events.
+ * This depends on how shards are assigned to subtasks and applies regardless
of whether watermarks
+ * are generated in the source or a downstream operator.
+ *
* @param <T> the type of data emitted
*/
@PublicEvolving
@@ -108,6 +125,8 @@
*/
private KinesisShardAssigner shardAssigner =
KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
+ private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
//
------------------------------------------------------------------------
// Runtime state
//
------------------------------------------------------------------------
@@ -213,13 +232,28 @@ public KinesisShardAssigner getShardAssigner() {
/**
* Provide a custom assigner to influence how shards are distributed
over subtasks.
- * @param shardAssigner
+ * @param shardAssigner shard assigner
*/
public void setShardAssigner(KinesisShardAssigner shardAssigner) {
this.shardAssigner = checkNotNull(shardAssigner, "function can
not be null");
ClosureCleaner.clean(shardAssigner, true);
}
+ public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner()
{
+ return periodicWatermarkAssigner;
+ }
+
+ /**
+ * Set the assigner that will extract the timestamp from {@code T} and
calculate the
+ * watermark.
+ * @param periodicWatermarkAssigner periodic watermark assigner
+ */
+ public void setPeriodicWatermarkAssigner(
+ AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+ this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+ ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+ }
+
//
------------------------------------------------------------------------
// Source life cycle
//
------------------------------------------------------------------------
@@ -414,7 +448,7 @@ public void snapshotState(FunctionSnapshotContext context)
throws Exception {
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema) {
- return new KinesisDataFetcher<>(streams, sourceContext,
runtimeContext, configProps, deserializationSchema, shardAssigner);
+ return new KinesisDataFetcher<>(streams, sourceContext,
runtimeContext, configProps, deserializationSchema, shardAssigner,
periodicWatermarkAssigner);
}
@VisibleForTesting
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 443b19ec382..42e2173474b 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -140,6 +140,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The config to turn on adaptive reads from a shard. */
public static final String SHARD_USE_ADAPTIVE_READS =
"flink.shard.adaptivereads";
+ /** The interval after which to consider a shard idle for purposes of
watermark generation. */
+ public static final String SHARD_IDLE_INTERVAL_MILLIS =
"flink.shard.idle.interval";
//
------------------------------------------------------------------------
// Default values for consumer configuration
@@ -190,6 +192,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
+ public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
+
/**
* To avoid shard iterator expires in {@link ShardConsumer}s, the value
for the configured
* getRecords interval can not exceed 5 minutes, which is the expire
time for retrieved iterators.
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 0981b76ce89..77ca23c9d37 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,7 +21,10 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
@@ -35,7 +38,10 @@
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -51,6 +57,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -182,6 +189,28 @@
private volatile boolean running = true;
+ private final AssignerWithPeriodicWatermarks<T>
periodicWatermarkAssigner;
+
+ /**
+ * The watermark related state for each shard consumer. Entries in this
map will be created when shards
+ * are discovered. After recovery, this shard map will be recreated,
possibly with different shard index keys,
+ * since those are transient and not part of checkpointed state.
+ */
+ private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks
= new ConcurrentHashMap<>();
+
+ /**
+ * The most recent watermark, calculated from the per shard watermarks.
The initial value will never be emitted and
+ * also applies after recovery. The first watermark that will be emitted
is derived from actually consumed records.
+ * In case of recovery and replay, the watermark will rewind,
consistent with the shard consumer sequence.
+ */
+ private long lastWatermark = Long.MIN_VALUE;
+
+ /**
+ * The time span since the last consumed record, after which a shard will
be considered idle for the purpose of watermark
+ * calculation. A positive value will allow the watermark to progress
even when some shards don't receive new records.
+ */
+ private long shardIdleIntervalMillis =
ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS;
+
/**
* Factory to create Kinesis proxy instances used by a fetcher.
*/
@@ -203,7 +232,8 @@ public KinesisDataFetcher(List<String> streams,
RuntimeContext
runtimeContext,
Properties configProps,
KinesisDeserializationSchema<T> deserializationSchema,
- KinesisShardAssigner
shardAssigner) {
+ KinesisShardAssigner
shardAssigner,
+
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
this(streams,
sourceContext,
sourceContext.getCheckpointLock(),
@@ -211,6 +241,7 @@ public KinesisDataFetcher(List<String> streams,
configProps,
deserializationSchema,
shardAssigner,
+ periodicWatermarkAssigner,
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -225,6 +256,7 @@ protected KinesisDataFetcher(List<String> streams,
Properties
configProps,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner shardAssigner,
+
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
AtomicReference<Throwable> error,
List<KinesisStreamShardState> subscribedShardsState,
HashMap<String,
String> subscribedStreamsToLastDiscoveredShardIds,
@@ -238,6 +270,7 @@ protected KinesisDataFetcher(List<String> streams,
this.indexOfThisConsumerSubtask =
runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema =
checkNotNull(deserializationSchema);
this.shardAssigner = checkNotNull(shardAssigner);
+ this.periodicWatermarkAssigner = periodicWatermarkAssigner;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
this.kinesis = kinesisProxyFactory.create(configProps);
@@ -339,6 +372,19 @@ public void runFetcher() throws Exception {
}
}
+ // start periodic watermark emitter, if a watermark assigner was
configured
+ if (periodicWatermarkAssigner != null) {
+ long periodicWatermarkIntervalMillis =
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+ if (periodicWatermarkIntervalMillis > 0) {
+ ProcessingTimeService timerService =
((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
+ LOG.info("Starting periodic watermark emitter
with interval {}", periodicWatermarkIntervalMillis);
+ new PeriodicWatermarkEmitter(timerService,
periodicWatermarkIntervalMillis).start();
+ }
+ this.shardIdleIntervalMillis = Long.parseLong(
+
getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
+
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+ }
+
//
------------------------------------------------------------------------
// finally, start the infinite shard discovery and consumer
launching loop;
@@ -546,6 +592,18 @@ protected Properties getConsumerConfiguration() {
* @param lastSequenceNumber the last sequence number value to update
*/
protected void emitRecordAndUpdateState(T record, long recordTimestamp,
int shardStateIndex, SequenceNumber lastSequenceNumber) {
+ // track per shard watermarks and emit timestamps extracted
from the record,
+ // when a watermark assigner was configured.
+ if (periodicWatermarkAssigner != null) {
+ ShardWatermarkState sws =
shardWatermarks.get(shardStateIndex);
+ Preconditions.checkNotNull(
+ sws, "shard watermark state initialized in
registerNewSubscribedShardState");
+ recordTimestamp =
+
sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
+ sws.lastRecordTimestamp = recordTimestamp;
+ sws.lastUpdated = getCurrentTimeMillis();
+ }
+
synchronized (checkpointLock) {
if (record != null) {
sourceContext.collectWithTimestamp(record,
recordTimestamp);
@@ -609,7 +667,110 @@ public int
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
- return subscribedShardsState.size() - 1;
+ int shardStateIndex = subscribedShardsState.size() - 1;
+
+ // track all discovered shards for watermark
determination
+ ShardWatermarkState sws =
shardWatermarks.get(shardStateIndex);
+ if (sws == null) {
+ sws = new ShardWatermarkState();
+ try {
+ sws.periodicWatermarkAssigner =
InstantiationUtil.clone(periodicWatermarkAssigner);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to
instantiate new WatermarkAssigner", e);
+ }
+ sws.lastUpdated = getCurrentTimeMillis();
+ sws.lastRecordTimestamp = Long.MIN_VALUE;
+ shardWatermarks.put(shardStateIndex, sws);
+ }
+
+ return shardStateIndex;
+ }
+ }
+
+ /**
+ * Return the current system time. Allow tests to override this to
simulate progress for watermark
+ * logic.
+ *
+ * @return current processing time
+ */
+ @VisibleForTesting
+ protected long getCurrentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * Called periodically to emit a watermark. Checks all shards for the
current event time
+ * watermark, and possibly emits the next watermark.
+ *
+ * <p>Shards that have not received an update for a certain interval
are considered inactive so as
+ * to not hold back the watermark indefinitely. When all shards are
inactive, the subtask will be
+ * marked as temporarily idle to not block downstream operators.
+ */
+ @VisibleForTesting
+ protected void emitWatermark() {
+ LOG.debug("Evaluating watermark for subtask {} time {}",
indexOfThisConsumerSubtask, getCurrentTimeMillis());
+ long potentialWatermark = Long.MAX_VALUE;
+ long idleTime =
+ (shardIdleIntervalMillis > 0)
+ ? getCurrentTimeMillis() -
shardIdleIntervalMillis
+ : Long.MAX_VALUE;
+
+ for (Map.Entry<Integer, ShardWatermarkState> e :
shardWatermarks.entrySet()) {
+ // consider only active shards, or those that would
advance the watermark
+ Watermark w =
e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+ if (w != null && (e.getValue().lastUpdated >= idleTime
|| w.getTimestamp() > lastWatermark)) {
+ potentialWatermark =
Math.min(potentialWatermark, w.getTimestamp());
+ }
+ }
+
+ // advance watermark if possible (watermarks can only be
ascending)
+ if (potentialWatermark == Long.MAX_VALUE) {
+ if (shardWatermarks.isEmpty() ||
shardIdleIntervalMillis > 0) {
+ LOG.debug("No active shard for subtask {},
marking the source idle.",
+ indexOfThisConsumerSubtask);
+ // no active shard, signal downstream operators
to not wait for a watermark
+ sourceContext.markAsTemporarilyIdle();
+ }
+ } else if (potentialWatermark > lastWatermark) {
+ LOG.debug("Emitting watermark {} from subtask {}",
+ potentialWatermark,
+ indexOfThisConsumerSubtask);
+ sourceContext.emitWatermark(new
Watermark(potentialWatermark));
+ lastWatermark = potentialWatermark;
+ }
+ }
+
+ /** Per shard tracking of watermark and last activity. */
+ private static class ShardWatermarkState<T> {
+ private AssignerWithPeriodicWatermarks<T>
periodicWatermarkAssigner;
+ private volatile long lastRecordTimestamp;
+ private volatile long lastUpdated;
+ }
+
+ /**
+ * The periodic watermark emitter. In its given interval, it checks all
shards for the current
+ * event time watermark, and possibly emits the next watermark.
+ */
+ private class PeriodicWatermarkEmitter implements
ProcessingTimeCallback {
+
+ private final ProcessingTimeService timerService;
+ private final long interval;
+
+ PeriodicWatermarkEmitter(ProcessingTimeService timerService,
long autoWatermarkInterval) {
+ this.timerService = checkNotNull(timerService);
+ this.interval = autoWatermarkInterval;
+ }
+
+ public void start() {
+ LOG.debug("registering periodic watermark timer with
interval {}", interval);
+
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval,
this);
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) {
+ emitWatermark();
+ // schedule the next watermark
+
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval,
this);
}
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index a99e845f249..9a6d2d66a6f 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public TestFetcher(
HashMap<StreamShardMetadata, SequenceNumber>
testStateSnapshot,
List<StreamShardHandle>
testInitialDiscoveryShards) {
- super(streams, sourceContext, runtimeContext,
configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER);
+ super(streams, sourceContext, runtimeContext,
configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
this.testStateSnapshot = testStateSnapshot;
this.testInitialDiscoveryShards =
testInitialDiscoveryShards;
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index c18f4309be4..84e18bdc98c 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -32,7 +31,13 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -41,10 +46,16 @@
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import
org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -59,16 +70,21 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -243,8 +259,7 @@ public void testListStateChangedAfterSnapshotState() throws
Exception {
@Test
@SuppressWarnings("unchecked")
public void
testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws
Exception {
- KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
-
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+ KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -286,11 +301,10 @@ public void
testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws
// mock fetcher
//
----------------------------------------------------------------------
- KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -348,11 +362,10 @@ public void
testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exceptio
// mock fetcher
//
----------------------------------------------------------------------
- KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -441,13 +454,12 @@ public void
testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShar
// mock fetcher
//
----------------------------------------------------------------------
- KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
shards.add(new StreamShardHandle("fakeStream2",
new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -553,7 +565,7 @@ public void
testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
// mock fetcher
//
----------------------------------------------------------------------
- KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
List<StreamShardHandle> shards = new ArrayList<>();
// create a fake stream shard handle based on the first entry
in the restored state
@@ -567,7 +579,6 @@ public void
testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
shards.add(closedStreamShardHandle);
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -664,34 +675,166 @@ public void addAll(List<T> values) throws Exception {
return fakeRestoredState;
}
- /**
- * A non-serializable {@link KinesisDeserializationSchema} (because it
is a nested class with reference
- * to the enclosing class, which is not serializable) used for testing.
- */
- private final class NonSerializableDeserializationSchema implements
KinesisDeserializationSchema<String> {
- @Override
- public String deserialize(byte[] recordValue, String
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String
shardId) throws IOException {
- return new String(recordValue);
- }
+ private static KinesisDataFetcher mockKinesisDataFetcher() throws
Exception {
+ KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
+ java.lang.reflect.Constructor<KinesisDataFetcher> ctor =
(java.lang.reflect.Constructor<KinesisDataFetcher>)
KinesisDataFetcher.class.getConstructors()[0];
+ Class<?>[] otherParamTypes = new
Class<?>[ctor.getParameterTypes().length - 1];
+ System.arraycopy(ctor.getParameterTypes(), 1, otherParamTypes,
0, ctor.getParameterTypes().length - 1);
+
+ Supplier<Object[]> argumentSupplier = () -> {
+ Object[] otherParamArgs = new
Object[otherParamTypes.length];
+ for (int i = 0; i < otherParamTypes.length; i++) {
+ otherParamArgs[i] =
Mockito.nullable(otherParamTypes[i]);
+ }
+ return otherParamArgs;
+ };
+
PowerMockito.whenNew(ctor).withArguments(Mockito.any(ctor.getParameterTypes()[0]),
+ argumentSupplier.get()).thenReturn(mockedFetcher);
+ return mockedFetcher;
+ }
+
+ @Test
+ public void testPeriodicWatermark() throws Exception {
+
+ String streamName = "fakeStreamName";
+ Time maxOutOfOrderness = Time.milliseconds(5);
+ long autoWatermarkInterval = 1_000;
+
+ HashMap<String, String>
subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
+ subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
+
+ KinesisDeserializationSchema<String> deserializationSchema =
new KinesisDeserializationSchemaWrapper<>(
+ new SimpleStringSchema());
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
Long.toString(10L));
+
+ BlockingQueue<String> shard1 = new LinkedBlockingQueue();
+ BlockingQueue<String> shard2 = new LinkedBlockingQueue();
+
+ Map<String, List<BlockingQueue<String>>> streamToQueueMap = new
HashMap<>();
+ streamToQueueMap.put(streamName, Lists.newArrayList(shard1,
shard2));
+
+ // override createFetcher to mock Kinesis
+ FlinkKinesisConsumer<String> sourceFunc =
+ new FlinkKinesisConsumer<String>(streamName,
deserializationSchema, props) {
+ @Override
+ protected KinesisDataFetcher<String>
createFetcher(
+ List<String> streams,
+ SourceContext<String> sourceContext,
+ RuntimeContext runtimeContext,
+ Properties configProps,
+ KinesisDeserializationSchema<String>
deserializationSchema) {
+
+ KinesisDataFetcher<String> fetcher =
+ new KinesisDataFetcher<String>(
+ streams,
+ sourceContext,
+
sourceContext.getCheckpointLock(),
+ runtimeContext,
+ configProps,
+ deserializationSchema,
+ getShardAssigner(),
+
getPeriodicWatermarkAssigner(),
+ new AtomicReference<>(),
+ new ArrayList<>(),
+
subscribedStreamsToLastDiscoveredShardIds,
+ (props) ->
FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap)
+ ) {};
+ return fetcher;
+ }
+ };
+
+ sourceFunc.setShardAssigner(
+ (streamShardHandle, i) -> {
+ // shardId-000000000000
+ return Integer.parseInt(
+
streamShardHandle.getShard().getShardId().substring("shardId-".length()));
+ });
+
+ sourceFunc.setPeriodicWatermarkAssigner(new
TestTimestampExtractor(maxOutOfOrderness));
+
+ // there is currently no test harness specifically for sources,
+ // so we overlay the source thread here
+ AbstractStreamOperatorTestHarness<Object> testHarness =
+ new AbstractStreamOperatorTestHarness<Object>(
+ new StreamSource(sourceFunc), 1, 1, 0);
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
+
+ testHarness.initializeState(null);
+ testHarness.open();
+
+ ConcurrentLinkedQueue<Watermark> watermarks = new
ConcurrentLinkedQueue<>();
+
+ @SuppressWarnings("unchecked")
+ SourceFunction.SourceContext<String> sourceContext = new
CollectingSourceContext(
+ testHarness.getCheckpointLock(),
testHarness.getOutput()) {
+ @Override
+ public void emitWatermark(Watermark mark) {
+ watermarks.add(mark);
+ }
+ };
+
+ new Thread(
+ () -> {
+ try {
+ sourceFunc.run(sourceContext);
+ } catch (InterruptedException e) {
+ // expected on cancel
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .start();
+
+ shard1.put("1");
+ shard1.put("2");
+ shard2.put("10");
+ int recordCount = 3;
+ int watermarkCount = 0;
+ awaitRecordCount(testHarness.getOutput(), recordCount);
+
+ // trigger watermark emit
+ testHarness.setProcessingTime(testHarness.getProcessingTime() +
autoWatermarkInterval);
+ watermarkCount++;
+
+ // advance watermark
+ shard1.put("10");
+ recordCount++;
+ awaitRecordCount(testHarness.getOutput(), recordCount);
+
+ // trigger watermark emit
+ testHarness.setProcessingTime(testHarness.getProcessingTime() +
autoWatermarkInterval);
+ watermarkCount++;
+
+ sourceFunc.cancel();
+ testHarness.close();
+
+ assertEquals("record count", recordCount,
testHarness.getOutput().size());
+ assertEquals("watermark count", watermarkCount,
watermarks.size());
+ assertThat(watermarks, org.hamcrest.Matchers.contains(new
Watermark(-3), new Watermark(5)));
+ }
+
+ private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object>
queue, int count) throws Exception {
+ long timeoutMillis = System.currentTimeMillis() + 10_000;
+ while (System.currentTimeMillis() < timeoutMillis &&
queue.size() < count) {
+ Thread.sleep(10);
}
}
- /**
- * A static, serializable {@link KinesisDeserializationSchema}.
- */
- private static final class SerializableDeserializationSchema implements
KinesisDeserializationSchema<String> {
- @Override
- public String deserialize(byte[] recordValue, String
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String
shardId) throws IOException {
- return new String(recordValue);
+ private static class TestTimestampExtractor extends
BoundedOutOfOrdernessTimestampExtractor<String> {
+ private static final long serialVersionUID = 1L;
+
+ public TestTimestampExtractor(Time maxAllowedLateness) {
+ super(maxAllowedLateness);
}
@Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
+ public long extractTimestamp(String element) {
+ return Long.parseLong(element);
}
}
+
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index ccf39d0e19b..e3e7287f24a 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -21,6 +21,9 @@
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -40,9 +43,13 @@
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -710,4 +717,98 @@ public void testIsThisSubtaskShouldSubscribeTo() {
assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1));
}
+ private static BoundedOutOfOrdernessTimestampExtractor<String>
watermarkAssigner =
+ new
BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
+ @Override
+ public long extractTimestamp(String element) {
+ return Long.parseLong(element);
+ }
+ };
+
+ @Test
+ public void testPeriodicWatermark() {
+ final MutableLong clock = new MutableLong();
+ final MutableBoolean isTemporaryIdle = new MutableBoolean();
+ final List<Watermark> watermarks = new ArrayList<>();
+
+ String fakeStream1 = "fakeStream1";
+ StreamShardHandle shardHandle =
+ new StreamShardHandle(
+ fakeStream1,
+ new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
+
+ TestSourceContext<String> sourceContext =
+ new TestSourceContext<String>() {
+ @Override
+ public void emitWatermark(Watermark mark) {
+ watermarks.add(mark);
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ isTemporaryIdle.setTrue();
+ }
+ };
+
+ HashMap<String, String>
subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
+
+ final KinesisDataFetcher<String> fetcher =
+ new TestableKinesisDataFetcher<String>(
+ Collections.singletonList(fakeStream1),
+ sourceContext,
+ new java.util.Properties(),
+ new KinesisDeserializationSchemaWrapper<>(new
org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
+ 1,
+ 1,
+ new AtomicReference<>(),
+ new LinkedList<>(),
+ subscribedStreamsToLastSeenShardIdsUnderTest,
+
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
+
+ @Override
+ protected long getCurrentTimeMillis() {
+ return clock.getValue();
+ }
+ };
+ Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner",
watermarkAssigner);
+
+ SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
+ // register shards to subsequently emit records
+ int shardIndex =
+ fetcher.registerNewSubscribedShardState(
+ new KinesisStreamShardState(
+
KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle,
seq));
+
+ StreamRecord<String> record1 =
+ new StreamRecord<>(String.valueOf(Long.MIN_VALUE),
Long.MIN_VALUE);
+ fetcher.emitRecordAndUpdateState(record1.getValue(),
record1.getTimestamp(), shardIndex, seq);
+ Assert.assertEquals(record1,
sourceContext.getCollectedOutputs().poll());
+
+ fetcher.emitWatermark();
+ Assert.assertTrue("potential watermark equals previous
watermark", watermarks.isEmpty());
+
+ StreamRecord<String> record2 = new
StreamRecord<>(String.valueOf(1), 1);
+ fetcher.emitRecordAndUpdateState(record2.getValue(),
record2.getTimestamp(), shardIndex, seq);
+ Assert.assertEquals(record2,
sourceContext.getCollectedOutputs().poll());
+
+ fetcher.emitWatermark();
+ Assert.assertFalse("watermark advanced", watermarks.isEmpty());
+ Assert.assertEquals(new Watermark(record2.getTimestamp()),
watermarks.remove(0));
+ Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+
+ // test idle timeout
+ long idleTimeout = 10;
+ // advance clock idleTimeout
+ clock.add(idleTimeout + 1);
+ fetcher.emitWatermark();
+ Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+ Assert.assertTrue("not idle, no new watermark",
watermarks.isEmpty());
+
+ // activate idle timeout
+ Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis",
idleTimeout);
+ fetcher.emitWatermark();
+ Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
+ Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
+ }
+
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index eb3415572c0..c8e1f0919dc 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -22,20 +22,25 @@
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -102,6 +107,10 @@ public static KinesisProxyInterface
initialNumOfRecordsAfterNumOfGetRecordsCalls
millisBehindLatest);
}
+ public static KinesisProxyInterface blockingQueueGetRecords(Map<String,
List<BlockingQueue<String>>> streamsToShardQueues) {
+ return new BlockingQueueKinesis(streamsToShardQueues);
+ }
+
private static class
SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends
SingleShardEmittingFixNumOfRecordsKinesis {
private final long millisBehindLatest;
@@ -387,4 +396,85 @@ public GetRecordsResult getRecords(String shardIterator,
int maxRecordsToGet) {
return null;
}
}
+
+ private static class BlockingQueueKinesis implements
KinesisProxyInterface {
+
+ private Map<String, List<StreamShardHandle>>
streamsWithListOfShards = new HashMap<>();
+ private Map<String, BlockingQueue<String>>
shardIteratorToQueueMap = new HashMap<>();
+
+ private static String getShardIterator(StreamShardHandle
shardHandle) {
+ return shardHandle.getStreamName() + "-" +
shardHandle.getShard().getShardId();
+ }
+
+ public BlockingQueueKinesis(Map<String,
List<BlockingQueue<String>>> streamsToShardCount) {
+ for (Map.Entry<String, List<BlockingQueue<String>>>
streamToShardQueues : streamsToShardCount.entrySet()) {
+ String streamName =
streamToShardQueues.getKey();
+ int shardCount =
streamToShardQueues.getValue().size();
+
+ if (shardCount == 0) {
+ // don't do anything
+ } else {
+ List<StreamShardHandle> shardsOfStream
= new ArrayList<>(shardCount);
+ for (int i = 0; i < shardCount; i++) {
+ StreamShardHandle shardHandle =
new StreamShardHandle(
+ streamName,
+ new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))
+
.withSequenceNumberRange(new
SequenceNumberRange().withStartingSequenceNumber("0"))
+
.withHashKeyRange(new
HashKeyRange().withStartingHashKey("0").withEndingHashKey("0")));
+ shardsOfStream.add(shardHandle);
+
shardIteratorToQueueMap.put(getShardIterator(shardHandle),
streamToShardQueues.getValue().get(i));
+ }
+ streamsWithListOfShards.put(streamName,
shardsOfStream);
+ }
+ }
+ }
+
+ @Override
+ public GetShardListResult getShardList(Map<String, String>
streamNamesWithLastSeenShardIds) {
+ GetShardListResult result = new GetShardListResult();
+ for (Map.Entry<String, List<StreamShardHandle>>
streamsWithShards : streamsWithListOfShards.entrySet()) {
+ String streamName = streamsWithShards.getKey();
+ for (StreamShardHandle shard :
streamsWithShards.getValue()) {
+ if
(streamNamesWithLastSeenShardIds.get(streamName) == null) {
+
result.addRetrievedShardToStream(streamName, shard);
+ } else {
+ if
(StreamShardHandle.compareShardIds(
+
shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName))
> 0) {
+
result.addRetrievedShardToStream(streamName, shard);
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public String getShardIterator(StreamShardHandle shard, String
shardIteratorType, Object startingMarker) {
+ return getShardIterator(shard);
+ }
+
+ @Override
+ public GetRecordsResult getRecords(String shardIterator, int
maxRecordsToGet) {
+ BlockingQueue<String> queue =
Preconditions.checkNotNull(this.shardIteratorToQueueMap.get(shardIterator),
+ "no queue for iterator %s", shardIterator);
+ List<Record> records = Collections.emptyList();
+ try {
+ String data = queue.take();
+ Record record = new Record()
+ .withData(
+
ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+
.withPartitionKey(UUID.randomUUID().toString())
+ .withApproximateArrivalTimestamp(new
Date(System.currentTimeMillis()))
+ .withSequenceNumber(String.valueOf(0));
+ records = Collections.singletonList(record);
+ } catch (InterruptedException e) {
+ shardIterator = null;
+ }
+ return new GetRecordsResult()
+ .withRecords(records)
+ .withMillisBehindLatest(0L)
+ .withNextShardIterator(shardIterator);
+ }
+ }
+
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 21588c9a7a7..a44028766e1 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -69,6 +69,7 @@ public TestableKinesisDataFetcher(
fakeConfiguration,
deserializationSchema,
DEFAULT_SHARD_ASSIGNER,
+ null,
thrownErrorUnderTest,
subscribedShardsStateUnderTest,
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
diff --git a/pom.xml b/pom.xml
index 2a5dfced140..e6446094d07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@ under the License.
<avro.version>1.8.2</avro.version>
<junit.version>4.12</junit.version>
<mockito.version>2.21.0</mockito.version>
- <powermock.version>2.0.0-beta.5</powermock.version>
+ <powermock.version>2.0.0-RC.4</powermock.version>
<hamcrest.version>1.3</hamcrest.version>
<japicmp.skip>false</japicmp.skip>
<codebase>new</codebase>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services