This is an automated email from the ASF dual-hosted git repository.
shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d799b6a [GOBBLIN-1409] Expose record timestamp in kafka-1 client
d799b6a is described below
commit d799b6ab6aa452235eedd3acaeb4a4b10edb3d65
Author: Dan Andreescu <[email protected]>
AuthorDate: Fri Apr 9 19:54:44 2021 -0700
[GOBBLIN-1409] Expose record timestamp in kafka-1 client
In our use case, we need the timestamp from the broker, so we also
implement isTimestampLogAppend and a kafka-1 specific method,
getTimestampType, to let us check the TimestampType in all cases.
Closes #3244 from milimetric/master
---
.../gobblin/kafka/client/Kafka1ConsumerClient.java | 24 ++++++++++++++++++++++
.../kafka/client/Kafka1ConsumerClientTest.java | 12 ++++++++---
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
index abc880c..9d2a965 100644
---
a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.record.TimestampType;
import javax.annotation.Nonnull;
import java.io.IOException;
@@ -315,6 +316,29 @@ public class Kafka1ConsumerClient<K, V> extends
AbstractBaseKafkaConsumerClient
this.consumerRecord = consumerRecord;
}
+ /**
+ * @return the timestamp type of the underlying ConsumerRecord (only for
Kafka 1+ records)
+ */
+ public TimestampType getTimestampType() {
+ return this.consumerRecord.timestampType();
+ }
+
+ /**
+ * @return true if the timestamp in the ConsumerRecord is the timestamp
when the record is written to Kafka.
+ */
+ @Override
+ public boolean isTimestampLogAppend() {
+ return this.consumerRecord.timestampType() ==
TimestampType.LOG_APPEND_TIME;
+ }
+
+ /**
+ * @return the timestamp of the underlying ConsumerRecord. NOTE: check
TimestampType
+ */
+ @Override
+ public long getTimestamp() {
+ return this.consumerRecord.timestamp();
+ }
+
@Override
public K getKey() {
return this.consumerRecord.key();
diff --git
a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
index 6587ff7..c6e1afc 100644
---
a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
+++
b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -47,9 +48,10 @@ public class Kafka1ConsumerClientTest {
beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
- ConsumerRecord<String, String> record0 = new
ConsumerRecord<>("test_topic", 0, 0L, "key", "value0");
- ConsumerRecord<String, String> record1 = new
ConsumerRecord<>("test_topic", 0, 1L, "key", "value1");
- ConsumerRecord<String, String> record2 = new
ConsumerRecord<>("test_topic", 0, 2L, "key", "value2");
+
+ ConsumerRecord<String, String> record0 = new
ConsumerRecord<>("test_topic", 0, 0L, 10L, TimestampType.CREATE_TIME, 0L, 3, 6,
"key", "value0");
+ ConsumerRecord<String, String> record1 = new
ConsumerRecord<>("test_topic", 0, 1L, 11L, TimestampType.LOG_APPEND_TIME, 1L,
3, 6, "key", "value1");
+ ConsumerRecord<String, String> record2 = new
ConsumerRecord<>("test_topic", 0, 2L, 12L, TimestampType.LOG_APPEND_TIME, 2L,
3, 6, "key", "value2");
consumer.addRecord(record0);
consumer.addRecord(record1);
@@ -67,6 +69,10 @@ public class Kafka1ConsumerClientTest {
new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record1), new
Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record2));
Assert.assertEquals(consumedRecords, expected);
+ Kafka1ConsumerClient.Kafka1ConsumerRecord expected0 =
expected.iterator().next();
+ Assert.assertEquals(record0.timestamp(), expected0.getTimestamp());
+ Assert.assertEquals(record0.timestampType() ==
TimestampType.LOG_APPEND_TIME, expected0.isTimestampLogAppend());
+ Assert.assertEquals(record0.timestampType(),
expected0.getTimestampType());
}
}