This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 26bac51 [FLINK-10358] fix NPE when running flink-kinesis connector
against dynamodb streams
26bac51 is described below
commit 26bac51cae1d298078902a02e196fffc16ea5704
Author: Ying Xu <[email protected]>
AuthorDate: Thu Sep 20 02:26:02 2018 -0700
[FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb
streams
This closes #6708.
---
.../flink/streaming/connectors/kinesis/internals/ShardConsumer.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 5845eea..36a4e92 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -373,7 +373,10 @@ public class ShardConsumer<T> implements Runnable {
getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
// Update millis behind latest so it gets reported by the millisBehindLatest gauge
- shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
+ Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
+ if (millisBehindLatest != null) {
+ shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
+ }
} catch (ExpiredIteratorException eiEx) {
LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
" refreshing the iterator ...", shardItr, subscribedShard);