This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new c905a4d [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
c905a4d is described below
commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0
Author: Thomas Weise <[email protected]>
AuthorDate: Mon Aug 26 15:02:40 2019 -0700
[hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout
---
.../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 5 +++--
.../connectors/kinesis/util/JobManagerWatermarkTracker.java | 4 +++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index f38e6eb..80b724b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -715,7 +715,7 @@ public class KinesisDataFetcher<T> {
// ------------------------------------------------------------------------
/**
- * Atomic operation to collect a record and update state to the sequence number of the record.
+ * Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously.
* This method is called by {@link ShardConsumer}s.
*
* @param record the record to collect
@@ -752,7 +752,8 @@ public class KinesisDataFetcher<T> {
}
/**
- * Actual record emission called from the record emitter.
+ * Atomic operation to collect a record and update state to the sequence number of the record.
+ * This method is called from the record emitter.
*
* <p>Responsible for tracking per shard watermarks and emit timestamps extracted from
* the record, when a watermark assigner was configured.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index f150bb0..1581024 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends WatermarkTracker {
WatermarkState ws = e.getValue();
if (ws.lastUpdated + updateTimeoutMillis < currentTime) {
// ignore outdated entry
- updateTimeoutCount++;
+ if (ws.watermark < Long.MAX_VALUE) {
+ updateTimeoutCount++;
+ }
continue;
}
globalWatermark = Math.min(ws.watermark, globalWatermark);