[ https://issues.apache.org/jira/browse/BEAM-591?focusedWorklogId=86078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86078 ]
ASF GitHub Bot logged work on BEAM-591:
---------------------------------------

Author: ASF GitHub Bot
Created on: 30/Mar/18 16:58
Start Date: 30/Mar/18 16:58
Worklog Time Spent: 10m
Work Description: XuMingmin closed pull request #4935: [BEAM-591] Support custom timestamps & CreateTime support
URL: https://github.com/apache/beam/pull/4935

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 42bb0e99bf3..c285d01ebc2 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -122,11 +122,19 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
new file mode 100644
index 00000000000..f2dbbe8d7ac
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A policy for custom record timestamps where timestamps within a partition are expected to be
+ * roughly monotonically increasing with a cap on out of order event delays (say 1 minute).
+ * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
+ * However, watermark is never set in future and capped to 'now - max delay'. In addition,
+ * watermark advanced to 'now - max delay' when a partition is idle.
+ */
+public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {
+
+  private final Duration maxDelay;
+  private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;
+  private Instant maxEventTimestamp;
+
+  /**
+   * A policy for custom record timestamps where timestamps are expected to be roughly monotonically
+   * increasing with out of order event delays less than {@code maxDelay}. The watermark at any
+   * time is {@code Min(now(), max_event_timestamp) - maxDelay}.
+   * @param timestampFunction A function to extract timestamp from the record
+   * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+   *     record is expected to be after {@code current record timestamp - maxDelay}.
+   * @param previousWatermark Latest check-pointed watermark, see
+   *     {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
+   */
+  public CustomTimestampPolicyWithLimitedDelay(
+      SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction,
+      Duration maxDelay,
+      Optional<Instant> previousWatermark) {
+    this.maxDelay = maxDelay;
+    this.timestampFunction = timestampFunction;
+
+    // 'previousWatermark' is not the same as maxEventTimestamp (e.g. it could have been in future).
+    // Initialize it such that watermark before reading any event same as previousWatermark.
+    maxEventTimestamp = previousWatermark
+      .orElse(BoundedWindow.TIMESTAMP_MIN_VALUE)
+      .plus(maxDelay);
+  }
+
+  @Override
+  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
+    Instant ts = timestampFunction.apply(record);
+    if (ts.isAfter(maxEventTimestamp)) {
+      maxEventTimestamp = ts;
+    }
+    return ts;
+  }
+
+  @Override
+  public Instant getWatermark(PartitionContext ctx) {
+    // Watermark == maxEventTime - maxDelay, except in two special cases:
+    //   a) maxEventTime in future : probably due to incorrect timestamps. Cap it to 'now'.
+    //   b) partition is idle : Need to advance watermark if there are no records in the partition.
+    //         We assume that future records will have timestamp >= 'now - maxDelay' and advance
+    //         the watermark accordingly.
+    // The above handles majority of common use cases for custom timestamps. Users can implement
+    // their own policy if this does not work.
+
+    Instant now = Instant.now();
+    return getWatermark(ctx, now);
+  }
+
+  @VisibleForTesting
+  Instant getWatermark(PartitionContext ctx, Instant now) {
+    if (maxEventTimestamp.isAfter(now)) {
+      return now.minus(maxDelay); // (a) above.
+    } else if (
+      ctx.getMessageBacklog() == 0
+      && ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp) // Idle
+      && maxEventTimestamp.getMillis() > 0) { // Read at least one record with positive timestamp.
+      return ctx.getBacklogCheckTime().minus(maxDelay);
+    } else {
+      return maxEventTimestamp.minus(maxDelay);
+    }
+  }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eb292299256..e5d2cd9b648 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -111,8 +111,9 @@
  *    // settings for ConsumerConfig. e.g :
  *    .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
  *
- *    // set event times and watermark based on LogAppendTime. To provide a custom
+ *    // set event times and watermark based on 'LogAppendTime'. To provide a custom
  *    // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
+ *    // Use withCreateTime() with topics that have 'CreateTime' timestamps.
  *    .withLogAppendTime()
  *
  *    // restrict reader to committed messages on Kafka (see method documentation).
@@ -482,23 +483,41 @@
       return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
     }

-
     /**
      * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}.
      * This is the default timestamp policy. It assigns processing time to each record.
      * Specifically, this is the timestamp when the record becomes 'current' in the reader.
-     * The watermark aways advances to current time. If servicer side time (log append time) is
+     * The watermark aways advances to current time. If server side time (log append time) is
      * enabled in Kafka, {@link #withLogAppendTime()} is recommended over this.
      */
     public Read<K, V> withProcessingTime() {
       return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
     }

+    /**
+     * Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the
+     * records. It is an error if a record's timestamp type is not
+     * {@link KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to
+     * be roughly monotonically increasing with a cap on out of order delays (e.g. 'max delay' of
+     * 1 minute). The watermark at any time is
+     * '({@code Min(now(), Max(event timestamp so far)) - max delay})'. However, watermark is never
+     * set in future and capped to 'now - max delay'. In addition, watermark advanced to
+     * 'now - max delay' when a partition is idle.
+     *
+     * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent
+     *     record is expected to be after {@code current record timestamp - maxDelay}.
+     */
+    public Read<K, V> withCreateTime(Duration maxDelay) {
+      return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
+    }
+
     /**
      * Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
      * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
      * is invoked for each partition when the reader starts.
+
-     * @see #withLogAppendTime() and {@link #withProcessingTime()}
+     * @see #withLogAppendTime()
+     * @see #withCreateTime(Duration)
+     * @see #withProcessingTime()
      */
     public Read<K, V> withTimestampPolicyFactory(
         TimestampPolicyFactory<K, V> timestampPolicyFactory) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
index d84bfe87216..8feccb6159b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.beam.sdk.io.kafka;

+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.Serializable;
 import java.util.Optional;
 import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext;
@@ -31,7 +33,8 @@
  * the the reader while starting or resuming from a checkpoint. Two commonly used policies are
  * provided. See {@link #withLogAppendTime()} and {@link #withProcessingTime()}.
  */
-public abstract class TimestampPolicyFactory<KeyT, ValueT> implements Serializable {
+@FunctionalInterface
+public interface TimestampPolicyFactory<KeyT, ValueT> extends Serializable {

   /**
    * Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while
@@ -42,9 +45,8 @@
    * is resuming from a checkpoint. This is a good value to return by implementations
    * of {@link TimestampPolicy#getWatermark(PartitionContext)} until a better watermark
    * can be established as more records are read.
-   * @return
    */
-  public abstract TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
+  TimestampPolicy<KeyT, ValueT> createTimestampPolicy(
       TopicPartition tp, Optional<Instant> previousWatermark);

   /**
@@ -52,14 +54,8 @@
    * Specifically, this is the timestamp when the record becomes 'current' in the reader.
    * The watermark aways advances to current time.
    */
-  public static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V>
-      createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
-        return new ProcessingTimePolicy<>();
-      }
-    };
+  static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() {
+    return (tp, prev) -> new ProcessingTimePolicy<>();
   }

   /**
@@ -68,51 +64,42 @@
    * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
    * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
    */
-  public static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
-    //return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V>
-      createTimestampPolicy(TopicPartition tp, Optional<Instant> previousWatermark) {
-        return new LogAppendTimePolicy<>(previousWatermark);
-      }
-    };
+  static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() {
+    return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark);
   }

-  /*
-   * TODO
-   * Provide a another built in implementation where the watermark is based on all the timestamps
-   * seen in last 1 minute of wall clock time (this duration could be configurable). This is
-   * similar to watermark set by PubsubIO.
-   *
-   * public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() {
-   *   return withCustomTypestamp(...);
-   * }
-   *
-   * public static <K, V> TimestampPolicyFactory<K, V> withCustomTimestamp() {
-   * }
+  /**
+   * {@link CustomTimestampPolicyWithLimitedDelay} using {@link KafkaTimestampType#CREATE_TIME}
+   * from the record for timestamp. See {@link KafkaIO.Read#withCreateTime(Duration)} for more
+   * complete documentation.
    */
+  static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) {
+    SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction = record -> {
+      checkArgument(
+        record.getTimestampType() == KafkaTimestampType.CREATE_TIME,
+        "Kafka record's timestamp is not 'CREATE_TIME' "
+          + "(topic: %s, partition %s, offset %s, timestamp type '%s')",
+        record.getTopic(), record.getPartition(), record.getOffset(), record.getTimestampType());
+      return new Instant(record.getTimestamp());
+    };
+
+    return (tp, previousWatermark) ->
+      new CustomTimestampPolicyWithLimitedDelay<>(timestampFunction, maxDelay, previousWatermark);
+  }

   /**
    * Used by the Read transform to support old timestamp functions API.
    */
   static <K, V> TimestampPolicyFactory<K, V> withTimestampFn(
       final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
-
-    return new TimestampPolicyFactory<K, V>() {
-      @Override
-      public TimestampPolicy<K, V> createTimestampPolicy(TopicPartition tp,
-                                                         Optional<Instant> previousWatermark) {
-        return new TimestampFnPolicy<>(timestampFn, previousWatermark);
-      }
-    };
+    return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark);
   }

   /**
    * A simple policy that uses current time for event time and watermark. This should be used
    * when better timestamps like LogAppendTime are not available for a topic.
    */
-  public static class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {
+  class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> {

     @Override
     public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) {
@@ -131,7 +118,7 @@ public Instant getWatermark(PartitionContext context) {
    * read. If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'.
    * See {@link KafkaIO.Read#withLogAppendTime()} for longer description.
    */
-  public static class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
+  class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {

     /**
      * When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark
@@ -181,7 +168,7 @@ public Instant getWatermark(PartitionContext context) {
    * Internal policy to support deprecated withTimestampFn API. It returns last record
    * timestamp for watermark!.
    */
-  private static class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {
+  class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> {

     final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
     Instant lastRecordTimestamp;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
new file mode 100644
index 00000000000..04e86a6a94b
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CustomTimestampPolicyWithLimitedDelay}. */
+@RunWith(JUnit4.class)
+public class CustomTimestampPolicyWithLimitedDelayTest {
+
+  // Takes offsets of timestamps from now returns the results as offsets from 'now'.
+  private static List<Long> getTimestampsForRecords(
+    TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
+
+    return timestampOffsets
+      .stream()
+      .map(ts -> {
+        Instant result = policy.getTimestampForRecord(
+          null, new KafkaRecord<>("topic", 0, 0, now.getMillis() + ts,
+                                  KafkaTimestampType.CREATE_TIME, "key", "value"));
+        return result.getMillis() - now.getMillis();
+      })
+      .collect(Collectors.toList());
+  }
+
+
+  @Test
+  public void testCustomTimestampPolicyWithLimitedDelay() {
+    // Verifies that max delay is applies appropriately for reporting watermark
+
+    Duration maxDelay = Duration.standardSeconds(60);
+
+    CustomTimestampPolicyWithLimitedDelay<String, String> policy =
+      new CustomTimestampPolicyWithLimitedDelay<>(
+        (record -> new Instant(record.getTimestamp())),
+        maxDelay,
+        Optional.empty());
+
+    Instant now = Instant.now();
+
+    TimestampPolicy.PartitionContext ctx = mock(TimestampPolicy.PartitionContext.class);
+    when(ctx.getMessageBacklog()).thenReturn(100L);
+    when(ctx.getBacklogCheckTime()).thenReturn(now);
+
+    assertThat(policy.getWatermark(ctx), is(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    // (1) Test simple case : watermark == max_timesatmp - max_delay
+
+    List<Long> input = ImmutableList.of(-200_000L,
+                                        -150_000L,
+                                        -120_000L,
+                                        -140_000L,
+                                        -100_000L, // <<< Max timestamp
+                                        -110_000L);
+    assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+    // Watermark should be max_timestamp - maxDelay
+    assertThat(policy.getWatermark(ctx), is(now
+                                              .minus(Duration.standardSeconds(100))
+                                              .minus(maxDelay)));
+
+    // (2) Verify future timestamps
+
+    input = ImmutableList.of(-200_000L,
+                             -150_000L,
+                             -120_000L,
+                             -140_000L,
+                             100_000L, // <<< timestamp is in future
+                             -100_000L,
+                             -110_000L);
+
+    assertThat(getTimestampsForRecords(policy, now, input), is(input));
+
+    // Watermark should be now - max_delay (backlog in context still non zero)
+    assertThat(policy.getWatermark(ctx, now), is(now.minus(maxDelay)));
+
+    // (3) Verify that Watermark advances when there is no backlog
+
+    // advance current time by 5 minutes
+    now = now.plus(Duration.standardSeconds(300));
+    Instant backlogCheckTime = now.minus(Duration.standardSeconds(10));
+
+    when(ctx.getMessageBacklog()).thenReturn(0L);
+    when(ctx.getBacklogCheckTime()).thenReturn(backlogCheckTime);
+
+    assertThat(policy.getWatermark(ctx, now), is(backlogCheckTime.minus(maxDelay)));
+  }
+}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3718c410827..71f7f134dba 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -151,12 +151,14 @@
   public ExpectedException thrown = ExpectedException.none();

   private static final Instant LOG_APPEND_START_TIME = new Instant(600 * 1000);
+  private static final String TIMESTAMP_START_MILLIS_CONFIG = "test.timestamp.start.millis";
+  private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";

   // Update mock consumer with records distributed among the given topics, each with given number
   // of partitions. Records are assigned in round-robin order among the partitions.
   private static MockConsumer<byte[], byte[]> mkMockConsumer(
       List<String> topics, int partitionsPerTopic, int numElements,
-      OffsetResetStrategy offsetResetStrategy) {
+      OffsetResetStrategy offsetResetStrategy, Map<String, Object> config) {

     final List<TopicPartition> partitions = new ArrayList<>();
     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -176,6 +178,10 @@

     int numPartitions = partitions.size();
     final long[] offsets = new long[numPartitions];
+    long timestampStartMillis = (Long) config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG,
+      LOG_APPEND_START_TIME.getMillis());
+    TimestampType timestampType = TimestampType.forName((String)
+      config.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString()));

     for (int i = 0; i < numElements; i++) {
       int pIdx = i % numPartitions;
@@ -189,8 +195,8 @@
           tp.topic(),
           tp.partition(),
           offsets[pIdx]++,
-          LOG_APPEND_START_TIME.plus(Duration.standardSeconds(i)).getMillis(),
-          TimestampType.LOG_APPEND_TIME,
+          timestampStartMillis + Duration.standardSeconds(i).getMillis(),
+          timestampType,
           0, key.length, value.length, key, value));
     }

@@ -277,7 +283,7 @@ public void run() {
     @Override
     public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
+      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy, config);
     }
   }

@@ -498,8 +504,73 @@ public void testUnboundedSourceLogAppendTimestamps() {
     p.run();
   }

+  @Test
+  public void testUnboundedSourceCustomTimestamps() {
+    // The custom timestamps is set to customTimestampStartMillis + value.
+    // Tests basic functionality of custom timestamps.
+
+    final int numElements = 1000;
+    final long customTimestampStartMillis = 80000L;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withTimestampPolicyFactory(
+                  (tp, prevWatermark) -> new CustomTimestampPolicyWithLimitedDelay<Integer, Long>(
+                    (record -> new Instant(TimeUnit.SECONDS.toMillis(record.getKV().getValue())
+                                           + customTimestampStartMillis)),
+                    Duration.millis(0),
+                    prevWatermark))
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs())
+                 .via(t -> TimeUnit.SECONDS.toMillis(t) + customTimestampStartMillis))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  @Test
+  public void testUnboundedSourceCreateTimestamps() {
+    // Same as testUnboundedSourceCustomTimestamps with create timestamp.
+
+    final int numElements = 1000;
+    final long createTimestampStartMillis = 50000L;
+
+    PCollection<Long> input =
+      p.apply(mkKafkaReadTransform(numElements, null)
+                .withCreateTime(Duration.millis(0))
+                .updateConsumerProperties(ImmutableMap.of(
+                  TIMESTAMP_TYPE_CONFIG, "CreateTime",
+                  TIMESTAMP_START_MILLIS_CONFIG, createTimestampStartMillis))
+                .withoutMetadata())
+        .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+      input
+        .apply(MapElements.into(TypeDescriptors.longs())
+                 .via(t -> TimeUnit.SECONDS.toMillis(t) + createTimestampStartMillis))
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("DistinctTimestamps", Distinct.create());
+
+    // This assert also confirms that diff only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
   // Returns TIMESTAMP_MAX_VALUE for watermark when all the records are read from a partition.
-  static class TimestampPolicyWithEndOfSource<K, V> extends TimestampPolicyFactory<K, V> {
+  static class TimestampPolicyWithEndOfSource<K, V> implements TimestampPolicyFactory<K, V> {
     private final long maxOffset;

     TimestampPolicyWithEndOfSource(long maxOffset) {
@@ -553,10 +624,9 @@ public void testUnboundedSourceWithoutBoundedWrapper() {
         .withTimestampPolicyFactory(
           new TimestampPolicyWithEndOfSource<>(numElements / numPartitions - 1));

-    PCollection <Long> input =
-      p.apply("readFromKafka", reader.withoutMetadata())
-        .apply(Values.create())
-        .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));
+    p.apply("readFromKafka", reader.withoutMetadata())
+      .apply(Values.create())
+      .apply(Window.into(FixedWindows.of(Duration.standardDays(100))));

     PipelineResult result = p.run();


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 86078)
Time Spent: 3h 40m (was: 3.5h)

> Better handling of watermark in KafkaIO
> ---------------------------------------
>
> Key: BEAM-591
> URL: https://issues.apache.org/jira/browse/BEAM-591
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Reporter: Raghu Angadi
> Assignee: Raghu Angadi
> Priority: Major
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> Right now default watermark in KafkaIO is same as timestamp of the record.
> The main problem with this is that watermark does not change if there aren't any
> new records on the topic. This can hold up many open windows.
> The record timestamp by default is set to processing time (i.e. when the
> runner reads a record from Kafka reader).
> A user can provide functions to calculate watermark and record timestamps.
> There are a few concerns with current design:
> * What should happen when a Kafka topic is idle:
> ** in default case, I think watermark should advance to current time.
> ** What should happen when user has provided a function to calculate record
> timestamp?
> *** Should the watermark stay same as record timestamp?
> *** same when user has provided own watermark function?
> * Are the current semantics of user provided watermark function correct?
> ** -it is run once for each record read-.
> ** -Should it instead be run inside {{getWatermark()}} called by the runner
> (we could still provide the last user record, and its timestamp)-.
> ** It does run inside {{getWatermark()}}. Should we pass current record
> timestamp in addition to the record?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
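The watermark rule described for CustomTimestampPolicyWithLimitedDelay and KafkaIO.Read#withCreateTime(Duration) can be run without Beam or Kafka on the classpath. What follows is a minimal sketch, assuming java.time in place of Joda-Time. The class LimitedDelayWatermark, its methods, and the backlog/backlogCheckTime parameters, which stand in for PartitionContext, are hypothetical names used for illustration, not Beam's API. The sketch mirrors the three branches of getWatermark(ctx, now) from the diff and follows the scenario used in CustomTimestampPolicyWithLimitedDelayTest.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/**
 * Hypothetical, dependency-free restatement of the watermark rule in
 * CustomTimestampPolicyWithLimitedDelay. java.time replaces Joda-Time and the
 * backlog arguments replace Beam's PartitionContext; this is not the Beam API.
 */
class LimitedDelayWatermark {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (assumed: Long.MIN_VALUE / 1000 millis).
  static final Instant MIN_TIMESTAMP = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);

  private final Duration maxDelay;
  private Instant maxEventTimestamp;

  LimitedDelayWatermark(Duration maxDelay, Optional<Instant> previousWatermark) {
    this.maxDelay = maxDelay;
    // Seeded so that, before any record is read, the watermark equals previousWatermark.
    this.maxEventTimestamp = previousWatermark.orElse(MIN_TIMESTAMP).plus(maxDelay);
  }

  /** Tracks the largest event timestamp seen so far and returns the timestamp unchanged. */
  Instant onRecord(Instant eventTimestamp) {
    if (eventTimestamp.isAfter(maxEventTimestamp)) {
      maxEventTimestamp = eventTimestamp;
    }
    return eventTimestamp;
  }

  /** backlog and backlogCheckTime play the role of getMessageBacklog() and getBacklogCheckTime(). */
  Instant watermark(Instant now, long backlog, Instant backlogCheckTime) {
    if (maxEventTimestamp.isAfter(now)) {
      // (a) Max event timestamp is in the future: cap the watermark at now - maxDelay.
      return now.minus(maxDelay);
    } else if (backlog == 0
        && backlogCheckTime.minus(maxDelay).isAfter(maxEventTimestamp)
        && maxEventTimestamp.toEpochMilli() > 0) {
      // (b) Idle partition that has seen a positive timestamp: advance with the clock.
      return backlogCheckTime.minus(maxDelay);
    } else {
      // Common case: the watermark trails the max event timestamp by maxDelay.
      return maxEventTimestamp.minus(maxDelay);
    }
  }

  public static void main(String[] args) {
    Duration maxDelay = Duration.ofSeconds(60);
    Instant now = Instant.ofEpochSecond(1_000_000L);
    LimitedDelayWatermark policy = new LimitedDelayWatermark(maxDelay, Optional.empty());

    // Out-of-order records; the largest is now - 100s.
    for (long offsetSeconds : new long[] {-200, -150, -120, -140, -100, -110}) {
      policy.onRecord(now.plusSeconds(offsetSeconds));
    }
    // Prints the instant 160 seconds before 'now' (now - 100s - maxDelay).
    System.out.println(policy.watermark(now, 100, now));
  }
}
```

Seeding maxEventTimestamp with previousWatermark + maxDelay means a resumed reader keeps reporting the checkpointed watermark until newer records arrive. This holds only while that watermark is not later than now - maxDelay, because branch (a) is checked first.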
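Turning TimestampPolicyFactory from an abstract class into a @FunctionalInterface is what allows the diff to replace each anonymous subclass with a lambda such as (tp, prev) -> new ProcessingTimePolicy<>(). Below is a reduced sketch of the same pattern. TsType, Rec, Policy and PolicyFactory are hypothetical stand-ins invented for illustration. Beam's TimestampPolicy is an abstract class with both getTimestampForRecord and getWatermark, so it cannot itself be a lambda target. The type check mirrors the checkArgument call in TimestampPolicyFactory.withCreateTime.

```java
import java.io.Serializable;
import java.util.Optional;

// Hypothetical stand-in for KafkaTimestampType (not the Beam enum).
enum TsType { CREATE_TIME, LOG_APPEND_TIME }

// Hypothetical stand-in for KafkaRecord, carrying only the fields the check reports.
final class Rec {
  final String topic;
  final int partition;
  final long offset;
  final long timestampMillis;
  final TsType type;

  Rec(String topic, int partition, long offset, long timestampMillis, TsType type) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestampMillis = timestampMillis;
    this.type = type;
  }
}

// Hypothetical single-method policy, so a lambda can implement it.
interface Policy {
  long timestampFor(Rec rec);
}

/** One abstract method plus Serializable: lambdas targeting this type are serializable. */
@FunctionalInterface
interface PolicyFactory extends Serializable {
  Policy create(String topicPartition, Optional<Long> previousWatermark);

  /** Rejects records whose timestamp type is not CREATE_TIME, like withCreateTime in the diff. */
  static PolicyFactory withCreateTime() {
    return (tp, previousWatermark) -> rec -> {
      if (rec.type != TsType.CREATE_TIME) {
        // Guava's checkArgument, used in the diff, also throws IllegalArgumentException.
        throw new IllegalArgumentException(String.format(
            "Kafka record's timestamp is not 'CREATE_TIME' "
                + "(topic: %s, partition %s, offset %s, timestamp type '%s')",
            rec.topic, rec.partition, rec.offset, rec.type));
      }
      return rec.timestampMillis;
    };
  }
}
```

With this shape, a one-line lambda such as `PolicyFactory f = (tp, prev) -> rec -> rec.timestampMillis;` is enough to supply a custom policy. The new KafkaIOTest code relies on the same convenience when it passes `(tp, prevWatermark) -> new CustomTimestampPolicyWithLimitedDelay<...>(...)` to withTimestampPolicyFactory.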