Repository: incubator-beam Updated Branches: refs/heads/master b25131422 -> f0f4af581
Use Avro serializer for Kafka checkpoint mark. This is more portable. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937ac3b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937ac3b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937ac3b2 Branch: refs/heads/master Commit: 937ac3b2ddc60fd9446440c9354139c6234cb625 Parents: b251314 Author: Raghu Angadi <[email protected]> Authored: Tue Nov 8 07:08:32 2016 -0800 Committer: Dan Halperin <[email protected]> Committed: Fri Nov 11 16:14:07 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/kafka/KafkaCheckpointMark.java | 32 +++++++++++++------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 ++++++----- 2 files changed, 32 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937ac3b2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java index 4f9e96f..763a98a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java @@ -20,19 +20,21 @@ package org.apache.beam.sdk.io.kafka; import java.io.IOException; import java.io.Serializable; import java.util.List; + +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.kafka.common.TopicPartition; /** * Checkpoint for an unbounded 
KafkaIO.Read. Consists of Kafka topic name, partition id, * and the latest offset consumed so far. */ -@DefaultCoder(SerializableCoder.class) -public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { +@DefaultCoder(AvroCoder.class) +public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { + + private List<PartitionMark> partitions; - private final List<PartitionMark> partitions; + private KafkaCheckpointMark() {} // for Avro public KafkaCheckpointMark(List<PartitionMark> partitions) { this.partitions = partitions; @@ -55,16 +57,24 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Seri * for a single partition. */ public static class PartitionMark implements Serializable { - private final TopicPartition topicPartition; - private final long nextOffset; + private String topic; + private int partition; + private long nextOffset; + + private PartitionMark() {} // for Avro - public PartitionMark(TopicPartition topicPartition, long offset) { - this.topicPartition = topicPartition; + public PartitionMark(String topic, int partition, long offset) { + this.topic = topic; + this.partition = partition; this.nextOffset = offset; } - public TopicPartition getTopicPartition() { - return topicPartition; + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; } public long getNextOffset() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937ac3b2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 834104e..4212d59 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ 
-49,11 +49,12 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.UnboundedSource; @@ -721,7 +722,7 @@ public class KafkaIO { @Override public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() { - return SerializableCoder.of(KafkaCheckpointMark.class); + return AvroCoder.of(KafkaCheckpointMark.class); } @Override @@ -856,10 +857,11 @@ public class KafkaIO { for (int i = 0; i < source.assignedPartitions.size(); i++) { PartitionMark ckptMark = checkpointMark.getPartitions().get(i); TopicPartition assigned = source.assignedPartitions.get(i); - - checkState(ckptMark.getTopicPartition().equals(assigned), - "checkpointed partition %s and assigned partition %s don't match", - ckptMark.getTopicPartition(), assigned); + TopicPartition partition = new TopicPartition(ckptMark.getTopic(), + ckptMark.getPartition()); + checkState(partition.equals(assigned), + "checkpointed partition %s and assigned partition %s don't match", + partition, assigned); partitionStates.get(i).nextOffset = ckptMark.getNextOffset(); } @@ -1084,7 +1086,9 @@ public class KafkaIO { Lists.transform(partitionStates, new Function<PartitionState, PartitionMark>() { public PartitionMark apply(PartitionState p) { - return new PartitionMark(p.topicPartition, p.nextOffset); + return new PartitionMark(p.topicPartition.topic(), + p.topicPartition.partition(), + p.nextOffset); } } )));
