Repository: samza
Updated Branches:
  refs/heads/master c8b776ec1 -> a702b3bb4


SAMZA-1753: Added timestamp to Incoming message envelope.

Author: Boris S <[email protected]>
Author: Boris Shkolnik <[email protected]>

Reviewers: [email protected]

Closes #559 from sborya/kafkaTS


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a702b3bb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a702b3bb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a702b3bb

Branch: refs/heads/master
Commit: a702b3bb4a159804a2f7234710231b0dbf570b90
Parents: c8b776e
Author: Boris S <[email protected]>
Authored: Mon Jun 25 13:08:50 2018 -0700
Committer: Boris S <[email protected]>
Committed: Mon Jun 25 13:08:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/IncomingMessageEnvelope.java    | 9 +++++++++
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 8 ++++++--
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a702b3bb/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 60a605b..4d0ce2f 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -36,6 +36,7 @@ public class IncomingMessageEnvelope {
   private final Object key;
   private final Object message;
   private final int size;
+  private long timestamp = 0L;
 
   /**
    * Constructs a new IncomingMessageEnvelope from specified components.
@@ -66,6 +67,14 @@ public class IncomingMessageEnvelope {
     this.size = size;
   }
 
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   public SystemStreamPartition getSystemStreamPartition() {
     return systemStreamPartition;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a702b3bb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 3a1ffe9..4cebb82 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -279,9 +279,13 @@ private[kafka] class KafkaSystemConsumer(
       }
 
       if(fetchLimitByBytesEnabled ) {
-        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)))
+        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
+        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
+        put(systemStreamPartition, ime)
       } else {
-        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
+        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
+        put(systemStreamPartition, ime)
       }
 
       setIsAtHead(systemStreamPartition, isAtHead)

Reply via email to