Repository: flume Updated Branches: refs/heads/trunk 9eb92dab0 -> 1e8f2651d
FLUME-2972. Handle offset migration in the new Kafka Channel. Offsets tracking the position in Kafka consumers change from using Zookeeper for offset storage to Kafka when moving from Kafka 0.8.x to 0.9.x. FLUME-2823 makes the client change in the Kafka Channel but does not ensure existing offsets get migrated in order to continue consuming where it left off. Flume should have some automated logic on startup to check if Kafka offsets exist; if not, and migration is enabled (by default), then copy the offsets from Zookeeper and commit them to Kafka. Reviewers: Balázs Donát Bessenyei, Denes Arvay, Mike Percy (Grant Henke via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e8f2651 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e8f2651 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e8f2651 Branch: refs/heads/trunk Commit: 1e8f2651dacf5daef55d75c7b9b12492962e7921 Parents: 9eb92da Author: Grant Henke <[email protected]> Authored: Thu Aug 25 16:48:25 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Thu Aug 25 16:54:17 2016 -0700 ---------------------------------------------------------------------- flume-ng-channels/flume-kafka-channel/pom.xml | 2 +- .../flume/channel/kafka/KafkaChannel.java | 103 ++++++++++++- .../kafka/KafkaChannelConfiguration.java | 3 + .../flume/channel/kafka/TestKafkaChannel.java | 148 ++++++++++++++++--- flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 + 5 files changed, 234 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml index 587b4b4..c1cc844 100644 --- a/flume-ng-channels/flume-kafka-channel/pom.xml +++ 
b/flume-ng-channels/flume-kafka-channel/pom.xml @@ -40,7 +40,7 @@ limitations under the License. <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> - <scope>test</scope> + <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 90e3288..684120f 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -20,6 +20,8 @@ package org.apache.flume.channel.kafka; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import kafka.utils.ZKGroupTopicDirs; +import kafka.utils.ZkUtils; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; @@ -47,9 +49,12 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.security.JaasUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -70,12 +75,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicReference; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; +import static scala.collection.JavaConverters.asJavaListConverter; public class KafkaChannel extends BasicChannelSemantics { private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class); + // Constants used only for offset migration zookeeper connections + private static final int ZK_SESSION_TIMEOUT = 30000; + private static final int ZK_CONNECTION_TIMEOUT = 30000; + private final Properties consumerProps = new Properties(); private final Properties producerProps = new Properties(); @@ -84,6 +94,10 @@ public class KafkaChannel extends BasicChannelSemantics { private AtomicReference<String> topic = new AtomicReference<String>(); private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT; + private String zookeeperConnect = null; + private String topicStr = DEFAULT_TOPIC; + private String groupId = DEFAULT_GROUP_ID; + private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS; //used to indicate if a rebalance has occurred during the current transaction AtomicBoolean rebalanceFlag = new AtomicBoolean(); @@ -113,6 +127,11 @@ public class KafkaChannel extends BasicChannelSemantics { @Override public void start() { logger.info("Starting Kafka Channel: {}", getName()); + // As a migration step check if there are any offsets from the group stored in kafka + // If not read them from Zookeeper and commit them to Kafka + if (migrateZookeeperOffsets && zookeeperConnect != null && !zookeeperConnect.isEmpty()) { + migrateOffsets(); + } producer = new KafkaProducer<String, byte[]>(producerProps); // We always have just one topic being read by one thread logger.info("Topic = {}", topic.get()); @@ -147,12 +166,19 @@ public class KafkaChannel extends BasicChannelSemantics { //Can remove in the next release translateOldProps(ctx); - String topicStr = ctx.getString(TOPIC_CONFIG); + topicStr = ctx.getString(TOPIC_CONFIG); if 
(topicStr == null || topicStr.isEmpty()) { topicStr = DEFAULT_TOPIC; logger.info("Topic was not specified. Using {} as the topic.", topicStr); } topic.set(topicStr); + + groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + if (groupId == null || groupId.isEmpty()) { + groupId = DEFAULT_GROUP_ID; + logger.info("Group ID was not specified. Using {} as the group id.", groupId); + } + String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG); if (bootStrapServers == null || bootStrapServers.isEmpty()) { throw new ConfigurationException("Bootstrap Servers must be specified"); @@ -164,6 +190,10 @@ public class KafkaChannel extends BasicChannelSemantics { parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT); pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT); + migrateZookeeperOffsets = ctx.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS, + DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS); + zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT); + if (counter == null) { counter = new KafkaChannelCounter(getName()); } @@ -235,11 +265,6 @@ public class KafkaChannel extends BasicChannelSemantics { } private void setConsumerProps(Context ctx, String bootStrapServers) { - String groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); - if (groupId == null || groupId.isEmpty()) { - groupId = DEFAULT_GROUP_ID; - logger.info("Group ID was not specified. 
Using {} as the group id.", groupId); - } consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET); @@ -272,6 +297,72 @@ public class KafkaChannel extends BasicChannelSemantics { } } + private void migrateOffsets() { + ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, + JaasUtils.isZkSecurityEnabled()); + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); + try { + Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer); + if (!kafkaOffsets.isEmpty()) { + logger.info("Found Kafka offsets for topic " + topicStr + + ". Will not migrate from zookeeper"); + logger.debug("Offsets found: {}", kafkaOffsets); + return; + } + + logger.info("No Kafka offsets found. Migrating zookeeper offsets"); + Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(zkUtils); + if (zookeeperOffsets.isEmpty()) { + logger.warn("No offsets to migrate found in Zookeeper"); + return; + } + + logger.info("Committing Zookeeper offsets to Kafka"); + logger.debug("Offsets to commit: {}", zookeeperOffsets); + consumer.commitSync(zookeeperOffsets); + // Read the offsets to verify they were committed + Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer); + logger.debug("Offsets committed: {}", newKafkaOffsets); + if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) { + throw new FlumeException("Offsets could not be committed"); + } + } finally { + zkUtils.close(); + consumer.close(); + } + } + + private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets( + KafkaConsumer<String, byte[]> client) { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + List<PartitionInfo> partitions = 
client.partitionsFor(topicStr); + for (PartitionInfo partition : partitions) { + TopicPartition key = new TopicPartition(topicStr, partition.partition()); + OffsetAndMetadata offsetAndMetadata = client.committed(key); + if (offsetAndMetadata != null) { + offsets.put(key, offsetAndMetadata); + } + } + return offsets; + } + + private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); + List<String> partitions = asJavaListConverter( + client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); + for (String partition : partitions) { + TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); + Option<String> data = client.readDataMaybeNull( + topicDirs.consumerOffsetDir() + "/" + partition)._1(); + if (data.isDefined()) { + Long offset = Long.valueOf(data.get()); + offsets.put(key, new OffsetAndMetadata(offset)); + } + } + return offsets; + } + private void decommissionConsumerAndRecords(ConsumerAndRecords c) { c.consumer.close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java index ccf46d9..3ab807b 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java @@ -49,6 +49,9 @@ public class KafkaChannelConfiguration { public static final String 
PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent"; public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true; + public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets"; + public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true; + /*** Old Configuration Parameters ****/ public static final String BROKER_LIST_KEY = "metadata.broker.list"; public static final String REQUIRED_ACKS_KEY = "request.required.acks"; http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index b63ac9b..e7ae68f 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -20,6 +20,7 @@ package org.apache.flume.channel.kafka; import com.google.common.collect.Lists; import kafka.admin.AdminUtils; +import kafka.utils.ZKGroupTopicDirs; import kafka.utils.ZkUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.flume.Context; @@ -30,8 +31,13 @@ import org.apache.flume.event.EventBuilder; import org.apache.flume.sink.kafka.util.TestUtil; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; 
+import org.apache.kafka.common.security.JaasUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -55,13 +61,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; public class TestKafkaChannel { @@ -77,16 +77,9 @@ public class TestKafkaChannel { @Before public void setup() throws Exception { - boolean topicFound = false; - while (!topicFound) { - topic = RandomStringUtils.randomAlphabetic(8); - if (!usedTopics.contains(topic)) { - usedTopics.add(topic); - topicFound = true; - } - } + topic = findUnusedTopic(); try { - createTopic(topic); + createTopic(topic, 5); } catch (Exception e) { } Thread.sleep(2500); @@ -235,6 +228,106 @@ public class TestKafkaChannel { Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody())); } + @Test + public void testMigrateOffsetsNone() throws Exception { + doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none"); + } + + @Test + public void testMigrateOffsetsZookeeper() throws Exception { + doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper"); + } + + @Test + public void testMigrateOffsetsKafka() throws Exception { + doTestMigrateZookeeperOffsets(false, true, 
"testMigrateOffsets-kafka"); + } + + @Test + public void testMigrateOffsetsBoth() throws Exception { + doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both"); + } + + public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets, + String group) throws Exception { + // create a topic with 1 partition for simplicity + topic = findUnusedTopic(); + createTopic(topic, 1); + + Context context = prepareDefaultContext(false); + context.put(ZOOKEEPER_CONNECT, testUtil.getZkUrl()); + context.put(GROUP_ID_FLUME, group); + final KafkaChannel channel = createChannel(context); + + // Produce some data and save an offset + Long fifthOffset = 0L; + Long tenthOffset = 0L; + Properties props = channel.getProducerProps(); + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props); + for (int i = 1; i <= 50; i++) { + ProducerRecord<String, byte[]> data = + new ProducerRecord<>(topic, null, String.valueOf(i).getBytes()); + RecordMetadata recordMetadata = producer.send(data).get(); + if (i == 5) { + fifthOffset = recordMetadata.offset(); + } + if (i == 10) { + tenthOffset = recordMetadata.offset(); + } + } + + // Commit 10th offset to zookeeper + if (hasZookeeperOffsets) { + ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), 30000, 30000, + JaasUtils.isZkSecurityEnabled()); + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic); + // we commit the tenth offset to ensure some data is missed. 
+ Long offset = tenthOffset + 1; + zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(), + zkUtils.updatePersistentPath$default$3()); + zkUtils.close(); + } + + // Commit 5th offset to kafka + if (hasKafkaOffsets) { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1)); + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(channel.getConsumerProps()); + consumer.commitSync(offsets); + consumer.close(); + } + + // Start the channel and read some data + channel.start(); + ExecutorCompletionService<Void> submitterSvc = new + ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + List<Event> events = pullEvents(channel, submitterSvc, + 20, false, false); + wait(submitterSvc, 5); + List<Integer> finals = new ArrayList<Integer>(40); + for (Event event: events) { + finals.add(Integer.parseInt(new String(event.getBody()))); + } + channel.stop(); + + if (!hasKafkaOffsets && !hasZookeeperOffsets) { + // The default behavior is to read the entire log + Assert.assertTrue("Channel should read the the first message", finals.contains(1)); + } else if (hasKafkaOffsets && hasZookeeperOffsets) { + // Respect Kafka offsets if they exist + Assert.assertFalse("Channel should not read the 5th message", finals.contains(5)); + Assert.assertTrue("Channel should read the 6th message", finals.contains(6)); + } else if (hasKafkaOffsets) { + // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing) + Assert.assertFalse("Channel should not read the 5th message", finals.contains(5)); + Assert.assertTrue("Channel should read the 6th message", finals.contains(6)); + } else { + // Otherwise migrate the ZooKeeper offsets if they exist + Assert.assertFalse("Channel should not read the 10th message", finals.contains(10)); + Assert.assertTrue("Channel should read the 11th message", finals.contains(11)); + } + } + private Event 
takeEventWithoutCommittingTxn(KafkaChannel channel) { for (int i = 0; i < 5; i++) { Transaction txn = channel.getTransaction(); @@ -396,9 +489,14 @@ public class TestKafkaChannel { private KafkaChannel startChannel(boolean parseAsFlume) throws Exception { Context context = prepareDefaultContext(parseAsFlume); + KafkaChannel channel = createChannel(context); + channel.start(); + return channel; + } + + private KafkaChannel createChannel(Context context) throws Exception { final KafkaChannel channel = new KafkaChannel(); Configurables.configure(channel, context); - channel.start(); return channel; } @@ -585,8 +683,20 @@ public class TestKafkaChannel { return context; } - public static void createTopic(String topicName) { - int numPartitions = 5; + public String findUnusedTopic() { + String newTopic = null; + boolean topicFound = false; + while (!topicFound) { + newTopic = RandomStringUtils.randomAlphabetic(8); + if (!usedTopics.contains(newTopic)) { + usedTopics.add(newTopic); + topicFound = true; + } + } + return newTopic; + } + + public static void createTopic(String topicName, int numPartitions) { int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; ZkUtils zkUtils = http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 5e677c6..7e207aa 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2731,6 +2731,10 @@ parseAsFlumeEvent true Expecting A This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. 
Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact +migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. + This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set + to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset + configuration defines how offsets are handled. pollTimeout 500 The amount of time(in milliseconds) to wait in the "poll()" call of the conumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
