This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f84beb  [BEAM-591] KafkaIO : Improve watermarks and support server 
side timestamps (#4680)
9f84beb is described below

commit 9f84bebc4b602551b2c719702cbb4dbab7c5b258
Author: Raghu Angadi <rang...@apache.org>
AuthorDate: Fri Feb 23 16:42:18 2018 -0800

    [BEAM-591] KafkaIO : Improve watermarks and support server side timestamps 
(#4680)
    
    * Redesign how timestamps and watermarks are handled in KafkaIO.
     - Added TimestampPolicy that provides both record timestamps and
     watermarks.
     - built in policies for 'LogAppendTime' (server-time) ProcessingTime
     (default)
     - Ensure idle partitions don't hold watermark back
     - deprecated previous API to set functions for custom timestamps and
     watermarks.
    
    * minor
---
 .../org/apache/beam/sdk/io/kafka/ConsumerSpEL.java |  20 +-
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java     |  32 ++--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  83 ++++++++-
 .../org/apache/beam/sdk/io/kafka/KafkaRecord.java  |  14 +-
 .../apache/beam/sdk/io/kafka/KafkaRecordCoder.java |   3 +
 .../beam/sdk/io/kafka/KafkaTimestampType.java      |  44 +++++
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    | 200 +++++++++++++-------
 .../apache/beam/sdk/io/kafka/TimestampPolicy.java  |  68 +++++++
 .../beam/sdk/io/kafka/TimestampPolicyFactory.java  | 206 +++++++++++++++++++++
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 145 +++++++++++++--
 10 files changed, 706 insertions(+), 109 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index a3bd439..f615ad6 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -80,10 +80,10 @@ class ConsumerSpEL {
     }
   }
 
-  public void evaluateSeek2End(Consumer consumer, TopicPartition 
topicPartitions) {
+  public void evaluateSeek2End(Consumer consumer, TopicPartition 
topicPartition) {
     StandardEvaluationContext mapContext = new StandardEvaluationContext();
     mapContext.setVariable("consumer", consumer);
-    mapContext.setVariable("tp", topicPartitions);
+    mapContext.setVariable("tp", topicPartition);
     seek2endExpression.getValue(mapContext);
   }
 
@@ -95,11 +95,19 @@ class ConsumerSpEL {
   }
 
   public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
-    long timestamp;
-    if (!hasRecordTimestamp || (timestamp = rawRecord.timestamp()) <= 0L) {
-      timestamp = System.currentTimeMillis();
+    if (hasRecordTimestamp) {
+      return rawRecord.timestamp();
+    }
+    return -1L; // This is the timestamp used in Kafka for older messages 
without timestamps.
+  }
+
+  public KafkaTimestampType getRecordTimestamptType(
+    ConsumerRecord<byte[], byte[]> rawRecord) {
+    if (hasRecordTimestamp) {
+      return 
KafkaTimestampType.forOrdinal(rawRecord.timestampType().ordinal());
+    } else {
+      return KafkaTimestampType.NO_TIMESTAMP_TYPE;
     }
-    return timestamp;
   }
 
   public boolean hasOffsetsForTimes() {
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 791e594..95ec7ca 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -18,13 +18,14 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.common.base.Joiner;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.Optional;
 import org.apache.avro.reflect.AvroIgnore;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
 /**
  * Checkpoint for a {@link KafkaUnboundedReader}. Consists of Kafka topic 
name, partition id,
@@ -36,12 +37,12 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
   private List<PartitionMark> partitions;
 
   @AvroIgnore
-  private KafkaUnboundedReader<?, ?> reader; // Non-null when offsets need to 
be committed.
+  private Optional<KafkaUnboundedReader<?, ?>> reader; // Present when offsets 
need to be committed.
 
   private KafkaCheckpointMark() {} // for Avro
 
   public KafkaCheckpointMark(List<PartitionMark> partitions,
-                             KafkaUnboundedReader<?, ?> reader) {
+                             Optional<KafkaUnboundedReader<?, ?>> reader) {
     this.partitions = partitions;
     this.reader = reader;
   }
@@ -51,14 +52,12 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
   }
 
   @Override
-  public void finalizeCheckpoint() throws IOException {
-    if (reader != null) {
-      // Is it ok to commit asynchronously, or should we wait till this (or 
newer) is committed?
-      // Often multiple marks would be finalized at once, since we only need 
to finalize the latest,
-      // it is better to wait a little while. Currently maximum is delay same 
as KAFKA_POLL_TIMEOUT
-      // in the reader (1 second).
-      reader.finalizeCheckpointMarkAsync(this);
-    }
+  public void finalizeCheckpoint() {
+    reader.ifPresent(r -> r.finalizeCheckpointMarkAsync(this));
+    // Is it ok to commit asynchronously, or should we wait till this (or 
newer) is committed?
+    // Often multiple marks would be finalized at once, since we only need to 
finalize the latest,
+    // it is better to wait a little while. Currently maximum delay is same as 
KAFKA_POLL_TIMEOUT
+    // in the reader (1 second).
   }
 
   @Override
@@ -71,16 +70,20 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
    * for a single partition.
    */
   public static class PartitionMark implements Serializable {
+    private static final long MIN_WATERMARK_MILLIS = 
BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+
     private String topic;
     private int partition;
     private long nextOffset;
+    private long watermarkMillis = MIN_WATERMARK_MILLIS;
 
     private PartitionMark() {} // for Avro
 
-    public PartitionMark(String topic, int partition, long offset) {
+    public PartitionMark(String topic, int partition, long offset, long 
watermarkMillis) {
       this.topic = topic;
       this.partition = partition;
       this.nextOffset = offset;
+      this.watermarkMillis = watermarkMillis;
     }
 
     public String getTopic() {
@@ -95,12 +98,17 @@ public class KafkaCheckpointMark implements 
UnboundedSource.CheckpointMark {
       return nextOffset;
     }
 
+    public long getWatermarkMillis() {
+      return watermarkMillis;
+    }
+
     @Override
     public String toString() {
       return "PartitionMark{"
           + "topic='" + topic + '\''
           + ", partition=" + partition
           + ", nextOffset=" + nextOffset
+          + ", watermarkMillis=" + watermarkMillis
           + '}';
     }
   }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index bd8ac64..f031003 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -113,8 +114,9 @@ import org.slf4j.LoggerFactory;
  *       // settings for ConsumerConfig. e.g :
  *       .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 
1024 * 1024))
  *
- *       // custom function for calculating record timestamp (default is 
processing time)
- *       .withTimestampFn(new MyTimestampFunction())
+ *       // set event times and watermark based on LogAppendTime. To provide a 
custom
+ *       // policy see withTimestampPolicyFactory(). withProcessingTime() is 
the default.
+ *       .withLogAppendTime()
  *
  *       // custom function for watermark (default is record timestamp)
  *       .withWatermarkFn(new MyWatermarkFunction())
@@ -122,7 +124,10 @@ import org.slf4j.LoggerFactory;
  *       // restrict reader to committed messages on Kafka (see method 
documentation).
  *       .withReadCommitted()
  *
- *       // finally, if you don't need Kafka metadata, you can drop it
+ *       // offset consumed by the pipeline can be committed back.
+ *       .commitOffsetsInFinalize()
+ *
+ *       // finally, if you don't need Kafka metadata, you can drop it.g
  *       .withoutMetadata() // PCollection<KV<Long, String>>
  *    )
  *    .apply(Values.<String>create()) // PCollection<String>
@@ -246,6 +251,7 @@ public class KafkaIO {
         .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
         .setCommitOffsetsInFinalizeEnabled(false)
+        .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
         .build();
   }
 
@@ -281,7 +287,6 @@ public class KafkaIO {
     @Nullable abstract Class<? extends Deserializer<V>> getValueDeserializer();
     abstract SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>
         getConsumerFactoryFn();
-    @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> 
getTimestampFn();
     @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> 
getWatermarkFn();
 
     abstract long getMaxNumRecords();
@@ -290,6 +295,8 @@ public class KafkaIO {
     @Nullable abstract Instant getStartReadTime();
     abstract boolean isCommitOffsetsInFinalizeEnabled();
 
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -304,12 +311,13 @@ public class KafkaIO {
           Class<? extends Deserializer<V>> valueDeserializer);
       abstract Builder<K, V> setConsumerFactoryFn(
           SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn);
-      abstract Builder<K, V> 
setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
       abstract Builder<K, V> 
setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
       abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
       abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
       abstract Builder<K, V> setStartReadTime(Instant startReadTime);
       abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean 
commitOffsetInFinalize);
+      abstract Builder<K, V> setTimestampPolicyFactory(
+        TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
       abstract Read<K, V> build();
     }
@@ -461,18 +469,69 @@ public class KafkaIO {
     }
 
     /**
+     * Sets {@link TimestampPolicy} to {@link 
TimestampPolicyFactory.LogAppendTimePolicy}.
+     * The policy assigns Kafka's log append time (server side ingestion time)
+     * to each record. The watermark for each Kafka partition is the timestamp 
of the last record
+     * read. If a partition is idle, the watermark advances to couple of 
seconds behind wall time.
+     * Every record consumed from Kafka is expected to have its timestamp type 
set to
+     * 'LOG_APPEND_TIME'.
+     *
+     * <p>In Kafka, log append time needs to be enabled for each topic, and 
all the subsequent
+     * records wil have their timestamp set to log append time. If a record 
does not have its
+     * timestamp type set to 'LOG_APPEND_TIME' for any reason, it's timestamp 
is set to previous
+     * record timestamp or latest watermark, whichever is larger.
+     *
+     * <p>The watermark for the entire source is the oldest of each 
partition's watermark.
+     * If one of the readers falls behind possibly due to uneven distribution 
of records among
+     * Kafka partitions, it ends up holding the watermark for the entire 
source.
+     */
+    public Read<K, V> withLogAppendTime(){
+      return 
withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
+    }
+
+
+    /**
+     * Sets {@link TimestampPolicy} to {@link 
TimestampPolicyFactory.ProcessingTimePolicy}.
+     * This is the default timestamp policy. It assigns processing time to 
each record.
+     * Specifically, this is the timestamp when the record becomes 'current' 
in the reader.
+     * The watermark aways advances to current time. If servicer side time 
(log append time) is
+     * enabled in Kafka, {@link #withLogAppendTime()} is recommended over this.
+     */
+    public Read<K, V> withProcessingTime() {
+      return 
withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+    }
+
+    /**
+     * Provide custom {@link TimestampPolicyFactory} to set event times and 
watermark for each
+     * partition. {@link 
TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
+     * is invoked for each partition when the reader starts.
+     * @see #withLogAppendTime() and {@link #withProcessingTime()}
+     */
+    public Read<K, V> withTimestampPolicyFactory(
+      TimestampPolicyFactory<K, V> timestampPolicyFactory) {
+      return 
toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
+    }
+
+    /**
      * A function to assign a timestamp to a record. Default is processing 
timestamp.
+     * @deprecated as of version 2.4.
+     * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
      */
+    @Deprecated
     public Read<K, V> withTimestampFn2(
         SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
       checkArgument(timestampFn != null, "timestampFn can not be null");
-      return toBuilder().setTimestampFn(timestampFn).build();
+      return toBuilder().setTimestampPolicyFactory(
+        TimestampPolicyFactory.withTimestampFn(timestampFn)).build();
     }
 
     /**
-     * A function to calculate watermark after a record. Default is last 
record timestamp
+     * A function to calculate watermark after a record. Default is last 
record timestamp.
      * @see #withTimestampFn(SerializableFunction)
+     * @deprecated as of version 2.4.
+     * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
      */
+    @Deprecated
     public Read<K, V> withWatermarkFn2(
         SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
       checkArgument(watermarkFn != null, "watermarkFn can not be null");
@@ -481,16 +540,22 @@ public class KafkaIO {
 
     /**
      * A function to assign a timestamp to a record. Default is processing 
timestamp.
+     * @deprecated as of version 2.4.
+     * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
      */
+    @Deprecated
     public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> 
timestampFn) {
       checkArgument(timestampFn != null, "timestampFn can not be null");
       return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
     }
 
     /**
-     * A function to calculate watermark after a record. Default is last 
record timestamp
+     * A function to calculate watermark after a record. Default is last 
record timestamp.
      * @see #withTimestampFn(SerializableFunction)
+     * @deprecated as of version 2.4.
+     * Use {@link #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
      */
+    @Deprecated
     public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> 
watermarkFn) {
       checkArgument(watermarkFn != null, "watermarkFn can not be null");
       return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
@@ -608,7 +673,7 @@ public class KafkaIO {
       return new KafkaUnboundedSource<>(this, -1);
     }
 
-    // utility method to convert KafkRecord<K, V> to user KV<K, V> before 
applying user functions
+    // utility method to convert KafkaRecord<K, V> to user KV<K, V> before 
applying user functions
     private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, 
ValueT>, OutT>
     unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
       return record -> fn.apply(record.getKV());
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index 235fb1f..f4b1f1b 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -26,21 +26,26 @@ import org.apache.beam.sdk.values.KV;
  * partition id, and offset).
  */
 public class KafkaRecord<K, V> implements Serializable {
+  // This is based on {@link ConsumerRecord} received from Kafka Consumer.
+  // The primary difference is that this contains deserialized key and value, 
and runtime
+  // Kafka version agnostic (e.g. Kafka version 0.9.x does not have timestamp 
fields).
 
   private final String topic;
   private final int partition;
   private final long offset;
   private final KV<K, V> kv;
   private final long timestamp;
+  private final KafkaTimestampType timestampType;
 
   public KafkaRecord(
       String topic,
       int partition,
       long offset,
       long timestamp,
+      KafkaTimestampType timestampType,
       K key,
       V value) {
-    this(topic, partition, offset, timestamp, KV.of(key, value));
+    this(topic, partition, offset, timestamp, timestampType, KV.of(key, 
value));
   }
 
   public KafkaRecord(
@@ -48,14 +53,17 @@ public class KafkaRecord<K, V> implements Serializable {
       int partition,
       long offset,
       long timestamp,
+      KafkaTimestampType timestampType,
       KV<K, V> kv) {
     this.topic = topic;
     this.partition = partition;
     this.offset = offset;
     this.timestamp = timestamp;
+    this.timestampType = timestampType;
     this.kv = kv;
   }
 
+
   public String getTopic() {
     return topic;
   }
@@ -76,6 +84,10 @@ public class KafkaRecord<K, V> implements Serializable {
     return timestamp;
   }
 
+  public KafkaTimestampType getTimestampType() {
+    return timestampType;
+  }
+
   @Override
   public int hashCode() {
     return Arrays.deepHashCode(new Object[]{topic, partition, offset, 
timestamp, kv});
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 577fdee..cb3b953 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -54,6 +54,7 @@ public class KafkaRecordCoder<K, V> extends 
StructuredCoder<KafkaRecord<K, V>> {
     intCoder.encode(value.getPartition(), outStream);
     longCoder.encode(value.getOffset(), outStream);
     longCoder.encode(value.getTimestamp(), outStream);
+    intCoder.encode(value.getTimestampType().ordinal(), outStream);
     kvCoder.encode(value.getKV(), outStream);
   }
 
@@ -64,6 +65,7 @@ public class KafkaRecordCoder<K, V> extends 
StructuredCoder<KafkaRecord<K, V>> {
         intCoder.decode(inStream),
         longCoder.decode(inStream),
         longCoder.decode(inStream),
+        KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
         kvCoder.decode(inStream));
   }
 
@@ -94,6 +96,7 @@ public class KafkaRecordCoder<K, V> extends 
StructuredCoder<KafkaRecord<K, V>> {
           value.getPartition(),
           value.getOffset(),
           value.getTimestamp(),
+          value.getTimestampType(),
           (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
     }
   }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaTimestampType.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaTimestampType.java
new file mode 100644
index 0000000..e4b9346
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaTimestampType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF E 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+/**
+ * This is a copy of Kafka's {@link 
org.apache.kafka.common.record.TimestampType}. Included
+ * here in order to support older Kafka versions (0.9.x).
+ */
+public enum KafkaTimestampType {
+  NO_TIMESTAMP_TYPE(-1, "NoTimestampType"),
+  CREATE_TIME(0, "CreateTime"),
+  LOG_APPEND_TIME(1, "LogAppendTime");
+
+  public final int id;
+  public final String name;
+
+  KafkaTimestampType(int id, String name) {
+    this.id = id;
+    this.name = name;
+  }
+
+  public static KafkaTimestampType forOrdinal(int ordinal) {
+    return values()[ordinal];
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index e830b4c..533c8b3 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -2,8 +2,7 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
+ * regarding copyright ownership.  The ASF E 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
@@ -24,13 +23,16 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.io.Closeables;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -135,10 +137,25 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
     Map<String, Object> offsetConsumerConfig = new 
HashMap<>(spec.getConsumerConfig());
     offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
     offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    // Force read isolation level to 'read_uncommitted' for offset consumer. 
This consumer
+    // fetches latest offset for two reasons : (a) to calculate backlog 
(number of records
+    // yet to be consumed) (b) to advance watermark if the backlog is zero. 
The right thing to do
+    // for (a) is to leave this config unchanged from the main config (i.e. if 
there are records
+    // that can't be read because of uncommitted records before them, they 
shouldn't
+    // ideally count towards backlog when "read_committed" is enabled. But (b)
+    // requires finding out if there are any records left to be read 
(committed or uncommitted).
+    // Rather than using two separate consumers we will go with better support 
for (b). If we do
+    // hit a case where a lot of records are not readable (due to some stuck 
transactions), the
+    // pipeline would report more backlog, but would not be able to consume 
it. It might be ok
+    // since CPU consumed on the workers would be low and will likely avoid 
unnecessary upscale.
+    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
 
     offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
     consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
 
+    // Fetch offsets once before running periodically.
+    updateLatestOffsets();
+
     offsetFetcherThread.scheduleAtFixedRate(
         this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, 
TimeUnit.SECONDS);
 
@@ -156,10 +173,7 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
      */
     while (true) {
       if (curBatch.hasNext()) {
-        PartitionState pState = curBatch.next();
-
-        elementsRead.inc();
-        elementsReadBySplit.inc();
+        PartitionState<K, V> pState = curBatch.next();
 
         if (!pState.recordIter.hasNext()) { // -- (c)
           pState.recordIter = Collections.emptyIterator(); // drop ref
@@ -167,6 +181,9 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
           continue;
         }
 
+        elementsRead.inc();
+        elementsReadBySplit.inc();
+
         ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
         long expected = pState.nextOffset;
         long offset = rawRecord.offset();
@@ -194,11 +211,12 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
             rawRecord.partition(),
             rawRecord.offset(),
             consumerSpEL.getRecordTimestamp(rawRecord),
+            consumerSpEL.getRecordTimestamptType(rawRecord),
             keyDeserializerInstance.deserialize(rawRecord.topic(), 
rawRecord.key()),
             valueDeserializerInstance.deserialize(rawRecord.topic(), 
rawRecord.value()));
 
-        curTimestamp = (source.getSpec().getTimestampFn() == null)
-            ? Instant.now() : source.getSpec().getTimestampFn().apply(record);
+        curTimestamp = pState.timestampPolicy
+          .getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
         curRecord = record;
 
         int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -220,46 +238,56 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
 
   @Override
   public Instant getWatermark() {
-    if (curRecord == null) {
-      LOG.debug("{}: getWatermark() : no records have been read yet.", name);
-      return initialWatermark;
+
+    if (source.getSpec().getWatermarkFn() != null) {
+      // Support old API which requires a KafkaRecord to invoke watermarkFn.
+      if (curRecord == null) {
+        LOG.debug("{}: getWatermark() : no records have been read yet.", name);
+        return initialWatermark;
+      }
+      return source.getSpec().getWatermarkFn().apply(curRecord);
     }
 
-    return source.getSpec().getWatermarkFn() != null
-        ? source.getSpec().getWatermarkFn().apply(curRecord) : curTimestamp;
+    // Return minimum watermark among partitions.
+    return partitionStates
+      .stream()
+      .map(PartitionState::updateAndGetWatermark)
+      .min(Comparator.naturalOrder())
+      .get();
   }
 
   @Override
   public CheckpointMark getCheckpointMark() {
     reportBacklog();
     return new KafkaCheckpointMark(
-            partitionStates.stream()
-                    .map((p) -> new PartitionMark(p.topicPartition.topic(),
-                            p.topicPartition.partition(),
-                            p.nextOffset))
-                    .collect(Collectors.toList()),
-            source.getSpec().isCommitOffsetsInFinalizeEnabled() ? this : null
+      partitionStates.stream()
+        .map(p -> new PartitionMark(p.topicPartition.topic(),
+                                    p.topicPartition.partition(),
+                                    p.nextOffset,
+                                    p.lastWatermark.getMillis()))
+        .collect(Collectors.toList()),
+      source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) 
: Optional.empty()
     );
   }
 
   @Override
-  public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
+  public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource () {
     return source;
   }
 
   @Override
-  public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
+  public KafkaRecord<K, V> getCurrent () throws NoSuchElementException {
     // should we delay updating consumed offset till this point? Mostly not 
required.
     return curRecord;
   }
 
   @Override
-  public Instant getCurrentTimestamp() throws NoSuchElementException {
+  public Instant getCurrentTimestamp () throws NoSuchElementException {
     return curTimestamp;
   }
 
   @Override
-  public long getSplitBacklogBytes() {
+  public long getSplitBacklogBytes () {
     long backlogBytes = 0;
 
     for (PartitionState p : partitionStates) {
@@ -287,10 +315,10 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
   private final KafkaUnboundedSource<K, V> source;
   private final String name;
   private Consumer<byte[], byte[]> consumer;
-  private final List<PartitionState> partitionStates;
+  private final List<PartitionState<K, V>> partitionStates;
   private KafkaRecord<K, V> curRecord;
   private Instant curTimestamp;
-  private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+  private Iterator<PartitionState<K, V>> curBatch = 
Collections.emptyIterator();
 
   private Deserializer<K> keyDeserializerInstance = null;
   private Deserializer<V> valueDeserializerInstance = null;
@@ -339,9 +367,9 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
   private Consumer<byte[], byte[]> offsetConsumer;
   private final ScheduledExecutorService offsetFetcherThread =
       Executors.newSingleThreadScheduledExecutor();
-  private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+  private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
 
-  private static final long UNINITIALIZED_OFFSET = -1;
+  static final long UNINITIALIZED_OFFSET = -1;
 
   //Add SpEL instance to cover the interface difference of Kafka client
   private transient ConsumerSpEL consumerSpEL;
@@ -370,20 +398,50 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
     }
   }
 
+  private static class TimestampPolicyContext extends 
TimestampPolicy.PartitionContext {
+
+    private final long messageBacklog;
+    private final Instant backlogCheckTime;
+
+    TimestampPolicyContext(long messageBacklog, Instant backlogCheckTime) {
+      this.messageBacklog = messageBacklog;
+      this.backlogCheckTime = backlogCheckTime;
+    }
+
+    @Override
+    public long getMessageBacklog() {
+      return messageBacklog;
+    }
+
+    @Override
+    public Instant getBacklogCheckTime() {
+      return backlogCheckTime;
+    }
+  }
+
   // maintains state of each assigned partition (buffered records, consumed 
offset, etc)
-  private static class PartitionState {
+  private static class PartitionState<K, V> {
     private final TopicPartition topicPartition;
     private long nextOffset;
     private long latestOffset;
+    private Instant latestOffsetFetchTime;
+    private Instant lastWatermark; // As returned by timestampPolicy
+    private final TimestampPolicy<K, V> timestampPolicy;
+
     private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = 
Collections.emptyIterator();
 
     private MovingAvg avgRecordSize = new MovingAvg();
     private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log 
compaction is enabled.
 
-    PartitionState(TopicPartition partition, long nextOffset) {
+
+    PartitionState(TopicPartition partition, long nextOffset,
+                   TimestampPolicy<K, V> timestampPolicy) {
       this.topicPartition = partition;
       this.nextOffset = nextOffset;
       this.latestOffset = UNINITIALIZED_OFFSET;
+      this.latestOffsetFetchTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      this.lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      this.timestampPolicy = timestampPolicy;
     }
 
     // Update consumedOffset, avgRecordSize, and avgOffsetGap
@@ -395,8 +453,11 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
       avgOffsetGap.update(offsetGap);
     }
 
-    synchronized void setLatestOffset(long latestOffset) {
+    synchronized void setLatestOffset(long latestOffset, Instant fetchTime) {
       this.latestOffset = latestOffset;
+      this.latestOffsetFetchTime = fetchTime;
+      LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg 
record size {})",
+                this, topicPartition, latestOffset, nextOffset, avgRecordSize);
     }
 
     synchronized long approxBacklogInBytes() {
@@ -415,6 +476,15 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
       double remaining = (latestOffset - nextOffset) / (1 + 
avgOffsetGap.get());
       return Math.max(0, (long) Math.ceil(remaining));
     }
+
+    synchronized TimestampPolicyContext mkTimestampPolicyContext() {
+      return new TimestampPolicyContext(backlogMessageCount(), 
latestOffsetFetchTime);
+    }
+
+    Instant updateAndGetWatermark() {
+      lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
+      return lastWatermark;
+    }
   }
 
   KafkaUnboundedReader(
@@ -425,35 +495,40 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
     this.name = "Reader-" + source.getId();
 
     List<TopicPartition> partitions = source.getSpec().getTopicPartitions();
-    partitionStates =
-        ImmutableList.copyOf(
-            partitions
-                .stream()
-                .map(tp -> new PartitionState(tp, UNINITIALIZED_OFFSET))
-                .collect(Collectors.toList()));
+    List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());
 
     if (checkpointMark != null) {
-      // a) verify that assigned and check-pointed partitions match exactly
-      // b) set consumed offsets
-
       checkState(checkpointMark.getPartitions().size() == partitions.size(),
-          "checkPointMark and assignedPartitions should match");
+                 "checkPointMark and assignedPartitions should match");
+    }
+
+    for (int i = 0; i < partitions.size(); i++) {
+      TopicPartition tp = partitions.get(i);
+      long nextOffset = UNINITIALIZED_OFFSET;
+      Optional<Instant> prevWatermark = Optional.empty();
+
+      if (checkpointMark != null) {
+        // Verify that assigned and check-pointed partitions match exactly and 
set next offset.
 
-      for (int i = 0; i < partitions.size(); i++) {
         PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
-        TopicPartition assigned = partitions.get(i);
+
         TopicPartition partition = new TopicPartition(ckptMark.getTopic(),
                                                       ckptMark.getPartition());
-        checkState(partition.equals(assigned),
+        checkState(partition.equals(tp),
                    "checkpointed partition %s and assigned partition %s don't 
match",
-                   partition, assigned);
-
-        partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
+                   partition, tp);
+        nextOffset = ckptMark.getNextOffset();
+        prevWatermark = Optional.of(new 
Instant(ckptMark.getWatermarkMillis()));
       }
+
+      states.add(new PartitionState<>(
+          tp, nextOffset,
+          
source.getSpec().getTimestampPolicyFactory().createTimestampPolicy(tp, 
prevWatermark)));
     }
 
-    String splitId = String.valueOf(source.getId());
+    partitionStates = ImmutableList.copyOf(states);
 
+    String splitId = String.valueOf(source.getId());
     elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
     bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
     backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
@@ -489,6 +564,8 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
   }
 
   private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
+    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+
     consumer.commitSync(
       checkpointMark
         .getPartitions()
@@ -533,17 +610,10 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
       return;
     }
 
-    List<PartitionState> nonEmpty = new LinkedList<>();
-
-    for (PartitionState p : partitionStates) {
-      p.recordIter = records.records(p.topicPartition).iterator();
-      if (p.recordIter.hasNext()) {
-        nonEmpty.add(p);
-      }
-    }
+    partitionStates.forEach(p -> p.recordIter = 
records.records(p.topicPartition).iterator());
 
     // cycle through the partitions in order to interleave records from each.
-    curBatch = Iterators.cycle(nonEmpty);
+    curBatch = Iterators.cycle(new LinkedList<>(partitionStates));
   }
 
   private void setupInitialOffset(PartitionState pState) {
@@ -567,29 +637,23 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
     }
   }
 
-  // update latest offset for each partition.
-  // called from offsetFetcher thread
+  // Update latest offset for each partition.
+  // Called from setupInitialOffset() at the start and then periodically from 
offsetFetcher thread.
   private void updateLatestOffsets() {
     for (PartitionState p : partitionStates) {
       try {
-        // If "read_committed" is enabled in the config, this seeks to 'Last 
Stable Offset'.
-        // As a result uncommitted messages are not counted in backlog. It is 
correct since
-        // the reader can not read them anyway.
+        Instant fetchTime = Instant.now();
         consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
         long offset = offsetConsumer.position(p.topicPartition);
-        p.setLatestOffset(offset);
+        p.setLatestOffset(offset, fetchTime);
       } catch (Exception e) {
-        // An exception is expected if we've closed the reader in another 
thread. Ignore and exit.
-        if (closed.get()) {
+        if (closed.get()) { // Ignore the exception if the reader is closed.
           break;
         }
         LOG.warn("{}: exception while fetching latest offset for partition {}. 
will be retried.",
             this, p.topicPartition, e);
-        p.setLatestOffset(UNINITIALIZED_OFFSET); // reset
+        // Don't update the latest offset.
       }
-
-      LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg 
record size {})",
-          this, p.topicPartition, p.latestOffset, p.nextOffset, 
p.avgRecordSize);
     }
 
     LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java
new file mode 100644
index 0000000..24a635f
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.joda.time.Instant;
+
+/**
+ * A timestamp policy to assign event time for messages in a Kafka partition 
and watermark for it.
+ * KafkaIO reader creates one policy using {@link TimestampPolicyFactory} for 
each each of the
+ * partitions it reads from. See @{@link 
TimestampPolicyFactory.LogAppendTimePolicy} for example
+ * of a policy.
+ */
+public abstract class TimestampPolicy<K, V> {
+
+  /**
+   * The context contains state maintained in the reader for the partition. 
Available with
+   * each of the methods in @{@link TimestampPolicy}.
+   */
+  public abstract static class PartitionContext {
+    /**
+     * Current backlog in messages
+     * (latest offset of the partition - last processed record offset).
+     */
+    public abstract long getMessageBacklog();
+
+    /**
+     * The time at which latest offset for the partition was fetched in order 
to calculate
+     * backlog. The reader periodically polls for latest offsets. This 
timestamp
+     * is useful in advancing watermark for idle partitions as in
+     * {@link TimestampPolicyFactory.LogAppendTimePolicy}.
+     */
+    public abstract Instant getBacklogCheckTime();
+  }
+
+  /**
+   * Returns record timestamp (aka event time). This is often based on the 
timestamp
+   * of the Kafka record. This is invoked for each record when it is processed 
in the reader.
+   */
+  public abstract Instant getTimestampForRecord(PartitionContext ctx, 
KafkaRecord<K, V> record);
+
+  /**
+   * Returns watermark for the partition. It is the timestamp before or at the 
timestamps of all
+   * future records consumed from the partition.
+   * See {@link UnboundedSource.UnboundedReader#getWatermark()} for more 
guidance on watermarks.
+   * E.g. if the record timestamp is 'LogAppendTime', watermark would be the 
timestamp of the last
+   * record since 'LogAppendTime' monotonically increases within a partition.
+   */
+  public abstract Instant getWatermark(PartitionContext ctx);
+
+  // It is useful to let TimestampPolicy store its state in checkpointMark.
+  // We need to add a getCheckpointMark() here for that.
+}
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
new file mode 100644
index 0000000..d84bfe8
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF E 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.Serializable;
+import java.util.Optional;
+import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * An extendable factory to create a {@link TimestampPolicy} for each 
partition at runtime by
+ * KafkaIO reader. Subclasses implement {@link #createTimestampPolicy}, which 
is invoked by
+ * the the reader while starting or resuming from a checkpoint. Two commonly 
used policies are
+ * provided. See {@link #withLogAppendTime()} and {@link 
#withProcessingTime()}.
+ */
+public abstract class TimestampPolicyFactory<KeyT, ValueT> implements 
Serializable {
+
+  /**
+   * Creates a TimestampPolicy for a partition. This is invoked by the reader 
at the start or while
+   * resuming from previous checkpoint.
+   *
+   * @param tp The returned policy applies to records from this {@link 
TopicPartition}.
+   * @param previousWatermark The latest check-pointed watermark. This is set 
when the reader
+   *           is resuming from a checkpoint. This is a good value to return 
by implementations
+   *           of {@link TimestampPolicy#getWatermark(PartitionContext)} until 
a better watermark
+   *           can be established as more records are read.
+   * @return
+   */
+  public abstract TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
+    TopicPartition tp, Optional<Instant> previousWatermark);
+
+  /**
+   * A {@link TimestampPolicy} that assigns processing time to each record.
+   * Specifically, this is the timestamp when the record becomes 'current' in 
the reader.
+   * The watermark aways advances to current time.
+   */
+  public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
+    return new TimestampPolicyFactory<K, V>() {
+      @Override
+      public TimestampPolicy<K, V>
+      createTimestampPolicy(TopicPartition tp, Optional<Instant> 
previousWatermark) {
+        return new ProcessingTimePolicy<>();
+      }
+    };
+  }
+
+  /**
+   * A {@link TimestampPolicy} that assigns Kafka's log append time (server 
side ingestion time)
+   * to each record. The watermark for each Kafka partition is the timestamp 
of the last record
+   * read. If a partition is idle, the watermark advances roughly to 'current 
time - 2 seconds'.
+   * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
+   */
+  public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
+    //return (tp, previousWatermark) -> new 
LogAppendTimePolicy<>(previousWatermark);
+    return new TimestampPolicyFactory<K, V>() {
+      @Override
+      public TimestampPolicy<K, V>
+      createTimestampPolicy(TopicPartition tp, Optional<Instant> 
previousWatermark) {
+        return new LogAppendTimePolicy<>(previousWatermark);
+      }
+    };
+  }
+
+  /*
+   * TODO
+   * Provide a another built in implementation where the watermark is based on 
all the timestamps
+   * seen in last 1 minute of wall clock time (this duration could be 
configurable). This is
+   * similar to watermark set by PubsubIO.
+   *
+   * public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() {
+   *   return withCustomTypestamp(...);
+   * }
+   *
+   * public static <K, V> TimestampPolicyFactory<K, V> withCustomTimestamp() {
+   * }
+   */
+
+  /**
+   * Used by the Read transform to support old timestamp functions API.
+   */
+  static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(
+    final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
+
+    return new TimestampPolicyFactory<K, V>() {
+      @Override
+      public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp,
+                                                         Optional<Instant> 
previousWatermark) {
+        return new TimestampFnPolicy<>(timestampFn, previousWatermark);
+      }
+    };
+  }
+
+  /**
+   * A simple policy that uses current time for event time and watermark. This 
should be used
+   * when better timestamps like LogAppendTime are not available for a topic.
+   */
+  public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> 
{
+
+    @Override
+    public Instant getTimestampForRecord(PartitionContext context, 
KafkaRecord<K, V> record) {
+      return Instant.now();
+    }
+
+    @Override
+    public Instant getWatermark(PartitionContext context) {
+      return Instant.now();
+    }
+  }
+
+  /**
+   * Assigns Kafka's log append time (server side ingestion time)
+   * to each record. The watermark for each Kafka partition is the timestamp 
of the last record
+   * read. If a partition is idle, the watermark advances roughly to 'current 
time - 2 seconds'.
+   * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
+   */
+  public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
+
+    /**
+     * When a partition is idle or caught up (i.e. backlog is zero), we 
advance the watermark
+     * to near realtime. Kafka server does not have an API to provide server 
side current
+     * timestamp which could ensure minimum LogAppendTime for future records.
+     * The best we could do is to advance the watermark to
+     * 'last backlog check time - small delta to account for any internal 
buffering in Kafka'.
+     * Using 2 seconds for this delta. Should this be user configurable?
+     */
+    private static final Duration IDLE_WATERMARK_DELTA = 
Duration.standardSeconds(2);
+
+    protected Instant currentWatermark;
+
+    public LogAppendTimePolicy(Optional<Instant> previousWatermark) {
+      currentWatermark = 
previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Override
+    public Instant getTimestampForRecord(PartitionContext context, 
KafkaRecord<K, V> record) {
+      if 
(record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) {
+        currentWatermark = new Instant(record.getTimestamp());
+      } else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+        // This is the first record and it does not have LOG_APPEND_TIME.
+        // Most likely the topic is not configured correctly.
+        throw new IllegalStateException(String.format(
+          "LogAppendTimePolicy policy is enabled in reader, but Kafka record's 
timestamp type "
+          + "is LogAppendTime. Most likely it is not enabled on Kafka for the 
topic '%s'. "
+          + "Actual timestamp type is '%s'.", record.getTopic(), 
record.getTimestampType()));
+      }
+      return currentWatermark;
+    }
+
+    @Override
+    public Instant getWatermark(PartitionContext context) {
+      if (context.getMessageBacklog() == 0) {
+        // The reader is caught up. May need to advance the watermark.
+        Instant idleWatermark = 
context.getBacklogCheckTime().minus(IDLE_WATERMARK_DELTA);
+        if (idleWatermark.isAfter(currentWatermark)) {
+          currentWatermark = idleWatermark;
+        }
+      } // else, there is backlog (or is unknown). Do not advance the 
watermark.
+      return currentWatermark;
+    }
+  }
+
+  /**
+   * Internal policy to support deprecated withTimestampFn API. It returns 
last record
+   * timestamp for watermark!.
+   */
+  private static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
+
+    final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    Instant lastRecordTimestamp;
+
+    TimestampFnPolicy(SerializableFunction<KafkaRecord<K, V>, Instant> 
timestampFn,
+                      Optional<Instant> previousWatermark) {
+      this.timestampFn = timestampFn;
+      lastRecordTimestamp = 
previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Override
+    public Instant getTimestampForRecord(PartitionContext context, 
KafkaRecord<K, V> record) {
+      lastRecordTimestamp = timestampFn.apply(record);
+      return lastRecordTimestamp;
+    }
+
+    @Override
+    public Instant getWatermark(PartitionContext context) {
+      return lastRecordTimestamp;
+    }
+  }
+}
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index ebdd1da..adeebe6 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -41,6 +41,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -77,16 +78,21 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -99,6 +105,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -110,6 +117,7 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.hamcrest.collection.IsIterableWithSize;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -142,6 +150,8 @@ public class KafkaIOTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
+
   // Update mock consumer with records distributed among the given topics, 
each with given number
   // of partitions. Records are assigned in round-robin order among the 
partitions.
   private static MockConsumer<byte[], byte[]> mkMockConsumer(
@@ -166,17 +176,22 @@ public class KafkaIOTest {
     int numPartitions = partitions.size();
     final long[] offsets = new long[numPartitions];
 
+
     for (int i = 0; i < numElements; i++) {
       int pIdx = i % numPartitions;
       TopicPartition tp = partitions.get(pIdx);
 
+      byte[] key = ByteBuffer.wrap(new byte[4]).putInt(i).array();    // key 
is 4 byte record id
+      byte[] value =  ByteBuffer.wrap(new byte[8]).putLong(i).array(); // 
value is 8 byte record id
+
       records.get(tp).add(
           new ConsumerRecord<>(
               tp.topic(),
               tp.partition(),
               offsets[pIdx]++,
-              ByteBuffer.wrap(new byte[4]).putInt(i).array(),    // key is 4 
byte record id
-              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 
byte record id
+              
LOG_APPEND_START_TIME.plus(Duration.standardSeconds(i)).getMillis(),
+              TimestampType.LOG_APPEND_TIME,
+              0, key.length, value.length, key, value));
     }
 
     // This is updated when reader assigns partitions.
@@ -250,10 +265,10 @@ public class KafkaIOTest {
     private final int numElements;
     private final OffsetResetStrategy offsetResetStrategy;
 
-    public ConsumerFactoryFn(List<String> topics,
-                             int partitionsPerTopic,
-                             int numElements,
-                             OffsetResetStrategy offsetResetStrategy) {
+    ConsumerFactoryFn(List<String> topics,
+                      int partitionsPerTopic,
+                      int numElements,
+                      OffsetResetStrategy offsetResetStrategy) {
       this.topics = topics;
       this.partitionsPerTopic = partitionsPerTopic;
       this.numElements = numElements;
@@ -302,7 +317,7 @@ public class KafkaIOTest {
   private static class AssertMultipleOf implements 
SerializableFunction<Iterable<Long>, Void> {
     private final int num;
 
-    public AssertMultipleOf(int num) {
+    AssertMultipleOf(int num) {
       this.num = num;
     }
 
@@ -456,9 +471,113 @@ public class KafkaIOTest {
     p.run();
   }
 
+  @Test
+  public void testUnboundedSourceLogAppendTimestamps() {
+    // LogAppendTime (server side timestamp) for records is set based on 
record index
+    // in MockConsumer above. Ensure that those exact timestamps are set by 
the source.
+    int numElements = 1000;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withLogAppendTime()
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs()).via(t ->
+          LOG_APPEND_START_TIME.plus(Duration.standardSeconds(t)).getMillis()))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  // Returns TIMESTAMP_MAX_VALUE for watermark when all the records are read 
from a partition.
+  static class TimestampPolicyWithEndOfSource<K, V> extends 
TimestampPolicyFactory<K, V> {
+    private final long maxOffset;
+
+    TimestampPolicyWithEndOfSource(long maxOffset) {
+      this.maxOffset = maxOffset;
+    }
+
+    @Override
+    public TimestampPolicy<K, V> createTimestampPolicy(
+      TopicPartition tp, Optional<Instant> previousWatermark) {
+      return new TimestampPolicy<K, V>() {
+        long lastOffset = 0;
+        Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+        @Override
+        public Instant getTimestampForRecord(PartitionContext ctx, 
KafkaRecord<K, V> record) {
+          lastOffset = record.getOffset();
+          lastTimestamp = new Instant(record.getTimestamp());
+          return lastTimestamp;
+        }
+
+        @Override
+        public Instant getWatermark(PartitionContext ctx) {
+          if (lastOffset < maxOffset) {
+            return lastTimestamp;
+          } else { // EOF
+            return BoundedWindow.TIMESTAMP_MAX_VALUE;
+          }
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testUnboundedSourceWithoutBoundedWrapper() {
+    // Most of the tests in this file set 'maxNumRecords' on the source, which 
wraps
+    // the unbounded source in a bounded source. As a result, the test 
pipeline run as
+    // bounded/batch pipelines under direct-runner.
+    // This is same as testUnboundedSource() without the BoundedSource wrapper.
+
+    final int numElements = 1000;
+    final int numPartitions = 10;
+    String topic = "testUnboundedSourceWithoutBoundedWrapper";
+
+    KafkaIO.Read<byte[], Long> reader = KafkaIO.<byte[], Long>read()
+      .withBootstrapServers(topic)
+      .withTopic(topic)
+      .withConsumerFactoryFn(new ConsumerFactoryFn(
+        ImmutableList.of(topic), numPartitions, numElements, 
OffsetResetStrategy.EARLIEST))
+      .withKeyDeserializer(ByteArrayDeserializer.class)
+      .withValueDeserializer(LongDeserializer.class)
+      .withTimestampPolicyFactory(
+        new TimestampPolicyWithEndOfSource<>(numElements / numPartitions - 1));
+
+    PCollection <Long> input =
+      p.apply("readFromKafka", reader.withoutMetadata())
+        .apply(Values.create())
+        .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
+
+    PipelineResult result = p.run();
+
+    MetricName elementsRead = SourceMetrics.elementsRead().getName();
+
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+      MetricsFilter.builder()
+        .addNameFilter(MetricNameFilter.inNamespace(elementsRead.namespace()))
+        .build());
+
+    assertThat(metrics.counters(), hasItem(
+      attemptedMetricsResult(
+        elementsRead.namespace(),
+        elementsRead.name(),
+        "readFromKafka",
+        Long.valueOf(numElements))));
+  }
+
   private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, 
V>, KV<K, V>> {
     @ProcessElement
-    public void processElement(ProcessContext ctx) throws Exception {
+    public void processElement(ProcessContext ctx) {
       ctx.output(ctx.element().getKV());
     }
   }
@@ -631,8 +750,7 @@ public class KafkaIOTest {
 
     p.apply(readStep,
         mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-          .updateConsumerProperties(ImmutableMap.<String, 
Object>of(ConsumerConfig.GROUP_ID_CONFIG,
-                                                                    
"test.group"))
+          
.updateConsumerProperties(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, 
"test.group"))
           .commitOffsetsInFinalize()
           .withoutMetadata());
 
@@ -1070,7 +1188,6 @@ public class KafkaIOTest {
               
.addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
               .build());
 
-
       assertThat(metrics.counters(), hasItem(
           attemptedMetricsResult(
               elementsWritten.namespace(),
@@ -1182,11 +1299,13 @@ public class KafkaIOTest {
 
       // Make sure the config is correctly set up for serializers.
       Utils.newInstance(
-              ((Class<?>) 
config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
-                      .asSubclass(Serializer.class)
+        (Class<? extends Serializer<?>>)
+          ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+                .asSubclass(Serializer.class)
       ).configure(config, true);
 
       Utils.newInstance(
+        (Class<? extends Serializer<?>>)
           ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
               .asSubclass(Serializer.class)
       ).configure(config, false);

-- 
To stop receiving notification emails like this one, please contact
ming...@apache.org.

Reply via email to