Repository: samza Updated Branches: refs/heads/master c8b776ec1 -> a702b3bb4
SAMZA-1753: Added timestamp to Incoming message envelope. Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: [email protected] Closes #559 from sborya/kafkaTS Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a702b3bb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a702b3bb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a702b3bb Branch: refs/heads/master Commit: a702b3bb4a159804a2f7234710231b0dbf570b90 Parents: c8b776e Author: Boris S <[email protected]> Authored: Mon Jun 25 13:08:50 2018 -0700 Committer: Boris S <[email protected]> Committed: Mon Jun 25 13:08:50 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/IncomingMessageEnvelope.java | 9 +++++++++ .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 8 ++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a702b3bb/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index 60a605b..4d0ce2f 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -36,6 +36,7 @@ public class IncomingMessageEnvelope { private final Object key; private final Object message; private final int size; + private long timestamp = 0L; /** * Constructs a new IncomingMessageEnvelope from specified components. @@ -66,6 +67,14 @@ public class IncomingMessageEnvelope { this.size = size; } + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + public SystemStreamPartition getSystemStreamPartition() { return systemStreamPartition; } http://git-wip-us.apache.org/repos/asf/samza/blob/a702b3bb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 3a1ffe9..4cebb82 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -279,9 +279,13 @@ private[kafka] class KafkaSystemConsumer( } if(fetchLimitByBytesEnabled ) { - put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))) + val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)) + ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L) + put(systemStreamPartition, ime) } else { - put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)) + val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message) + ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L) + put(systemStreamPartition, ime) } setIsAtHead(systemStreamPartition, isAtHead)
