This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 501dc4c [BEAM-3851] Option to preserve element timestamp while
publishing to Kafka. (#4868)
501dc4c is described below
commit 501dc4cb17bb943aaa095feab959a9fed1aac20c
Author: Raghu Angadi <[email protected]>
AuthorDate: Thu Mar 22 10:56:50 2018 -0700
[BEAM-3851] Option to preserve element timestamp while publishing to Kafka.
(#4868)
* Option to preserve element timestamp while publishing to Kafka.
* Let users provide custom timestamp function.
* update javadoc
---
.../beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 44 +++++++++++++--------
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 33 +++++++++++++++-
.../io/kafka/KafkaPublishTimestampFunction.java | 45 ++++++++++++++++++++++
.../org/apache/beam/sdk/io/kafka/KafkaWriter.java | 8 +++-
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 16 ++++++--
5 files changed, 123 insertions(+), 23 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 7345a92..9ae69da 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -64,6 +64,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -173,7 +175,8 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
/**
* Shuffle messages assigning each randomly to a shard.
*/
- private static class Reshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K,
V>>> {
+ private static class Reshard<K, V>
+ extends DoFn<KV<K, V>, KV<Integer, TimestampedValue<KV<K, V>>>> {
private final int numShards;
private transient int shardId;
@@ -190,12 +193,13 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
@ProcessElement
public void processElement(ProcessContext ctx) {
shardId = (shardId + 1) % numShards; // round-robin among shards.
- ctx.output(KV.of(shardId, ctx.element()));
+ ctx.output(KV.of(shardId, TimestampedValue.of(ctx.element(),
ctx.timestamp())));
}
}
- private static class Sequencer<K, V>
- extends DoFn<KV<Integer, Iterable<KV<K, V>>>, KV<Integer, KV<Long, KV<K,
V>>>> {
+ private static class Sequencer<K, V> extends DoFn<
+ KV<Integer, Iterable<TimestampedValue<KV<K, V>>>>,
+ KV<Integer, KV<Long, TimestampedValue<KV<K, V>>>>> {
private static final String NEXT_ID = "nextId";
@StateId(NEXT_ID)
@@ -205,7 +209,7 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
ProcessContext ctx) {
long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
int shard = ctx.element().getKey();
- for (KV<K, V> value : ctx.element().getValue()) {
+ for (TimestampedValue<KV<K, V>> value : ctx.element().getValue()) {
ctx.output(KV.of(shard, KV.of(nextId, value)));
nextId++;
}
@@ -214,7 +218,7 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
}
private static class ExactlyOnceWriter<K, V>
- extends DoFn<KV<Integer, Iterable<KV<Long, KV<K, V>>>>, Void> {
+ extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<KV<K, V>>>>>,
Void> {
private static final String NEXT_ID = "nextId";
private static final String MIN_BUFFERED_ID = "minBufferedId";
@@ -230,7 +234,7 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
@StateId(MIN_BUFFERED_ID)
private final StateSpec<ValueState<Long>> minBufferedIdSpec =
StateSpecs.value();
@StateId(OUT_OF_ORDER_BUFFER)
- private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBufferSpec;
+ private final StateSpec<BagState<KV<Long, TimestampedValue<KV<K, V>>>>>
outOfOrderBufferSpec;
// A random id assigned to each shard. Helps with detecting when multiple
jobs are mistakenly
// started with same groupId used for storing state on Kafka side,
including the case where
// a job is restarted with same groupId, but the metadata from previous
run was not cleared.
@@ -248,7 +252,8 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
ExactlyOnceWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
this.spec = spec;
- this.outOfOrderBufferSpec =
StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), elemCoder));
+ this.outOfOrderBufferSpec = StateSpecs.bag(
+ KvCoder.of(BigEndianLongCoder.of(),
TimestampedValueCoder.of(elemCoder)));
}
@Setup
@@ -261,7 +266,7 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
@StateId(MIN_BUFFERED_ID) ValueState<Long>
minBufferedIdState,
@StateId(OUT_OF_ORDER_BUFFER)
- BagState<KV<Long, KV<K, V>>> oooBufferState,
+ BagState<KV<Long, TimestampedValue<KV<K,
V>>>> oooBufferState,
@StateId(WRITER_ID) ValueState<String>
writerIdState,
ProcessContext ctx)
throws IOException {
@@ -297,10 +302,10 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
// There might be out of order messages buffered in earlier
iterations. These
// will get merged if and when minBufferedId matches nextId.
- Iterator<KV<Long, KV<K, V>>> iter =
ctx.element().getValue().iterator();
+ Iterator<KV<Long, TimestampedValue<KV<K, V>>>> iter =
ctx.element().getValue().iterator();
while (iter.hasNext()) {
- KV<Long, KV<K, V>> kv = iter.next();
+ KV<Long, TimestampedValue<KV<K, V>>> kv = iter.next();
long recordId = kv.getKey();
if (recordId < nextId) {
@@ -339,7 +344,8 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
// Read all of them in to memory and sort them. Reading into memory
// might be problematic in extreme cases. Might need to improve it
in future.
- List<KV<Long, KV<K, V>>> buffered =
Lists.newArrayList(oooBufferState.read());
+ List<KV<Long, TimestampedValue<KV<K, V>>>> buffered =
+ Lists.newArrayList(oooBufferState.read());
buffered.sort(new KV.OrderByKey<>());
LOG.info("{} : merging {} buffered records (min buffered id is
{}).",
@@ -349,8 +355,7 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
minBufferedIdState.clear();
minBufferedId = Long.MAX_VALUE;
- iter =
- Iterators.mergeSorted(
+ iter = Iterators.mergeSorted(
ImmutableList.of(iter, buffered.iterator()), new
KV.OrderByKey<>());
}
}
@@ -428,10 +433,17 @@ class KafkaExactlyOnceSink<K, V> extends
PTransform<PCollection<KV<K, V>>, PColl
ProducerSpEL.beginTransaction(producer);
}
- void sendRecord(KV<K, V> record, Counter sendCounter) {
+ void sendRecord(TimestampedValue<KV<K, V>> record, Counter sendCounter) {
try {
+ Long timestampMillis = spec.getPublishTimestampFunction() != null
+ ?
spec.getPublishTimestampFunction().getTimestamp(record.getValue(),
+
record.getTimestamp()).getMillis()
+ : null;
+
producer.send(
- new ProducerRecord<>(spec.getTopic(), record.getKey(),
record.getValue()));
+ new ProducerRecord<>(
+ spec.getTopic(), null, timestampMillis,
+ record.getValue().getKey(), record.getValue().getValue()));
sendCounter.inc();
} catch (KafkaException e) {
ProducerSpEL.abortTransaction(producer);
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eb29229..eeb9da9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -174,10 +174,15 @@ import org.slf4j.LoggerFactory;
* .withKeySerializer(LongSerializer.class)
* .withValueSerializer(StringSerializer.class)
*
- * // you can further customize KafkaProducer used to write the records
by adding more
+ * // You can further customize KafkaProducer used to write the records
by adding more
* // settings for ProducerConfig. e.g, to enable compression :
* .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
*
+ *     // You can set the publish timestamp for the Kafka records.
+ * .withInputTimestamp() // element timestamp is used while publishing
to Kafka
+ * // or you can also set a custom timestamp with a function.
+ * .withPublishTimestampFunction((elem, elemTs) -> ...)
+ *
* // Optionally enable exactly-once sink (on supported runners). See
JavaDoc for withEOS().
* .withEOS(20, "eos-sink-group-id");
* );
@@ -813,6 +818,8 @@ public class KafkaIO {
@Nullable abstract Class<? extends Serializer<K>> getKeySerializer();
@Nullable abstract Class<? extends Serializer<V>> getValueSerializer();
+ @Nullable abstract KafkaPublishTimestampFunction<KV<K, V>>
getPublishTimestampFunction();
+
// Configuration for EOS sink
abstract boolean isEOS();
@Nullable abstract String getSinkGroupId();
@@ -830,6 +837,8 @@ public class KafkaIO {
SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>>
serializer);
abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>>
serializer);
+ abstract Builder<K, V> setPublishTimestampFunction(
+ KafkaPublishTimestampFunction<KV<K, V>> timestampFunction);
abstract Builder<K, V> setEOS(boolean eosEnabled);
abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
abstract Builder<K, V> setNumShards(int numShards);
@@ -890,6 +899,28 @@ public class KafkaIO {
}
/**
+     * The timestamp for each record being published is set to the timestamp of
the element in the
+     * pipeline. This is equivalent to {@code withPublishTimestampFunction((e,
+ * pipeline. This is equivalent to {@code withPublishTimestampFunction((e,
ts) -> ts)}. <br>
+ * NOTE: Kafka's retention policies are based on message timestamps. If
the pipeline
+ * is processing messages from the past, they might be deleted immediately
by Kafka after
+ * being published if the timestamps are older than Kafka cluster's {@code
log.retention.hours}.
+ */
+ public Write<K, V> withInputTimestamp() {
+ return
withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
+ }
+
+ /**
+ * A function to provide timestamp for records being published. <br>
+ * NOTE: Kafka's retention policies are based on message timestamps. If
the pipeline
+ * is processing messages from the past, they might be deleted immediately
by Kafka after
+ * being published if the timestamps are older than Kafka cluster's {@code
log.retention.hours}.
+ */
+ public Write<K, V> withPublishTimestampFunction(
+ KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
+ return
toBuilder().setPublishTimestampFunction(timestampFunction).build();
+ }
+
+ /**
* Provides exactly-once semantics while writing to Kafka, which enables
applications with
* end-to-end exactly-once guarantees on top of exactly-once semantics
<i>within</i> Beam
* pipelines. It ensures that records written to sink are committed on
Kafka exactly once,
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
new file mode 100644
index 0000000..e0ef639
--- /dev/null
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * An interface for providing custom timestamp for elements written to Kafka.
+ */
+public interface KafkaPublishTimestampFunction<T> extends Serializable {
+
+ /**
+ * Returns timestamp for element being published to Kafka.
+ * See {@link org.apache.kafka.clients.producer.ProducerRecord}.
+ *
+ * @param element The element being published.
+ * @param elementTimestamp Timestamp of the element from the context
+ * (i.e. {@link DoFn.ProcessContext#timestamp()}).
+ */
+ Instant getTimestamp(T element, Instant elementTimestamp);
+
+ /**
+ * Returns a {@link KafkaPublishTimestampFunction} that returns the element
timestamp from ProcessContext.
+ */
+ static <T> KafkaPublishTimestampFunction<T> withElementTimestamp() {
+ return (element, elementTimestamp) -> elementTimestamp;
+ }
+}
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 00b76e5..9f2544a 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -54,8 +54,12 @@ class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
checkForFailures();
KV<K, V> kv = ctx.element();
- producer.send(
- new ProducerRecord<>(spec.getTopic(), kv.getKey(), kv.getValue()), new
SendCallback());
+ Long timestampMillis = spec.getPublishTimestampFunction() != null
+ ? spec.getPublishTimestampFunction().getTimestamp(kv,
ctx.timestamp()).getMillis()
+ : null;
+
+ producer.send(new ProducerRecord<>(
+ spec.getTopic(), null, timestampMillis, kv.getKey(), kv.getValue()),
new SendCallback());
elementsWritten.inc();
}
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3718c41..7ae8f1a 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -854,13 +854,14 @@ public class KafkaIOTest {
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
+ .withInputTimestamp()
.withProducerFactoryFn(new
ProducerFactoryFn(producerWrapper.producerKey)));
p.run();
completionThread.shutdown();
- verifyProducerRecords(producerWrapper.mockProducer, topic, numElements,
false);
+ verifyProducerRecords(producerWrapper.mockProducer, topic, numElements,
false, true);
}
}
@@ -891,7 +892,7 @@ public class KafkaIOTest {
completionThread.shutdown();
- verifyProducerRecords(producerWrapper.mockProducer, topic, numElements,
true);
+ verifyProducerRecords(producerWrapper.mockProducer, topic, numElements,
true, false);
}
}
@@ -930,13 +931,14 @@ public class KafkaIOTest {
.withEOS(1, "test")
.withConsumerFactoryFn(new ConsumerFactoryFn(
Lists.newArrayList(topic), 10, 10,
OffsetResetStrategy.EARLIEST))
+ .withPublishTimestampFunction((e, ts) -> ts)
.withProducerFactoryFn(new
ProducerFactoryFn(producerWrapper.producerKey)));
p.run();
completionThread.shutdown();
- verifyProducerRecords(producerWrapper.mockProducer, topic, numElements,
false);
+ verifyProducerRecords(producerWrapper.mockProducer, topic, numElements,
false, true);
}
}
@@ -1198,7 +1200,10 @@ public class KafkaIOTest {
}
private static void verifyProducerRecords(MockProducer<Integer, Long>
mockProducer,
- String topic, int numElements,
boolean keyIsAbsent) {
+ String topic,
+ int numElements,
+ boolean keyIsAbsent,
+ boolean verifyTimestamp) {
// verify that appropriate messages are written to kafka
List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
@@ -1215,6 +1220,9 @@ public class KafkaIOTest {
assertEquals(i, record.key().intValue());
}
assertEquals(i, record.value().longValue());
+ if (verifyTimestamp) {
+ assertEquals(i, record.timestamp().intValue());
+ }
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].