This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d799b6a  [GOBBLIN-1409] Expose record timestamp in kafka-1 client
d799b6a is described below

commit d799b6ab6aa452235eedd3acaeb4a4b10edb3d65
Author: Dan Andreescu <[email protected]>
AuthorDate: Fri Apr 9 19:54:44 2021 -0700

    [GOBBLIN-1409] Expose record timestamp in kafka-1 client
    
    In our use case, we need the timestamp from the
    broker, so we also
    implement isTimestampLogAppend and a kafka-1
    specific method,
    getTimestampType, to let us check the
    TimestampType in all cases.
    
    Closes #3244 from milimetric/master
---
 .../gobblin/kafka/client/Kafka1ConsumerClient.java | 24 ++++++++++++++++++++++
 .../kafka/client/Kafka1ConsumerClientTest.java     | 12 ++++++++---
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
index abc880c..9d2a965 100644
--- 
a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.record.TimestampType;
 
 import javax.annotation.Nonnull;
 import java.io.IOException;
@@ -315,6 +316,29 @@ public class Kafka1ConsumerClient<K, V> extends 
AbstractBaseKafkaConsumerClient
       this.consumerRecord = consumerRecord;
     }
 
+    /**
+     * @return the timestamp type of the underlying ConsumerRecord (only for 
Kafka 1+ records)
+     */
+    public TimestampType getTimestampType() {
+      return this.consumerRecord.timestampType();
+    }
+
+    /**
+     * @return true if the timestamp in the ConsumerRecord is the timestamp 
when the record is written to Kafka.
+     */
+    @Override
+    public boolean isTimestampLogAppend() {
+      return this.consumerRecord.timestampType() == 
TimestampType.LOG_APPEND_TIME;
+    }
+
+    /**
+     * @return the timestamp of the underlying ConsumerRecord.  NOTE: check 
TimestampType
+     */
+    @Override
+    public long getTimestamp() {
+      return this.consumerRecord.timestamp();
+    }
+
     @Override
     public K getKey() {
       return this.consumerRecord.key();
diff --git 
a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
 
b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
index 6587ff7..c6e1afc 100644
--- 
a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
+++ 
b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -47,9 +48,10 @@ public class Kafka1ConsumerClientTest {
     beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
     consumer.updateBeginningOffsets(beginningOffsets);
 
-    ConsumerRecord<String, String> record0 = new 
ConsumerRecord<>("test_topic", 0, 0L, "key", "value0");
-    ConsumerRecord<String, String> record1 = new 
ConsumerRecord<>("test_topic", 0, 1L, "key", "value1");
-    ConsumerRecord<String, String> record2 = new 
ConsumerRecord<>("test_topic", 0, 2L, "key", "value2");
+
+    ConsumerRecord<String, String> record0 = new 
ConsumerRecord<>("test_topic", 0, 0L, 10L, TimestampType.CREATE_TIME, 0L, 3, 6, 
"key", "value0");
+    ConsumerRecord<String, String> record1 = new 
ConsumerRecord<>("test_topic", 0, 1L, 11L, TimestampType.LOG_APPEND_TIME, 1L, 
3, 6, "key", "value1");
+    ConsumerRecord<String, String> record2 = new 
ConsumerRecord<>("test_topic", 0, 2L, 12L, TimestampType.LOG_APPEND_TIME, 2L, 
3, 6, "key", "value2");
 
     consumer.addRecord(record0);
     consumer.addRecord(record1);
@@ -67,6 +69,10 @@ public class Kafka1ConsumerClientTest {
               new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record1), new 
Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record2));
       Assert.assertEquals(consumedRecords, expected);
 
+      Kafka1ConsumerClient.Kafka1ConsumerRecord expected0 = 
expected.iterator().next();
+      Assert.assertEquals(record0.timestamp(), expected0.getTimestamp());
+      Assert.assertEquals(record0.timestampType() == 
TimestampType.LOG_APPEND_TIME, expected0.isTimestampLogAppend());
+      Assert.assertEquals(record0.timestampType(), 
expected0.getTimestampType());
     }
 
   }

Reply via email to