This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9f84beb [BEAM-591] KafkaIO : Improve watermarks and support server side timestamps (#4680)
9f84beb is described below
commit 9f84bebc4b602551b2c719702cbb4dbab7c5b258
Author: Raghu Angadi <[email protected]>
AuthorDate: Fri Feb 23 16:42:18 2018 -0800
[BEAM-591] KafkaIO : Improve watermarks and support server side timestamps (#4680)
* Redesign how timestamps and watermarks are handled in KafkaIO.
- Added TimestampPolicy, which provides both record timestamps and watermarks.
- Built-in policies for 'LogAppendTime' (server time) and 'ProcessingTime' (the default).
- Ensured idle partitions don't hold the watermark back.
- Deprecated the previous API for setting custom timestamp and watermark functions.
* Minor cleanup.
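
For reference, a minimal read pipeline using the new API might look like the sketch below ('pipeline', the topic, and the bootstrap servers are placeholders; it mirrors the updated javadoc example in KafkaIO.java):

    PCollection<KV<Long, String>> records = pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopic("my_topic")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Event times and watermarks follow Kafka's server-side log append time;
            // withProcessingTime() remains the default.
            .withLogAppendTime()
            .withoutMetadata());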
---
.../org/apache/beam/sdk/io/kafka/ConsumerSpEL.java | 20 +-
.../beam/sdk/io/kafka/KafkaCheckpointMark.java | 32 ++--
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 83 ++++++++-
.../org/apache/beam/sdk/io/kafka/KafkaRecord.java | 14 +-
.../apache/beam/sdk/io/kafka/KafkaRecordCoder.java | 3 +
.../beam/sdk/io/kafka/KafkaTimestampType.java | 44 +++++
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 200 +++++++++++++-------
.../apache/beam/sdk/io/kafka/TimestampPolicy.java | 68 +++++++
.../beam/sdk/io/kafka/TimestampPolicyFactory.java | 206 +++++++++++++++++++++
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 145 +++++++++++++--
10 files changed, 706 insertions(+), 109 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index a3bd439..f615ad6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -80,10 +80,10 @@ class ConsumerSpEL {
}
}
- public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
+ public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartition) {
StandardEvaluationContext mapContext = new StandardEvaluationContext();
mapContext.setVariable("consumer", consumer);
- mapContext.setVariable("tp", topicPartitions);
+ mapContext.setVariable("tp", topicPartition);
seek2endExpression.getValue(mapContext);
}
@@ -95,11 +95,19 @@ class ConsumerSpEL {
}
public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
- long timestamp;
- if (!hasRecordTimestamp || (timestamp = rawRecord.timestamp()) <= 0L) {
- timestamp = System.currentTimeMillis();
+ if (hasRecordTimestamp) {
+ return rawRecord.timestamp();
+ }
+ return -1L; // This is the timestamp used in Kafka for older messages without timestamps.
+ }
+
+ public KafkaTimestampType getRecordTimestampType(
+ ConsumerRecord<byte[], byte[]> rawRecord) {
+ if (hasRecordTimestamp) {
+ return KafkaTimestampType.forOrdinal(rawRecord.timestampType().ordinal());
+ } else {
+ return KafkaTimestampType.NO_TIMESTAMP_TYPE;
}
- return timestamp;
}
public boolean hasOffsetsForTimes() {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 791e594..95ec7ca 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -18,13 +18,14 @@
package org.apache.beam.sdk.io.kafka;
import com.google.common.base.Joiner;
-import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import java.util.Optional;
import org.apache.avro.reflect.AvroIgnore;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
/**
* Checkpoint for a {@link KafkaUnboundedReader}. Consists of Kafka topic name, partition id,
@@ -36,12 +37,12 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
private List<PartitionMark> partitions;
@AvroIgnore
- private KafkaUnboundedReader<?, ?> reader; // Non-null when offsets need to be committed.
+ private Optional<KafkaUnboundedReader<?, ?>> reader; // Present when offsets need to be committed.
private KafkaCheckpointMark() {} // for Avro
public KafkaCheckpointMark(List<PartitionMark> partitions,
- KafkaUnboundedReader<?, ?> reader) {
+ Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
this.reader = reader;
}
@@ -51,14 +52,12 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
}
@Override
- public void finalizeCheckpoint() throws IOException {
- if (reader != null) {
- // Is it ok to commit asynchronously, or should we wait till this (or newer) is committed?
- // Often multiple marks would be finalized at once, since we only need to finalize the latest,
- // it is better to wait a little while. Currently maximum is delay same as KAFKA_POLL_TIMEOUT
- // in the reader (1 second).
- reader.finalizeCheckpointMarkAsync(this);
- }
+ public void finalizeCheckpoint() {
+ reader.ifPresent(r -> r.finalizeCheckpointMarkAsync(this));
+ // Is it ok to commit asynchronously, or should we wait till this (or newer) is committed?
+ // Often multiple marks would be finalized at once, since we only need to finalize the latest,
+ // it is better to wait a little while. Currently maximum delay is same as KAFKA_POLL_TIMEOUT
+ // in the reader (1 second).
}
@Override
@@ -71,16 +70,20 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
* for a single partition.
*/
public static class PartitionMark implements Serializable {
+ private static final long MIN_WATERMARK_MILLIS = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+
private String topic;
private int partition;
private long nextOffset;
+ private long watermarkMillis = MIN_WATERMARK_MILLIS;
private PartitionMark() {} // for Avro
- public PartitionMark(String topic, int partition, long offset) {
+ public PartitionMark(String topic, int partition, long offset, long watermarkMillis) {
this.topic = topic;
this.partition = partition;
this.nextOffset = offset;
+ this.watermarkMillis = watermarkMillis;
}
public String getTopic() {
@@ -95,12 +98,17 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
return nextOffset;
}
+ public long getWatermarkMillis() {
+ return watermarkMillis;
+ }
+
@Override
public String toString() {
return "PartitionMark{"
+ "topic='" + topic + '\''
+ ", partition=" + partition
+ ", nextOffset=" + nextOffset
+ + ", watermarkMillis=" + watermarkMillis
+ '}';
}
}
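
With the watermark now part of the checkpoint, a mark for one partition carries four fields; a small sketch of constructing one (topic, offset and watermark values are placeholders):

    KafkaCheckpointMark.PartitionMark mark = new KafkaCheckpointMark.PartitionMark(
        "my_topic",                 // topic
        0,                          // partition
        42L,                        // next offset to read
        Instant.now().getMillis()); // last watermark reported for the partition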
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index bd8ac64..f031003 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -113,8 +114,9 @@ import org.slf4j.LoggerFactory;
* // settings for ConsumerConfig. e.g :
* .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
*
- * // custom function for calculating record timestamp (default is processing time)
- * .withTimestampFn(new MyTimestampFunction())
+ * // set event times and watermark based on LogAppendTime. To provide a custom
+ * // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
+ * .withLogAppendTime()
*
* // custom function for watermark (default is record timestamp)
* .withWatermarkFn(new MyWatermarkFunction())
@@ -122,7 +124,10 @@ import org.slf4j.LoggerFactory;
* // restrict reader to committed messages on Kafka (see method documentation).
* .withReadCommitted()
*
- * // finally, if you don't need Kafka metadata, you can drop it
+ * // offsets consumed by the pipeline can be committed back.
+ * .commitOffsetsInFinalize()
+ *
+ * // finally, if you don't need Kafka metadata, you can drop it.
* .withoutMetadata() // PCollection<KV<Long, String>>
* )
* .apply(Values.<String>create()) // PCollection<String>
@@ -246,6 +251,7 @@ public class KafkaIO {
.setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
.setMaxNumRecords(Long.MAX_VALUE)
.setCommitOffsetsInFinalizeEnabled(false)
+ .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
.build();
}
@@ -281,7 +287,6 @@ public class KafkaIO {
@Nullable abstract Class<? extends Deserializer<V>> getValueDeserializer();
abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
getConsumerFactoryFn();
- @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn();
@Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();
abstract long getMaxNumRecords();
@@ -290,6 +295,8 @@ public class KafkaIO {
@Nullable abstract Instant getStartReadTime();
abstract boolean isCommitOffsetsInFinalizeEnabled();
+ abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
@@ -304,12 +311,13 @@ public class KafkaIO {
Class<? extends Deserializer<V>> valueDeserializer);
abstract Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
- abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
abstract Builder<K, V> setStartReadTime(Instant startReadTime);
abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
+ abstract Builder<K, V> setTimestampPolicyFactory(
+ TimestampPolicyFactory<K, V> timestampPolicyFactory);
abstract Read<K, V> build();
}
@@ -461,18 +469,69 @@ public class KafkaIO {
}
/**
+ * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}.
+ * The policy assigns Kafka's log append time (server side ingestion time)
+ * to each record. The watermark for each Kafka partition is the timestamp of the last record
+ * read. If a partition is idle, the watermark advances to a couple of seconds behind wall time.
+ * Every record consumed from Kafka is expected to have its timestamp type set to
+ * 'LOG_APPEND_TIME'.
+ *
+ * <p>In Kafka, log append time needs to be enabled for each topic, and all subsequent
+ * records will have their timestamp set to log append time. If a record does not have its
+ * timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous
+ * record's timestamp or the latest watermark, whichever is larger.
+ *
+ * <p>The watermark for the entire source is the minimum of the per-partition watermarks.
+ * If one of the readers falls behind, possibly due to uneven distribution of records among
+ * Kafka partitions, it ends up holding back the watermark for the entire source.
+ */
+ public Read<K, V> withLogAppendTime() {
+ return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
+ }
+
+ /**
+ * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}.
+ * This is the default timestamp policy. It assigns processing time to each record.
+ * Specifically, this is the timestamp when the record becomes 'current' in the reader.
+ * The watermark always advances to current time. If server side time (log append time) is
+ * enabled in Kafka, {@link #withLogAppendTime()} is recommended over this.
+ */
+ public Read<K, V> withProcessingTime() {
+ return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+ }
+
+ /**
+ * Provide a custom {@link TimestampPolicyFactory} to set event times and watermarks for each
+ * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
+ * is invoked for each partition when the reader starts.
+ * @see #withLogAppendTime() and {@link #withProcessingTime()}
+ */
+ public Read<K, V> withTimestampPolicyFactory(
+ TimestampPolicyFactory<K, V> timestampPolicyFactory) {
+ return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
+ }
+
+ /**
* A function to assign a timestamp to a record. Default is processing timestamp.
+ * @deprecated as of version 2.4.
+ * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
+ @Deprecated
public Read<K, V> withTimestampFn2(
SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn can not be null");
- return toBuilder().setTimestampFn(timestampFn).build();
+ return toBuilder().setTimestampPolicyFactory(
+ TimestampPolicyFactory.withTimestampFn(timestampFn)).build();
}
/**
- * A function to calculate watermark after a record. Default is last record timestamp
+ * A function to calculate watermark after a record. Default is last record timestamp.
* @see #withTimestampFn(SerializableFunction)
+ * @deprecated as of version 2.4.
+ * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
+ @Deprecated
public Read<K, V> withWatermarkFn2(
SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
checkArgument(watermarkFn != null, "watermarkFn can not be null");
@@ -481,16 +540,22 @@ public class KafkaIO {
/**
* A function to assign a timestamp to a record. Default is processing timestamp.
+ * @deprecated as of version 2.4.
+ * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
+ @Deprecated
public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn can not be null");
return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
}
/**
- * A function to calculate watermark after a record. Default is last record timestamp
+ * A function to calculate watermark after a record. Default is last record timestamp.
* @see #withTimestampFn(SerializableFunction)
+ * @deprecated as of version 2.4.
+ * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
+ @Deprecated
public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
checkArgument(watermarkFn != null, "watermarkFn can not be null");
return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
@@ -608,7 +673,7 @@ public class KafkaIO {
return new KafkaUnboundedSource<>(this, -1);
}
- // utility method to convert KafkRecord<K, V> to user KV<K, V> before applying user functions
+ // utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions
private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
return record -> fn.apply(record.getKV());
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index 235fb1f..f4b1f1b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -26,21 +26,26 @@ import org.apache.beam.sdk.values.KV;
* partition id, and offset).
*/
public class KafkaRecord<K, V> implements Serializable {
+ // This is based on {@link ConsumerRecord} received from Kafka Consumer.
+ // The primary difference is that this contains the deserialized key and value, and is
+ // runtime Kafka version agnostic (e.g. Kafka version 0.9.x does not have timestamp fields).
private final String topic;
private final int partition;
private final long offset;
private final KV<K, V> kv;
private final long timestamp;
+ private final KafkaTimestampType timestampType;
public KafkaRecord(
String topic,
int partition,
long offset,
long timestamp,
+ KafkaTimestampType timestampType,
K key,
V value) {
- this(topic, partition, offset, timestamp, KV.of(key, value));
+ this(topic, partition, offset, timestamp, timestampType, KV.of(key, value));
}
public KafkaRecord(
@@ -48,14 +53,17 @@ public class KafkaRecord<K, V> implements Serializable {
int partition,
long offset,
long timestamp,
+ KafkaTimestampType timestampType,
KV<K, V> kv) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
+ this.timestampType = timestampType;
this.kv = kv;
}
+
public String getTopic() {
return topic;
}
@@ -76,6 +84,10 @@ public class KafkaRecord<K, V> implements Serializable {
return timestamp;
}
+ public KafkaTimestampType getTimestampType() {
+ return timestampType;
+ }
+
@Override
public int hashCode() {
return Arrays.deepHashCode(new Object[]{topic, partition, offset, timestamp, kv});
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 577fdee..cb3b953 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -54,6 +54,7 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
intCoder.encode(value.getPartition(), outStream);
longCoder.encode(value.getOffset(), outStream);
longCoder.encode(value.getTimestamp(), outStream);
+ intCoder.encode(value.getTimestampType().ordinal(), outStream);
kvCoder.encode(value.getKV(), outStream);
}
@@ -64,6 +65,7 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
intCoder.decode(inStream),
longCoder.decode(inStream),
longCoder.decode(inStream),
+ KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
kvCoder.decode(inStream));
}
@@ -94,6 +96,7 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
value.getPartition(),
value.getOffset(),
value.getTimestamp(),
+ value.getTimestampType(),
(KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
}
}
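
The timestamp type added above is carried through the coder as its ordinal; a sketch of the expected round trip (topic and values are placeholders):

    KafkaRecordCoder<Long, String> coder =
        KafkaRecordCoder.of(VarLongCoder.of(), StringUtf8Coder.of());
    KafkaRecord<Long, String> record = new KafkaRecord<>(
        "my_topic", 0, 17L, 1519430538000L,
        KafkaTimestampType.LOG_APPEND_TIME, 1L, "value");
    // Encoding and then decoding should preserve the timestamp type.
    KafkaRecord<Long, String> copy = CoderUtils.clone(coder, record);
    assert copy.getTimestampType() == KafkaTimestampType.LOG_APPEND_TIME;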
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaTimestampType.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaTimestampType.java
new file mode 100644
index 0000000..e4b9346
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaTimestampType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link org.apache.kafka.common.record.TimestampType}. Included
+ * here in order to support older Kafka versions (0.9.x).
+ */
+public enum KafkaTimestampType {
+ NO_TIMESTAMP_TYPE(-1, "NoTimestampType"),
+ CREATE_TIME(0, "CreateTime"),
+ LOG_APPEND_TIME(1, "LogAppendTime");
+
+ public final int id;
+ public final String name;
+
+ KafkaTimestampType(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ public static KafkaTimestampType forOrdinal(int ordinal) {
+ return values()[ordinal];
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
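
Since the enum mirrors Kafka's TimestampType constants in declaration order, forOrdinal() can map directly from the Kafka enum, as in this illustrative snippet:

    // Ordinals line up one-to-one with org.apache.kafka.common.record.TimestampType.
    KafkaTimestampType t = KafkaTimestampType.forOrdinal(
        org.apache.kafka.common.record.TimestampType.LOG_APPEND_TIME.ordinal());
    assert t == KafkaTimestampType.LOG_APPEND_TIME;
    assert t.id == 1 && "LogAppendTime".equals(t.toString());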
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index e830b4c..533c8b3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -2,8 +2,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
+ * regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
@@ -24,13 +23,16 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.io.Closeables;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -135,10 +137,25 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
Map<String, Object> offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig());
offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
+ // fetches latest offset for two reasons : (a) to calculate backlog (number of records
+ // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
+ // for (a) is to leave this config unchanged from the main config (i.e. if there are records
+ // that can't be read because of uncommitted records before them, they shouldn't
+ // ideally count towards backlog when "read_committed" is enabled). But (b)
+ // requires finding out if there are any records left to be read (committed or uncommitted).
+ // Rather than using two separate consumers we will go with better support for (b). If we do
+ // hit a case where a lot of records are not readable (due to some stuck transactions), the
+ // pipeline would report more backlog, but would not be able to consume it. It might be ok
+ // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
+ offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
+ // Fetch offsets once before running periodically.
+ updateLatestOffsets();
+
offsetFetcherThread.scheduleAtFixedRate(
this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
@@ -156,10 +173,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
*/
while (true) {
if (curBatch.hasNext()) {
- PartitionState pState = curBatch.next();
-
- elementsRead.inc();
- elementsReadBySplit.inc();
+ PartitionState<K, V> pState = curBatch.next();
if (!pState.recordIter.hasNext()) { // -- (c)
pState.recordIter = Collections.emptyIterator(); // drop ref
@@ -167,6 +181,9 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
continue;
}
+ elementsRead.inc();
+ elementsReadBySplit.inc();
+
ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
long expected = pState.nextOffset;
long offset = rawRecord.offset();
@@ -194,11 +211,12 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
rawRecord.partition(),
rawRecord.offset(),
consumerSpEL.getRecordTimestamp(rawRecord),
+ consumerSpEL.getRecordTimestampType(rawRecord),
keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));
- curTimestamp = (source.getSpec().getTimestampFn() == null)
- ? Instant.now() : source.getSpec().getTimestampFn().apply(record);
+ curTimestamp = pState.timestampPolicy
+ .getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
curRecord = record;
int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -220,46 +238,56 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
@Override
public Instant getWatermark() {
- if (curRecord == null) {
- LOG.debug("{}: getWatermark() : no records have been read yet.", name);
- return initialWatermark;
+
+ if (source.getSpec().getWatermarkFn() != null) {
+ // Support old API which requires a KafkaRecord to invoke watermarkFn.
+ if (curRecord == null) {
+ LOG.debug("{}: getWatermark() : no records have been read yet.", name);
+ return initialWatermark;
+ }
+ return source.getSpec().getWatermarkFn().apply(curRecord);
}
- return source.getSpec().getWatermarkFn() != null
- ? source.getSpec().getWatermarkFn().apply(curRecord) : curTimestamp;
+ // Return minimum watermark among partitions.
+ return partitionStates
+ .stream()
+ .map(PartitionState::updateAndGetWatermark)
+ .min(Comparator.naturalOrder())
+ .get();
}
@Override
public CheckpointMark getCheckpointMark() {
reportBacklog();
return new KafkaCheckpointMark(
- partitionStates.stream()
- .map((p) -> new PartitionMark(p.topicPartition.topic(),
- p.topicPartition.partition(),
- p.nextOffset))
- .collect(Collectors.toList()),
- source.getSpec().isCommitOffsetsInFinalizeEnabled() ? this : null
+ partitionStates.stream()
+ .map(p -> new PartitionMark(p.topicPartition.topic(),
+ p.topicPartition.partition(),
+ p.nextOffset,
+ p.lastWatermark.getMillis()))
+ .collect(Collectors.toList()),
+ source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty()
);
}
@Override
- public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
+ public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
return source;
}
@Override
- public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
+ public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
// should we delay updating consumed offset till this point? Mostly not required.
return curRecord;
}
@Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}
@Override
- public long getSplitBacklogBytes() {
+ public long getSplitBacklogBytes() {
long backlogBytes = 0;
for (PartitionState p : partitionStates) {
@@ -287,10 +315,10 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
private final KafkaUnboundedSource<K, V> source;
private final String name;
private Consumer<byte[], byte[]> consumer;
- private final List<PartitionState> partitionStates;
+ private final List<PartitionState<K, V>> partitionStates;
private KafkaRecord<K, V> curRecord;
private Instant curTimestamp;
- private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+ private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
private Deserializer<K> keyDeserializerInstance = null;
private Deserializer<V> valueDeserializerInstance = null;
@@ -339,9 +367,9 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
private Consumer<byte[], byte[]> offsetConsumer;
private final ScheduledExecutorService offsetFetcherThread =
Executors.newSingleThreadScheduledExecutor();
- private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+ private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
- private static final long UNINITIALIZED_OFFSET = -1;
+ static final long UNINITIALIZED_OFFSET = -1;
//Add SpEL instance to cover the interface difference of Kafka client
private transient ConsumerSpEL consumerSpEL;
@@ -370,20 +398,50 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
}
}
+ private static class TimestampPolicyContext extends TimestampPolicy.PartitionContext {
+
+ private final long messageBacklog;
+ private final Instant backlogCheckTime;
+
+ TimestampPolicyContext(long messageBacklog, Instant backlogCheckTime) {
+ this.messageBacklog = messageBacklog;
+ this.backlogCheckTime = backlogCheckTime;
+ }
+
+ @Override
+ public long getMessageBacklog() {
+ return messageBacklog;
+ }
+
+ @Override
+ public Instant getBacklogCheckTime() {
+ return backlogCheckTime;
+ }
+ }
+
// maintains state of each assigned partition (buffered records, consumed offset, etc)
- private static class PartitionState {
+ private static class PartitionState<K, V> {
private final TopicPartition topicPartition;
private long nextOffset;
private long latestOffset;
+ private Instant latestOffsetFetchTime;
+ private Instant lastWatermark; // As returned by timestampPolicy
+ private final TimestampPolicy<K, V> timestampPolicy;
+
private Iterator<ConsumerRecord<byte[], byte[]>> recordIter =
Collections.emptyIterator();
private MovingAvg avgRecordSize = new MovingAvg();
private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.
- PartitionState(TopicPartition partition, long nextOffset) {
+
+ PartitionState(TopicPartition partition, long nextOffset,
+ TimestampPolicy<K, V> timestampPolicy) {
this.topicPartition = partition;
this.nextOffset = nextOffset;
this.latestOffset = UNINITIALIZED_OFFSET;
+ this.latestOffsetFetchTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ this.lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ this.timestampPolicy = timestampPolicy;
}
// Update consumedOffset, avgRecordSize, and avgOffsetGap
@@ -395,8 +453,11 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
avgOffsetGap.update(offsetGap);
}
- synchronized void setLatestOffset(long latestOffset) {
+ synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
this.latestOffset = latestOffset;
+ this.latestOffsetFetchTime = fetchTime;
+ LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
+ this, topicPartition, latestOffset, nextOffset, avgRecordSize);
}
synchronized long approxBacklogInBytes() {
@@ -415,6 +476,15 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
return Math.max(0, (long) Math.ceil(remaining));
}
+
+ synchronized TimestampPolicyContext mkTimestampPolicyContext() {
+ return new TimestampPolicyContext(backlogMessageCount(), latestOffsetFetchTime);
+ }
+
+ Instant updateAndGetWatermark() {
+ lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
+ return lastWatermark;
+ }
}
KafkaUnboundedReader(
@@ -425,35 +495,40 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
this.name = "Reader-" + source.getId();
List<TopicPartition> partitions = source.getSpec().getTopicPartitions();
- partitionStates =
- ImmutableList.copyOf(
- partitions
- .stream()
- .map(tp -> new PartitionState(tp, UNINITIALIZED_OFFSET))
- .collect(Collectors.toList()));
+ List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());
if (checkpointMark != null) {
- // a) verify that assigned and check-pointed partitions match exactly
- // b) set consumed offsets
-
checkState(checkpointMark.getPartitions().size() == partitions.size(),
- "checkPointMark and assignedPartitions should match");
+ "checkPointMark and assignedPartitions should match");
+ }
+
+ for (int i = 0; i < partitions.size(); i++) {
+ TopicPartition tp = partitions.get(i);
+ long nextOffset = UNINITIALIZED_OFFSET;
+ Optional<Instant> prevWatermark = Optional.empty();
+
+ if (checkpointMark != null) {
+ // Verify that assigned and check-pointed partitions match exactly and set next offset.
- for (int i = 0; i < partitions.size(); i++) {
PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
- TopicPartition assigned = partitions.get(i);
+
TopicPartition partition = new TopicPartition(ckptMark.getTopic(), ckptMark.getPartition());
- checkState(partition.equals(assigned),
+ checkState(partition.equals(tp),
"checkpointed partition %s and assigned partition %s don't match",
- partition, assigned);
-
- partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
+ partition, tp);
+ nextOffset = ckptMark.getNextOffset();
+ prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
}
+
+ states.add(new PartitionState<>(
+ tp, nextOffset,
+ source.getSpec().getTimestampPolicyFactory().createTimestampPolicy(tp, prevWatermark)));
}
- String splitId = String.valueOf(source.getId());
+ partitionStates = ImmutableList.copyOf(states);
+ String splitId = String.valueOf(source.getId());
elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
@@ -489,6 +564,8 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
}
private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
+ LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+
consumer.commitSync(
checkpointMark
.getPartitions()
@@ -533,17 +610,10 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
return;
}
- List<PartitionState> nonEmpty = new LinkedList<>();
-
- for (PartitionState p : partitionStates) {
- p.recordIter = records.records(p.topicPartition).iterator();
- if (p.recordIter.hasNext()) {
- nonEmpty.add(p);
- }
- }
+ partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());
// cycle through the partitions in order to interleave records from each.
- curBatch = Iterators.cycle(nonEmpty);
+ curBatch = Iterators.cycle(new LinkedList<>(partitionStates));
}
private void setupInitialOffset(PartitionState pState) {
@@ -567,29 +637,23 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
}
}
- // update latest offset for each partition.
- // called from offsetFetcher thread
+ // Update latest offset for each partition.
+ // Called from setupInitialOffset() at the start and then periodically from offsetFetcher thread.
private void updateLatestOffsets() {
for (PartitionState p : partitionStates) {
try {
- // If "read_committed" is enabled in the config, this seeks to 'Last Stable Offset'.
- // As a result uncommitted messages are not counted in backlog. It is correct since
- // the reader can not read them anyway.
+ Instant fetchTime = Instant.now();
consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
long offset = offsetConsumer.position(p.topicPartition);
- p.setLatestOffset(offset);
+ p.setLatestOffset(offset, fetchTime);
} catch (Exception e) {
- // An exception is expected if we've closed the reader in another thread. Ignore and exit.
- if (closed.get()) {
+ if (closed.get()) { // Ignore the exception if the reader is closed.
break;
}
LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.",
this, p.topicPartition, e);
- p.setLatestOffset(UNINITIALIZED_OFFSET); // reset
+ // Don't update the latest offset.
}
-
- LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
- this, p.topicPartition, p.latestOffset, p.nextOffset, p.avgRecordSize);
}
LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java
new file mode 100644
index 0000000..24a635f
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.joda.time.Instant;
+
+/**
+ * A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
+ * The KafkaIO reader creates one policy using {@link TimestampPolicyFactory} for each of the
+ * partitions it reads from. See {@link TimestampPolicyFactory.LogAppendTimePolicy} for an example
+ * of a policy.
+ */
+public abstract class TimestampPolicy<K, V> {
+
+ /**
+ * The context contains state maintained in the reader for the partition. Available with
+ * each of the methods in {@link TimestampPolicy}.
+ */
+ public abstract static class PartitionContext {
+ /**
+ * Current backlog in messages
+ * (latest offset of the partition - last processed record offset).
+ */
+ public abstract long getMessageBacklog();
+
+ /**
+ * The time at which latest offset for the partition was fetched in order to calculate
+ * backlog. The reader periodically polls for latest offsets. This timestamp
+ * is useful in advancing watermark for idle partitions as in
+ * {@link TimestampPolicyFactory.LogAppendTimePolicy}.
+ */
+ public abstract Instant getBacklogCheckTime();
+ }
+
+ /**
+ * Returns record timestamp (aka event time). This is often based on the timestamp
+ * of the Kafka record. This is invoked for each record when it is processed in the reader.
+ */
+ public abstract Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record);
+
+ /**
+ * Returns watermark for the partition. It is the timestamp before or at the timestamps of all
+ * future records consumed from the partition.
+ * See {@link UnboundedSource.UnboundedReader#getWatermark()} for more guidance on watermarks.
+ * E.g. if the record timestamp is 'LogAppendTime', watermark would be the timestamp of the last
+ * record since 'LogAppendTime' monotonically increases within a partition.
+ */
+ public abstract Instant getWatermark(PartitionContext ctx);
+
+ // It is useful to let TimestampPolicy store its state in checkpointMark.
+ // We need to add a getCheckpointMark() here for that.
+}
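
As a hedged illustration of this contract, a custom policy that takes event time from the record's embedded timestamp and holds the watermark at the last timestamp seen could look like the sketch below ('CreateTimePolicy' is a hypothetical name, not part of this commit; it is only correct if timestamps are non-decreasing within the partition):

    class CreateTimePolicy<K, V> extends TimestampPolicy<K, V> {
      // Watermark trails at the last record timestamp observed.
      private Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

      @Override
      public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
        lastTimestamp = new Instant(record.getTimestamp());
        return lastTimestamp;
      }

      @Override
      public Instant getWatermark(PartitionContext ctx) {
        return lastTimestamp;
      }
    }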
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
new file mode 100644
index 0000000..d84bfe8
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.Serializable;
+import java.util.Optional;
+import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * An extendable factory to create a {@link TimestampPolicy} for each partition at runtime by
+ * KafkaIO reader. Subclasses implement {@link #createTimestampPolicy}, which is invoked by
+ * the reader while starting or resuming from a checkpoint. Two commonly used policies are
+ * provided. See {@link #withLogAppendTime()} and {@link #withProcessingTime()}.
+ */
+public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializable {
+
+ /**
+ * Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while
+ * resuming from a previous checkpoint.
+ *
+ * @param tp The returned policy applies to records from this {@link TopicPartition}.
+ * @param previousWatermark The latest check-pointed watermark. This is set when the reader
+ * is resuming from a checkpoint. This is a good value for implementations
+ * of {@link TimestampPolicy#getWatermark(PartitionContext)} to return until a better watermark
+ * can be established as more records are read.
+ * @return the timestamp policy for the partition.
+ */
+ public abstract TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
+ TopicPartition tp, Optional<Instant> previousWatermark);
+
+ /**
+ * A {@link TimestampPolicy} that assigns processing time to each record.
+ * Specifically, this is the timestamp when the record becomes 'current' in the reader.
+ * The watermark always advances to current time.
+ */
+ public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
+ return new TimestampPolicyFactory<K, V>() {
+ @Override
+ public TimestampPolicy<K, V>
+ createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
+ return new ProcessingTimePolicy<>();
+ }
+ };
+ }
+
+ /**
+ * A {@link TimestampPolicy} that assigns Kafka's log append time (server side ingestion time)
+ * to each record. The watermark for each Kafka partition is the timestamp of the last record
+ * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
+ * See {@link KafkaIO.Read#withLogAppendTime()} for a longer description.
+ */
+ public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
+ // return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
+ return new TimestampPolicyFactory<K, V>() {
+ @Override
+ public TimestampPolicy<K, V>
+ createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
+ return new LogAppendTimePolicy<>(previousWatermark);
+ }
+ };
+ }
+
+ /*
+ * TODO
+ * Provide another built-in implementation where the watermark is based on all the timestamps
+ * seen in the last 1 minute of wall clock time (this duration could be configurable). This is
+ * similar to the watermark set by PubsubIO.
+ *
+ * public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() {
+ * return withCustomTimestamp(...);
+ * }
+ *
+ * public static <K, V> TimestampPolicyFactory<K, V> withCustomTimestamp() {
+ * }
+ */
+
+ /**
+ * Used by the Read transform to support the old timestamp function API.
+ */
+ static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(
+ final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
+
+ return new TimestampPolicyFactory<K, V>() {
+ @Override
+ public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp,
+ Optional<Instant> previousWatermark) {
+ return new TimestampFnPolicy<>(timestampFn, previousWatermark);
+ }
+ };
+ }
+
+ /**
+ * A simple policy that uses current time for event time and watermark. This should be used
+ * when better timestamps like LogAppendTime are not available for a topic.
+ */
+ public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
+
+ @Override
+ public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
+ return Instant.now();
+ }
+
+ @Override
+ public Instant getWatermark(PartitionContext context) {
+ return Instant.now();
+ }
+ }
+
+ /**
+ * Assigns Kafka's log append time (server side ingestion time)
+ * to each record. The watermark for each Kafka partition is the timestamp of the last record
+ * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
+ * See {@link KafkaIO.Read#withLogAppendTime()} for a longer description.
+ */
+ public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
+
+ /**
+ * When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark
+ * to near realtime. Kafka server does not have an API to provide server side current
+ * timestamp which could ensure minimum LogAppendTime for future records.
+ * The best we could do is to advance the watermark to
+ * 'last backlog check time - small delta to account for any internal buffering in Kafka'.
+ * Using 2 seconds for this delta. Should this be user configurable?
+ */
+ private static final Duration IDLE_WATERMARK_DELTA = Duration.standardSeconds(2);
+
+ protected Instant currentWatermark;
+
+ public LogAppendTimePolicy(Optional<Instant> previousWatermark) {
+ currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+ }
+
+ @Override
+ public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
+ if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) {
+ currentWatermark = new Instant(record.getTimestamp());
+ } else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ // This is the first record and it does not have LOG_APPEND_TIME.
+ // Most likely the topic is not configured correctly.
+ throw new IllegalStateException(String.format(
+ "LogAppendTimePolicy is enabled in reader, but Kafka record's timestamp type "
+ + "is not LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. "
+ + "Actual timestamp type is '%s'.", record.getTopic(), record.getTimestampType()));
+ }
+ return currentWatermark;
+ }
+
+ @Override
+ public Instant getWatermark(PartitionContext context) {
+ if (context.getMessageBacklog() == 0) {
+ // The reader is caught up. May need to advance the watermark.
+ Instant idleWatermark = context.getBacklogCheckTime().minus(IDLE_WATERMARK_DELTA);
+ if (idleWatermark.isAfter(currentWatermark)) {
+ currentWatermark = idleWatermark;
+ }
+ } // else, there is backlog (or is unknown). Do not advance the watermark.
+ return currentWatermark;
+ }
+ }
+
+ /**
+ * Internal policy to support the deprecated withTimestampFn API. It returns the last record
+ * timestamp as the watermark.
+ */
+ private static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
+
+ final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+ Instant lastRecordTimestamp;
+
+ TimestampFnPolicy(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+ Optional<Instant> previousWatermark) {
+ this.timestampFn = timestampFn;
+ lastRecordTimestamp = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+ }
+
+ @Override
+ public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
+ lastRecordTimestamp = timestampFn.apply(record);
+ return lastRecordTimestamp;
+ }
+
+ @Override
+ public Instant getWatermark(PartitionContext context) {
+ return lastRecordTimestamp;
+ }
+ }
+}
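
Wiring a policy into the read transform goes through the factory; a sketch that reuses the built-in LogAppendTimePolicy (equivalent to calling withLogAppendTime(), shown only for illustration):

    KafkaIO.Read<Long, String> read = KafkaIO.<Long, String>read()
        .withTimestampPolicyFactory(
            new TimestampPolicyFactory<Long, String>() {
              @Override
              public TimestampPolicy<Long, String> createTimestampPolicy(
                  TopicPartition tp, Optional<Instant> previousWatermark) {
                // The reader calls this once per partition, passing the
                // check-pointed watermark when resuming.
                return new TimestampPolicyFactory.LogAppendTimePolicy<>(previousWatermark);
              }
            });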
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index ebdd1da..adeebe6 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -41,6 +41,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -77,16 +78,21 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -99,6 +105,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -110,6 +117,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableWithSize;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -142,6 +150,8 @@ public class KafkaIOTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
+ private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
+
// Update mock consumer with records distributed among the given topics, each with given number
// of partitions. Records are assigned in round-robin order among the partitions.
private static MockConsumer<byte[], byte[]> mkMockConsumer(
@@ -166,17 +176,22 @@ public class KafkaIOTest {
int numPartitions = partitions.size();
final long[] offsets = new long[numPartitions];
+
for (int i = 0; i < numElements; i++) {
int pIdx = i % numPartitions;
TopicPartition tp = partitions.get(pIdx);
+ byte[] key = ByteBuffer.wrap(new byte[4]).putInt(i).array(); // key is 4 byte record id
+ byte[] value = ByteBuffer.wrap(new byte[8]).putLong(i).array(); // value is 8 byte record id
+
records.get(tp).add(
new ConsumerRecord<>(
tp.topic(),
tp.partition(),
offsets[pIdx]++,
- ByteBuffer.wrap(new byte[4]).putInt(i).array(), // key is 4 byte record id
- ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id
+ LOG_APPEND_START_TIME.plus(Duration.standardSeconds(i)).getMillis(),
+ TimestampType.LOG_APPEND_TIME,
+ 0, key.length, value.length, key, value));
}
// This is updated when reader assigns partitions.
@@ -250,10 +265,10 @@ public class KafkaIOTest {
private final int numElements;
private final OffsetResetStrategy offsetResetStrategy;
- public ConsumerFactoryFn(List<String> topics,
- int partitionsPerTopic,
- int numElements,
- OffsetResetStrategy offsetResetStrategy) {
+ ConsumerFactoryFn(List<String> topics,
+ int partitionsPerTopic,
+ int numElements,
+ OffsetResetStrategy offsetResetStrategy) {
this.topics = topics;
this.partitionsPerTopic = partitionsPerTopic;
this.numElements = numElements;
@@ -302,7 +317,7 @@ public class KafkaIOTest {
private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
private final int num;
- public AssertMultipleOf(int num) {
+ AssertMultipleOf(int num) {
this.num = num;
}
@@ -456,9 +471,113 @@ public class KafkaIOTest {
p.run();
}
+ @Test
+ public void testUnboundedSourceLogAppendTimestamps() {
+ // LogAppendTime (server side timestamp) for records is set based on record index
+ // in MockConsumer above. Ensure that those exact timestamps are set by the source.
+ int numElements = 1000;
+
+ PCollection<Long> input =
+ p.apply(mkKafkaReadTransform(numElements, null)
+ .withLogAppendTime()
+ .withoutMetadata())
+ .apply(Values.create());
+
+ addCountingAsserts(input, numElements);
+
+ PCollection<Long> diffs =
+ input
+ .apply(MapElements.into(TypeDescriptors.longs()).via(t ->
+ LOG_APPEND_START_TIME.plus(Duration.standardSeconds(t)).getMillis()))
+ .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+ .apply("DistinctTimestamps", Distinct.create());
+
+ // This assert also confirms that diff only has one unique value.
+ PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+ p.run();
+ }
+
+ // Returns TIMESTAMP_MAX_VALUE for watermark when all the records are read from a partition.
+ static class TimestampPolicyWithEndOfSource<K, V> extends TimestampPolicyFactory<K, V> {
+ private final long maxOffset;
+
+ TimestampPolicyWithEndOfSource(long maxOffset) {
+ this.maxOffset = maxOffset;
+ }
+
+ @Override
+ public TimestampPolicy<K, V> createTimestampPolicy(
+ TopicPartition tp, Optional<Instant> previousWatermark) {
+ return new TimestampPolicy<K, V>() {
+ long lastOffset = 0;
+ Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ @Override
+ public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
+ lastOffset = record.getOffset();
+ lastTimestamp = new Instant(record.getTimestamp());
+ return lastTimestamp;
+ }
+
+ @Override
+ public Instant getWatermark(PartitionContext ctx) {
+ if (lastOffset < maxOffset) {
+ return lastTimestamp;
+ } else { // EOF
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testUnboundedSourceWithoutBoundedWrapper() {
+ // Most of the tests in this file set 'maxNumRecords' on the source, which wraps
+ // the unbounded source in a bounded source. As a result, the test pipelines run as
+ // bounded/batch pipelines under the direct runner.
+ // This is the same as testUnboundedSource() without the BoundedSource wrapper.
+
+ final int numElements = 1000;
+ final int numPartitions = 10;
+ String topic = "testUnboundedSourceWithoutBoundedWrapper";
+
+ KafkaIO.Read<byte[], Long> reader = KafkaIO.<byte[], Long>read()
+ .withBootstrapServers(topic)
+ .withTopic(topic)
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ ImmutableList.of(topic), numPartitions, numElements, OffsetResetStrategy.EARLIEST))
+ .withKeyDeserializer(ByteArrayDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .withTimestampPolicyFactory(
+ new TimestampPolicyWithEndOfSource<>(numElements / numPartitions - 1));
+
+ PCollection<Long> input =
+ p.apply("readFromKafka", reader.withoutMetadata())
+ .apply(Values.create())
+ .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
+
+ PipelineResult result = p.run();
+
+ MetricName elementsRead = SourceMetrics.elementsRead().getName();
+
+ MetricQueryResults metrics = result.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.inNamespace(elementsRead.namespace()))
+ .build());
+
+ assertThat(metrics.counters(), hasItem(
+ attemptedMetricsResult(
+ elementsRead.namespace(),
+ elementsRead.name(),
+ "readFromKafka",
+ Long.valueOf(numElements))));
+ }
+
private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
@ProcessElement
- public void processElement(ProcessContext ctx) throws Exception {
+ public void processElement(ProcessContext ctx) {
ctx.output(ctx.element().getKV());
}
}
@@ -631,8 +750,7 @@ public class KafkaIOTest {
p.apply(readStep,
mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
- .updateConsumerProperties(ImmutableMap.<String, Object>of(ConsumerConfig.GROUP_ID_CONFIG,
- "test.group"))
+ .updateConsumerProperties(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test.group"))
.commitOffsetsInFinalize()
.withoutMetadata());
@@ -1070,7 +1188,6 @@ public class KafkaIOTest {
.addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
.build());
-
assertThat(metrics.counters(), hasItem(
attemptedMetricsResult(
elementsWritten.namespace(),
@@ -1182,11 +1299,13 @@ public class KafkaIOTest {
// Make sure the config is correctly set up for serializers.
Utils.newInstance(
- ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
- .asSubclass(Serializer.class)
+ (Class<? extends Serializer<?>>)
+ ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+ .asSubclass(Serializer.class)
).configure(config, true);
Utils.newInstance(
+ (Class<? extends Serializer<?>>)
((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.asSubclass(Serializer.class)
).configure(config, false);
--
To stop receiving notification emails like this one, please contact
[email protected].