Repository: flume
Updated Branches:
  refs/heads/trunk 9eb92dab0 -> 1e8f2651d


FLUME-2972. Handle offset migration in the new Kafka Channel

Offsets tracking the position in Kafka consumers change from using
Zookeeper for offset storage to Kafka when moving from Kafka 0.8.x to
0.9.x.

FLUME-2823 makes the client change in the Kafka Channel but does not
ensure existing offsets get migrated in order to continue consuming
where it left off.

Flume should have automated logic on startup to check whether Kafka
offsets exist; if they do not, and migration is enabled (the default),
it should copy the offsets from Zookeeper and commit them to Kafka.

Reviewers: Balázs Donát Bessenyei, Denes Arvay, Mike Percy

(Grant Henke via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e8f2651
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e8f2651
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e8f2651

Branch: refs/heads/trunk
Commit: 1e8f2651dacf5daef55d75c7b9b12492962e7921
Parents: 9eb92da
Author: Grant Henke <[email protected]>
Authored: Thu Aug 25 16:48:25 2016 -0700
Committer: Mike Percy <[email protected]>
Committed: Thu Aug 25 16:54:17 2016 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |   2 +-
 .../flume/channel/kafka/KafkaChannel.java       | 103 ++++++++++++-
 .../kafka/KafkaChannelConfiguration.java        |   3 +
 .../flume/channel/kafka/TestKafkaChannel.java   | 148 ++++++++++++++++---
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   4 +
 5 files changed, 234 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml 
b/flume-ng-channels/flume-kafka-channel/pom.xml
index 587b4b4..c1cc844 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -40,7 +40,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
-      <scope>test</scope>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>

http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 90e3288..684120f 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -20,6 +20,8 @@ package org.apache.flume.channel.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
@@ -47,9 +49,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -70,12 +75,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static scala.collection.JavaConverters.asJavaListConverter;
 
 public class KafkaChannel extends BasicChannelSemantics {
 
   private static final Logger logger =
           LoggerFactory.getLogger(KafkaChannel.class);
 
+  // Constants used only for offset migration zookeeper connections
+  private static final int ZK_SESSION_TIMEOUT = 30000;
+  private static final int ZK_CONNECTION_TIMEOUT = 30000;
+
   private final Properties consumerProps = new Properties();
   private final Properties producerProps = new Properties();
 
@@ -84,6 +94,10 @@ public class KafkaChannel extends BasicChannelSemantics {
 
   private AtomicReference<String> topic = new AtomicReference<String>();
   private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
+  private String zookeeperConnect = null;
+  private String topicStr = DEFAULT_TOPIC;
+  private String groupId = DEFAULT_GROUP_ID;
+  private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
   //used to indicate if a rebalance has occurred during the current transaction
   AtomicBoolean rebalanceFlag = new AtomicBoolean();
@@ -113,6 +127,11 @@ public class KafkaChannel extends BasicChannelSemantics {
   @Override
   public void start() {
     logger.info("Starting Kafka Channel: {}", getName());
+    // As a migration step check if there are any offsets from the group 
stored in kafka
+    // If not read them from Zookeeper and commit them to Kafka
+    if (migrateZookeeperOffsets && zookeeperConnect != null && 
!zookeeperConnect.isEmpty()) {
+      migrateOffsets();
+    }
     producer = new KafkaProducer<String, byte[]>(producerProps);
     // We always have just one topic being read by one thread
     logger.info("Topic = {}", topic.get());
@@ -147,12 +166,19 @@ public class KafkaChannel extends BasicChannelSemantics {
     //Can remove in the next release
     translateOldProps(ctx);
 
-    String topicStr = ctx.getString(TOPIC_CONFIG);
+    topicStr = ctx.getString(TOPIC_CONFIG);
     if (topicStr == null || topicStr.isEmpty()) {
       topicStr = DEFAULT_TOPIC;
       logger.info("Topic was not specified. Using {} as the topic.", topicStr);
     }
     topic.set(topicStr);
+
+    groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + 
ConsumerConfig.GROUP_ID_CONFIG);
+    if (groupId == null || groupId.isEmpty()) {
+      groupId = DEFAULT_GROUP_ID;
+      logger.info("Group ID was not specified. Using {} as the group id.", 
groupId);
+    }
+
     String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG);
     if (bootStrapServers == null || bootStrapServers.isEmpty()) {
       throw new ConfigurationException("Bootstrap Servers must be specified");
@@ -164,6 +190,10 @@ public class KafkaChannel extends BasicChannelSemantics {
     parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, 
DEFAULT_PARSE_AS_FLUME_EVENT);
     pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT);
 
+    migrateZookeeperOffsets = ctx.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
+      DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
+    zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT);
+
     if (counter == null) {
       counter = new KafkaChannelCounter(getName());
     }
@@ -235,11 +265,6 @@ public class KafkaChannel extends BasicChannelSemantics {
   }
 
   private void setConsumerProps(Context ctx, String bootStrapServers) {
-    String groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + 
ConsumerConfig.GROUP_ID_CONFIG);
-    if (groupId == null || groupId.isEmpty()) {
-      groupId = DEFAULT_GROUP_ID;
-      logger.info("Group ID was not specified. Using {} as the group id.", 
groupId);
-    }
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
DEFAULT_KEY_DESERIALIZER);
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
DEFAULT_VALUE_DESERIAIZER);
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
DEFAULT_AUTO_OFFSET_RESET);
@@ -272,6 +297,72 @@ public class KafkaChannel extends BasicChannelSemantics {
     }
   }
 
+  private void migrateOffsets() {
+    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, 
ZK_CONNECTION_TIMEOUT,
+        JaasUtils.isZkSecurityEnabled());
+    KafkaConsumer<String, byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+    try {
+      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = 
getKafkaOffsets(consumer);
+      if (!kafkaOffsets.isEmpty()) {
+        logger.info("Found Kafka offsets for topic " + topicStr +
+            ". Will not migrate from zookeeper");
+        logger.debug("Offsets found: {}", kafkaOffsets);
+        return;
+      }
+
+      logger.info("No Kafka offsets found. Migrating zookeeper offsets");
+      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = 
getZookeeperOffsets(zkUtils);
+      if (zookeeperOffsets.isEmpty()) {
+        logger.warn("No offsets to migrate found in Zookeeper");
+        return;
+      }
+
+      logger.info("Committing Zookeeper offsets to Kafka");
+      logger.debug("Offsets to commit: {}", zookeeperOffsets);
+      consumer.commitSync(zookeeperOffsets);
+      // Read the offsets to verify they were committed
+      Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = 
getKafkaOffsets(consumer);
+      logger.debug("Offsets committed: {}", newKafkaOffsets);
+      if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
+        throw new FlumeException("Offsets could not be committed");
+      }
+    } finally {
+      zkUtils.close();
+      consumer.close();
+    }
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
+      KafkaConsumer<String, byte[]> client) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+    List<PartitionInfo> partitions = client.partitionsFor(topicStr);
+    for (PartitionInfo partition : partitions) {
+      TopicPartition key = new TopicPartition(topicStr, partition.partition());
+      OffsetAndMetadata offsetAndMetadata = client.committed(key);
+      if (offsetAndMetadata != null) {
+        offsets.put(key, offsetAndMetadata);
+      }
+    }
+    return offsets;
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils 
client) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
+    List<String> partitions = asJavaListConverter(
+        
client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
+    for (String partition : partitions) {
+      TopicPartition key = new TopicPartition(topicStr, 
Integer.valueOf(partition));
+      Option<String> data = client.readDataMaybeNull(
+          topicDirs.consumerOffsetDir() + "/" + partition)._1();
+      if (data.isDefined()) {
+        Long offset = Long.valueOf(data.get());
+        offsets.put(key, new OffsetAndMetadata(offset));
+      }
+    }
+    return offsets;
+  }
+
   private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
     c.consumer.close();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index ccf46d9..3ab807b 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -49,6 +49,9 @@ public class KafkaChannelConfiguration {
   public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
   public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
 
+  public static final String MIGRATE_ZOOKEEPER_OFFSETS = 
"migrateZookeeperOffsets";
+  public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
+
   /*** Old Configuration Parameters ****/
   public static final String BROKER_LIST_KEY = "metadata.broker.list";
   public static final String REQUIRED_ACKS_KEY = "request.required.acks";

http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index b63ac9b..e7ae68f 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel.kafka;
 
 import com.google.common.collect.Lists;
 import kafka.admin.AdminUtils;
+import kafka.utils.ZKGroupTopicDirs;
 import kafka.utils.ZkUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.Context;
@@ -30,8 +31,13 @@ import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -55,13 +61,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
 
 public class TestKafkaChannel {
 
@@ -77,16 +77,9 @@ public class TestKafkaChannel {
 
   @Before
   public void setup() throws Exception {
-    boolean topicFound = false;
-    while (!topicFound) {
-      topic = RandomStringUtils.randomAlphabetic(8);
-      if (!usedTopics.contains(topic)) {
-        usedTopics.add(topic);
-        topicFound = true;
-      }
-    }
+    topic = findUnusedTopic();
     try {
-      createTopic(topic);
+      createTopic(topic, 5);
     } catch (Exception e) {
     }
     Thread.sleep(2500);
@@ -235,6 +228,106 @@ public class TestKafkaChannel {
     Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
   }
 
+  @Test
+  public void testMigrateOffsetsNone() throws Exception {
+    doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
+  }
+
+  @Test
+  public void testMigrateOffsetsZookeeper() throws Exception {
+    doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
+  }
+
+  @Test
+  public void testMigrateOffsetsKafka() throws Exception {
+    doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
+  }
+
+  @Test
+  public void testMigrateOffsetsBoth() throws Exception {
+    doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
+  }
+
+  public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, 
boolean hasKafkaOffsets,
+                                            String group) throws Exception {
+    // create a topic with 1 partition for simplicity
+    topic = findUnusedTopic();
+    createTopic(topic, 1);
+
+    Context context = prepareDefaultContext(false);
+    context.put(ZOOKEEPER_CONNECT, testUtil.getZkUrl());
+    context.put(GROUP_ID_FLUME, group);
+    final KafkaChannel channel = createChannel(context);
+
+    // Produce some data and save an offset
+    Long fifthOffset = 0L;
+    Long tenthOffset = 0L;
+    Properties props = channel.getProducerProps();
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+    for (int i = 1; i <= 50; i++) {
+      ProducerRecord<String, byte[]> data =
+          new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
+      RecordMetadata recordMetadata = producer.send(data).get();
+      if (i == 5) {
+        fifthOffset = recordMetadata.offset();
+      }
+      if (i == 10) {
+        tenthOffset = recordMetadata.offset();
+      }
+    }
+
+    // Commit 10th offset to zookeeper
+    if (hasZookeeperOffsets) {
+      ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), 30000, 30000,
+          JaasUtils.isZkSecurityEnabled());
+      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic);
+      // we commit the tenth offset to ensure some data is missed.
+      Long offset = tenthOffset + 1;
+      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", 
offset.toString(),
+          zkUtils.updatePersistentPath$default$3());
+      zkUtils.close();
+    }
+
+    // Commit 5th offset to kafka
+    if (hasKafkaOffsets) {
+      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+      offsets.put(new TopicPartition(topic, 0), new 
OffsetAndMetadata(fifthOffset + 1));
+      KafkaConsumer<String, byte[]> consumer = new 
KafkaConsumer<>(channel.getConsumerProps());
+      consumer.commitSync(offsets);
+      consumer.close();
+    }
+
+    // Start the channel and read some data
+    channel.start();
+    ExecutorCompletionService<Void> submitterSvc = new
+        ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc,
+        20, false, false);
+    wait(submitterSvc, 5);
+    List<Integer> finals = new ArrayList<Integer>(40);
+    for (Event event: events) {
+      finals.add(Integer.parseInt(new String(event.getBody())));
+    }
+    channel.stop();
+
+    if (!hasKafkaOffsets && !hasZookeeperOffsets) {
+      // The default behavior is to read the entire log
+      Assert.assertTrue("Channel should read the the first message", 
finals.contains(1));
+    } else if (hasKafkaOffsets && hasZookeeperOffsets) {
+      // Respect Kafka offsets if they exist
+      Assert.assertFalse("Channel should not read the 5th message", 
finals.contains(5));
+      Assert.assertTrue("Channel should read the 6th message", 
finals.contains(6));
+    } else if (hasKafkaOffsets) {
+      // Respect Kafka offsets if they exist (don't fail if zookeeper offsets 
are missing)
+      Assert.assertFalse("Channel should not read the 5th message", 
finals.contains(5));
+      Assert.assertTrue("Channel should read the 6th message", 
finals.contains(6));
+    } else {
+      // Otherwise migrate the ZooKeeper offsets if they exist
+      Assert.assertFalse("Channel should not read the 10th message", 
finals.contains(10));
+      Assert.assertTrue("Channel should read the 11th message", 
finals.contains(11));
+    }
+  }
+
   private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
     for (int i = 0; i < 5; i++) {
       Transaction txn = channel.getTransaction();
@@ -396,9 +489,14 @@ public class TestKafkaChannel {
 
   private KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
     Context context = prepareDefaultContext(parseAsFlume);
+    KafkaChannel channel = createChannel(context);
+    channel.start();
+    return channel;
+  }
+
+  private KafkaChannel createChannel(Context context) throws Exception {
     final KafkaChannel channel = new KafkaChannel();
     Configurables.configure(channel, context);
-    channel.start();
     return channel;
   }
 
@@ -585,8 +683,20 @@ public class TestKafkaChannel {
     return context;
   }
 
-  public static void createTopic(String topicName) {
-    int numPartitions = 5;
+  public String findUnusedTopic() {
+    String newTopic = null;
+    boolean topicFound = false;
+    while (!topicFound) {
+      newTopic = RandomStringUtils.randomAlphabetic(8);
+      if (!usedTopics.contains(newTopic)) {
+        usedTopics.add(newTopic);
+        topicFound = true;
+      }
+    }
+    return newTopic;
+  }
+
+  public static void createTopic(String topicName, int numPartitions) {
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
     ZkUtils zkUtils =

http://git-wip-us.apache.org/repos/asf/flume/blob/1e8f2651/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 5e677c6..7e207aa 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2731,6 +2731,10 @@ parseAsFlumeEvent                        true            
            Expecting A
                                                                      This 
should be true if Flume source is writing to the channel and false if other 
producers are
                                                                      writing 
into the topic that the channel is using. Flume source messages to Kafka can be 
parsed outside of Flume by using
                                                                      
org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk 
artifact
+migrateZookeeperOffsets                  true                        When no 
Kafka stored offset is found, look up the offsets in Zookeeper and commit them 
to Kafka.
+                                                                     This 
should be true to support seamless Kafka client migration from older versions 
of Flume. Once migrated this can be set
+                                                                     to false, 
though that should generally not be required. If no Zookeeper offset is found 
the kafka.consumer.auto.offset.reset
+                                                                     
configuration defines how offsets are handled.
 pollTimeout                              500                         The 
amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
                                                                      
https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
 kafka.consumer.auto.offset.reset         latest                      What to 
do when there is no initial offset in Kafka or if the current offset does not 
exist any more on the server

Reply via email to