[ 
https://issues.apache.org/jira/browse/BEAM-591?focusedWorklogId=86078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86078
 ]

ASF GitHub Bot logged work on BEAM-591:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Mar/18 16:58
            Start Date: 30/Mar/18 16:58
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #4935: [BEAM-591] Support custom timestamps & CreateTime support
URL: https://github.com/apache/beam/pull/4935
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 42bb0e99bf3..c285d01ebc2 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -122,11 +122,19 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
new file mode 100644
index 00000000000..f2dbbe8d7ac
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A policy for custom record timestamps where timestamps within a partition are expected to be
+ * roughly monotonically increasing with a cap on out of order event delays (say 1 minute).
+ * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
+ * However, watermark is never set in future and capped to 'now - max delay'. In addition,
+ * watermark advanced to 'now - max delay' when a partition is idle.
+ */
+public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {
+
+  private final Duration maxDelay;
+  private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;
+  private Instant maxEventTimestamp;
+
+  /**
+   * A policy for custom record timestamps where timestamps are expected to be roughly monotonically
+   * increasing with out of order event delays less than {@code maxDelay}. The watermark at any
+   * time is {@code Min(now(), max_event_timestamp) - maxDelay}.
+   * @param timestampFunction A function to extract timestamp from the record
+   * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+   *                 record is expected to be after {@code current record timestamp - maxDelay}.
+   * @param previousWatermark Latest check-pointed watermark, see
+   *                {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
+   */
+  public CustomTimestampPolicyWithLimitedDelay(
+    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction,
+    Duration maxDelay,
+    Optional<Instant> previousWatermark) {
+    this.maxDelay = maxDelay;
+    this.timestampFunction = timestampFunction;
+
+    // 'previousWatermark' is not the same as maxEventTimestamp (e.g. it could have been in future).
+    // Initialize it such that watermark before reading any event same as previousWatermark.
+    maxEventTimestamp = previousWatermark
+      .orElse(BoundedWindow.TIMESTAMP_MIN_VALUE)
+      .plus(maxDelay);
+  }
+
+  @Override
+  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
+    Instant ts = timestampFunction.apply(record);
+    if (ts.isAfter(maxEventTimestamp)) {
+      maxEventTimestamp = ts;
+    }
+    return ts;
+  }
+
+  @Override
+  public Instant getWatermark(PartitionContext ctx) {
+    // Watermark == maxEventTime - maxDelay, except in two special cases:
+    //   a) maxEventTime in future : probably due to incorrect timestamps. Cap it to 'now'.
+    //   b) partition is idle : Need to advance watermark if there are no records in the partition.
+    //         We assume that future records will have timestamp >= 'now - maxDelay' and advance
+    //         the watermark accordingly.
+    // The above handles majority of common use cases for custom timestamps. Users can implement
+    // their own policy if this does not work.
+
+    Instant now = Instant.now();
+    return getWatermark(ctx, now);
+  }
+
+  @VisibleForTesting
+  Instant getWatermark(PartitionContext ctx, Instant now) {
+    if (maxEventTimestamp.isAfter(now)) {
+      return now.minus(maxDelay);  // (a) above.
+    } else if (
+      ctx.getMessageBacklog() == 0
+      && ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp) // Idle
+      && maxEventTimestamp.getMillis() > 0) { // Read at least one record with positive timestamp.
+      return ctx.getBacklogCheckTime().minus(maxDelay);
+    } else {
+      return maxEventTimestamp.minus(maxDelay);
+    }
+  }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eb292299256..e5d2cd9b648 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -111,8 +111,9 @@
  *       // settings for ConsumerConfig. e.g :
  *       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
  *
- *       // set event times and watermark based on LogAppendTime. To provide a custom
+ *       // set event times and watermark based on 'LogAppendTime'. To provide a custom
  *       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
+ *       // Use withCreateTime() with topics that have 'CreateTime' timestamps.
  *       .withLogAppendTime()
  *
  *       // restrict reader to committed messages on Kafka (see method documentation).
@@ -482,23 +483,41 @@
       return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
     }
 
-
     /**
      * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}.
      * This is the default timestamp policy. It assigns processing time to each record.
      * Specifically, this is the timestamp when the record becomes 'current' in the reader.
-     * The watermark aways advances to current time. If servicer side time (log append time) is
+     * The watermark aways advances to current time. If server side time (log append time) is
      * enabled in Kafka, {@link #withLogAppendTime()} is recommended over this.
      */
     public Read<K, V> withProcessingTime() {
       return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
     }
 
+    /**
+     * Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the
+     * records. It is an error if a record's timestamp type is not
+     * {@link KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to
+     * be roughly monotonically increasing with a cap on out of order delays (e.g. 'max delay' of
+     * 1 minute). The watermark at any time is
+     * '({@code Min(now(), Max(event timestamp so far)) - max delay})'. However, watermark is never
+     * set in future and capped to 'now - max delay'. In addition, watermark advanced to
+     * 'now - max delay' when a partition is idle.
+     *
+     * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+     *                 record is expected to be after {@code current record timestamp - maxDelay}.
+     */
+    public Read<K, V> withCreateTime(Duration maxDelay) {
+      return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
+    }
+
     /**
      * Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
      * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
      * is invoked for each partition when the reader starts.
-     * @see #withLogAppendTime() and {@link #withProcessingTime()}
+     * @see #withLogAppendTime()
+     * @see #withCreateTime(Duration)
+     * @see #withProcessingTime()
      */
     public Read<K, V> withTimestampPolicyFactory(
       TimestampPolicyFactory<K, V> timestampPolicyFactory) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
index d84bfe87216..8feccb6159b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.Serializable;
 import java.util.Optional;
 import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext;
@@ -31,7 +33,8 @@
  * the the reader while starting or resuming from a checkpoint. Two commonly used policies are
  * provided. See {@link #withLogAppendTime()} and {@link #withProcessingTime()}.
  */
-public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializable {
+@FunctionalInterface
+public interface TimestampPolicyFactory<KeyT, ValueT> extends Serializable {
 
   /**
    * Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while
@@ -42,9 +45,8 @@
    *           is resuming from a checkpoint. This is a good value to return by implementations
    *           of {@link TimestampPolicy#getWatermark(PartitionContext)} until a better watermark
    *           can be established as more records are read.
-   * @return
    */
-  public abstract TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
+  TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
     TopicPartition tp, Optional<Instant> previousWatermark);
 
   /**
@@ -52,14 +54,8 @@
    * Specifically, this is the timestamp when the record becomes 'current' in the reader.
    * The watermark aways advances to current time.
    */
-  public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V>
-      createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
-        return new ProcessingTimePolicy<>();
-      }
-    };
+  static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
+    return (tp, prev) -> new ProcessingTimePolicy<>();
   }
 
   /**
@@ -68,51 +64,42 @@
    * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
    * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
    */
-  public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
-    //return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V>
-      createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
-        return new LogAppendTimePolicy<>(previousWatermark);
-      }
-    };
+  static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
+    return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
   }
 
-  /*
-   * TODO
-   * Provide a another built in implementation where the watermark is based on all the timestamps
-   * seen in last 1 minute of wall clock time (this duration could be configurable). This is
-   * similar to watermark set by PubsubIO.
-   *
-   * public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() {
-   *   return withCustomTypestamp(...);
-   * }
-   *
-   * public static <K, V> TimestampPolicyFactory<K, V> withCustomTimestamp() {
-   * }
+  /**
+   * {@link CustomTimestampPolicyWithLimitedDelay} using {@link KafkaTimestampType#CREATE_TIME}
+   * from the record for timestamp. See {@link KafkaIO.Read#withCreateTime(Duration)} for more
+   * complete documentation.
    */
+  static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) {
+    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction = record -> {
+      checkArgument(
+        record.getTimestampType() == KafkaTimestampType.CREATE_TIME,
+        "Kafka record's timestamp is not 'CREATE_TIME' "
+        + "(topic: %s, partition %s, offset %s, timestamp type '%s')",
+        record.getTopic(), record.getPartition(), record.getOffset(), record.getTimestampType());
+      return new Instant(record.getTimestamp());
+    };
+
+    return (tp, previousWatermark) ->
+      new CustomTimestampPolicyWithLimitedDelay<>(timestampFunction, maxDelay, previousWatermark);
+  }
 
   /**
    * Used by the Read transform to support old timestamp functions API.
    */
   static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(
     final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
-
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp,
-                                                         Optional<Instant> previousWatermark) {
-        return new TimestampFnPolicy<>(timestampFn, previousWatermark);
-      }
-    };
+    return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark);
   }
 
   /**
    * A simple policy that uses current time for event time and watermark. This should be used
    * when better timestamps like LogAppendTime are not available for a topic.
    */
-  public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
+  class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
 
     @Override
     public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
@@ -131,7 +118,7 @@ public Instant getWatermark(PartitionContext context) {
    * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
    * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
    */
-  public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
+  class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
 
     /**
      * When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark
@@ -181,7 +168,7 @@ public Instant getWatermark(PartitionContext context) {
    * Internal policy to support deprecated withTimestampFn API. It returns last record
    * timestamp for watermark!.
    */
-  private static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
+  class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
 
     final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
     Instant lastRecordTimestamp;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
new file mode 100644
index 00000000000..04e86a6a94b
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CustomTimestampPolicyWithLimitedDelay}. */
+@RunWith(JUnit4.class)
+public class CustomTimestampPolicyWithLimitedDelayTest {
+
+  // Takes offsets of timestamps from now returns the results as offsets from 'now'.
+  private static List<Long> getTimestampsForRecords(
+    TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
+
+    return timestampOffsets
+      .stream()
+      .map(ts -> {
+        Instant result = policy.getTimestampForRecord(
+          null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
+                                  KafkaTimestampType.CREATE_TIME, "key", "value"));
+        return result.getMillis() - now.getMillis();
+      })
+      .collect(Collectors.toList());
+  }
+
+
+  @Test
+  public void testCustomTimestampPolicyWithLimitedDelay() {
+    // Verifies that max delay is applies appropriately for reporting watermark
+
+    Duration maxDelay = Duration.standardSeconds(60);
+
+    CustomTimestampPolicyWithLimitedDelay<String, String> policy =
+      new CustomTimestampPolicyWithLimitedDelay<>(
+        (record -> new Instant(record.getTimestamp())),
+        maxDelay,
+        Optional.empty());
+
+    Instant now = Instant.now();
+
+    TimestampPolicy.PartitionContext ctx = mock(TimestampPolicy.PartitionContext.class);
+    when(ctx.getMessageBacklog()).thenReturn(100L);
+    when(ctx.getBacklogCheckTime()).thenReturn(now);
+
+    assertThat(policy.getWatermark(ctx), is(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    // (1) Test simple case : watermark == max_timesatmp - max_delay
+
+    List<Long> input = ImmutableList.of(-200_000L,
+                                        -150_000L,
+                                        -120_000L,
+                                        -140_000L,
+                                        -100_000L,  // <<< Max timestamp
+                                        -110_000L);
+    assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+    // Watermark should be max_timestamp - maxDelay
+    assertThat(policy.getWatermark(ctx), is(now
+                                              .minus(Duration.standardSeconds(100))
+                                              .minus(maxDelay)));
+
+    // (2) Verify future timestamps
+
+    input = ImmutableList.of(-200_000L,
+                             -150_000L,
+                             -120_000L,
+                             -140_000L,
+                              100_000L,  // <<< timestamp is in future
+                             -100_000L,
+                             -110_000L);
+
+    assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+    // Watermark should be now - max_delay (backlog in context still non zero)
+    assertThat(policy.getWatermark(ctx, now), is(now.minus(maxDelay)));
+
+    // (3) Verify that Watermark advances when there is no backlog
+
+    // advance current time by 5 minutes
+    now = now.plus(Duration.standardSeconds(300));
+    Instant backlogCheckTime = now.minus(Duration.standardSeconds(10));
+
+    when(ctx.getMessageBacklog()).thenReturn(0L);
+    when(ctx.getBacklogCheckTime()).thenReturn(backlogCheckTime);
+
+    assertThat(policy.getWatermark(ctx, now), is(backlogCheckTime.minus(maxDelay)));
+  }
+}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3718c410827..71f7f134dba 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -151,12 +151,14 @@
   public ExpectedException thrown = ExpectedException.none();
 
   private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
+  private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
+  private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";
 
   // Update mock consumer with records distributed among the given topics, each with given number
   // of partitions. Records are assigned in round-robin order among the partitions.
   private static MockConsumer<byte[], byte[]> mkMockConsumer(
       List<String> topics, int partitionsPerTopic, int numElements,
-      OffsetResetStrategy offsetResetStrategy) {
+      OffsetResetStrategy offsetResetStrategy, Map<String, Object> config) {
 
     final List<TopicPartition> partitions = new ArrayList<>();
     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -176,6 +178,10 @@
     int numPartitions = partitions.size();
     final long[] offsets = new long[numPartitions];
 
+    long timestampStartMillis = (Long) config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG,
+                                                           LOG_APPEND_START_TIME.getMillis());
+    TimestampType timestampType = TimestampType.forName((String)
+      config.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString()));
 
     for (int i = 0; i < numElements; i++) {
       int pIdx = i % numPartitions;
@@ -189,8 +195,8 @@
               tp.topic(),
               tp.partition(),
               offsets[pIdx]++,
-              LOG_APPEND_START_TIME.plus(Duration.standardSeconds(i)).getMillis(),
-              TimestampType.LOG_APPEND_TIME,
+              timestampStartMillis + Duration.standardSeconds(i).getMillis(),
+              timestampType,
               0, key.length, value.length, key, value));
     }
 
@@ -277,7 +283,7 @@ public void run() {
 
     @Override
     public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
+      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy, config);
     }
   }
 
@@ -498,8 +504,73 @@ public void testUnboundedSourceLogAppendTimestamps() {
     p.run();
   }
 
+  @Test
+  public void testUnboundedSourceCustomTimestamps() {
+    // The custom timestamps is set to customTimestampStartMillis + value.
+    // Tests basic functionality of custom timestamps.
+
+    final int numElements = 1000;
+    final long customTimestampStartMillis = 80000L;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withTimestampPolicyFactory(
+                  (tp, prevWatermark) -> new CustomTimestampPolicyWithLimitedDelay<Integer, Long>(
+                    (record -> new Instant(TimeUnit.SECONDS.toMillis(record.getKV().getValue())
+                                             + customTimestampStartMillis)),
+                   Duration.millis(0),
+                   prevWatermark))
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs())
+                 .via(t -> TimeUnit.SECONDS.toMillis(t) + customTimestampStartMillis))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  @Test
+  public void testUnboundedSourceCreateTimestamps() {
+    // Same as testUnboundedSourceCustomTimestamps with create timestamp.
+
+    final int numElements = 1000;
+    final long createTimestampStartMillis = 50000L;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withCreateTime(Duration.millis(0))
+                .updateConsumerProperties(ImmutableMap.of(
+                  TIMESTAMP_TYPE_CONFIG, "CreateTime",
+                  TIMESTAMP_START_MILLIS_CONFIG, createTimestampStartMillis))
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs())
+                 .via(t -> TimeUnit.SECONDS.toMillis(t) + createTimestampStartMillis))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
   // Returns TIMESTAMP_MAX_VALUE for watermark when all the records are read from a partition.
-  static class TimestampPolicyWithEndOfSource<K, V> extends TimestampPolicyFactory<K, V> {
+  static class TimestampPolicyWithEndOfSource<K, V> implements TimestampPolicyFactory<K, V> {
     private final long maxOffset;
 
     TimestampPolicyWithEndOfSource(long maxOffset) {
@@ -553,10 +624,9 @@ public void testUnboundedSourceWithoutBoundedWrapper() {
       .withTimestampPolicyFactory(
         new TimestampPolicyWithEndOfSource<>(numElements / numPartitions - 1));
 
-    PCollection <Long> input =
-      p.apply("readFromKafka", reader.withoutMetadata())
-        .apply(Values.create())
-        .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
+    p.apply("readFromKafka", reader.withoutMetadata())
+      .apply(Values.create())
+      .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
 
     PipelineResult result = p.run();
 

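The diff above turns TimestampPolicyFactory from an abstract class into a
@FunctionalInterface, which is what lets the built-in factories shrink to
one-line lambdas. The following is only a minimal stand-alone sketch of that
pattern. PolicySketch and PolicyFactorySketch are hypothetical names invented
for illustration; they are not Beam classes and use plain String/Long in place
of TopicPartition and Instant.

```java
import java.io.Serializable;
import java.util.Optional;

/** Hypothetical stand-in for a per-partition policy; it only carries a label. */
final class PolicySketch {
  final String description;

  PolicySketch(String description) {
    this.description = description;
  }
}

/**
 * Hypothetical factory with a single abstract method, so any lambda with a
 * matching shape can be used where a factory is expected.
 */
@FunctionalInterface
interface PolicyFactorySketch extends Serializable {
  PolicySketch create(String topicPartition, Optional<Long> previousWatermarkMillis);

  // A static factory on the interface, written as a lambda rather than an
  // anonymous inner class.
  static PolicyFactorySketch processingTime() {
    return (tp, prev) -> new PolicySketch("processing-time for " + tp);
  }
}
```

A caller can then pass either a built-in factory such as
PolicyFactorySketch.processingTime() or an ad-hoc lambda like
(tp, prev) -> new PolicySketch(...), much as the new
testUnboundedSourceCustomTimestamps test does with a lambda passed to
withTimestampPolicyFactory().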

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 86078)
    Time Spent: 3h 40m  (was: 3.5h)

> Better handling of watermark in KafkaIO
> ---------------------------------------
>
>                 Key: BEAM-591
>                 URL: https://issues.apache.org/jira/browse/BEAM-591
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Raghu Angadi
>            Assignee: Raghu Angadi
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Right now the default watermark in KafkaIO is the same as the timestamp of the record.
> The main problem with this is that the watermark does not change if there aren't any
> new records on the topic. This can hold up many open windows.
> The record timestamp by default is set to processing time (i.e. when the 
> runner reads a record from Kafka reader).
> A user can provide functions to calculate watermark and record timestamps. 
> There are a few concerns with current design:
> * What should happen when a kafka topic is idle:
>   ** in default case, I think watermark should advance to current time.
>   ** What should happen when user has provided a function to calculate record 
> timestamp? 
>    *** Should the watermark stay same as record timestamp?
>    *** same when user has provided own watermark function? 
> * Are the current semantics of user provided watermark function correct?
>   ** -it is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner 
> (we could still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. should we pass current record 
> timestamp in addition to the record?
>  
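The idle-topic question raised in the description is what the merged
CustomTimestampPolicyWithLimitedDelay addresses. As a rough illustration of
its watermark rule, here is a stand-alone sketch that mirrors the logic in the
diff using only java.time. Assumptions: the class name
LimitedDelayWatermarkSketch is hypothetical, Instant.MIN stands in for Beam's
BoundedWindow.TIMESTAMP_MIN_VALUE, and the backlog and backlog check time are
passed in directly instead of coming from a PartitionContext.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Stand-alone sketch of the limited-delay watermark rule; not the Beam class itself. */
final class LimitedDelayWatermarkSketch {
  // Assumption: Instant.MIN plays the role of BoundedWindow.TIMESTAMP_MIN_VALUE.
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  private final Duration maxDelay;
  private Instant maxEventTimestamp;

  LimitedDelayWatermarkSketch(Duration maxDelay, Optional<Instant> previousWatermark) {
    this.maxDelay = maxDelay;
    // Seed the maximum so that the watermark reported before any record is
    // read equals the previous (checkpointed) watermark.
    this.maxEventTimestamp = previousWatermark.orElse(MIN_TIMESTAMP).plus(maxDelay);
  }

  /** Tracks the largest event timestamp seen so far and returns the timestamp unchanged. */
  Instant observe(Instant eventTimestamp) {
    if (eventTimestamp.isAfter(maxEventTimestamp)) {
      maxEventTimestamp = eventTimestamp;
    }
    return eventTimestamp;
  }

  Instant watermark(Instant now, long backlog, Instant backlogCheckTime) {
    if (maxEventTimestamp.isAfter(now)) {
      // (a) A timestamp from the future: cap the watermark at now - maxDelay.
      return now.minus(maxDelay);
    }
    boolean idle =
        backlog == 0
            && backlogCheckTime.minus(maxDelay).isAfter(maxEventTimestamp)
            && maxEventTimestamp.isAfter(Instant.EPOCH); // at least one positive timestamp seen
    if (idle) {
      // (b) Idle partition: advance to the backlog check time minus maxDelay.
      return backlogCheckTime.minus(maxDelay);
    }
    return maxEventTimestamp.minus(maxDelay);
  }
}
```

With a maxDelay of 60 seconds and a largest observed timestamp of
'now - 100s', this sketch reports 'now - 160s' while there is still backlog,
which matches case (1) in the unit test from the diff.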



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
