This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 26bac51  [FLINK-10358] fix NPE when running flink-kinesis connector 
against dynamodb streams
26bac51 is described below

commit 26bac51cae1d298078902a02e196fffc16ea5704
Author: Ying Xu <[email protected]>
AuthorDate: Thu Sep 20 02:26:02 2018 -0700

    [FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb 
streams
    
    This closes #6708.
---
 .../flink/streaming/connectors/kinesis/internals/ShardConsumer.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 5845eea..36a4e92 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -373,7 +373,10 @@ public class ShardConsumer<T> implements Runnable {
                                getRecordsResult = kinesis.getRecords(shardItr, 
maxNumberOfRecords);
 
                                // Update millis behind latest so it gets 
reported by the millisBehindLatest gauge
-                               
shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
+                               Long millisBehindLatest = 
getRecordsResult.getMillisBehindLatest();
+                               if (millisBehindLatest != null) {
+                                       
shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
+                               }
                        } catch (ExpiredIteratorException eiEx) {
                                LOG.warn("Encountered an unexpected expired 
iterator {} for shard {};" +
                                        " refreshing the iterator ...", 
shardItr, subscribedShard);

Reply via email to