Repository: flink
Updated Branches:
  refs/heads/master eeac022f0 -> e75481cc6


[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in 
Kinesis connector.

This closes #5480.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e75481cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e75481cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e75481cc

Branch: refs/heads/master
Commit: e75481cc6c5240e976f7eb829152153731fb63bf
Parents: eeac022
Author: Thomas Weise <t...@apache.org>
Authored: Tue Feb 13 16:33:59 2018 -0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Fri Feb 23 19:48:19 2018 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e75481cc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 945f396..65de24c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -513,7 +513,7 @@ public class KinesisDataFetcher<T> {
         *                        when the shard state was registered.
         * @param lastSequenceNumber the last sequence number value to update
         */
-       protected final void emitRecordAndUpdateState(T record, long 
recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+       protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
                synchronized (checkpointLock) {
                        if (record != null) {
                                sourceContext.collectWithTimestamp(record, 
recordTimestamp);

Reply via email to