Repository: flume Updated Branches: refs/heads/trunk 94b5ce497 -> 5ec8bb6ad
FLUME-3281 Update to Kafka 2.0 This has been tested with unit tests. The main difference that caused the most problems is the consumer.poll(Duration) change. This does not block even when it fetches meta data whereas the previous poll(long timeout) blocked indefinitely for meta data fetching. This has resulted in many test timing issues. I tried to do minimal changes at the tests, just enough to make them pass. Kafka 2.0 requires a higher version for slf4j, I had to update it to 1.7.25. Option migrateZookeeperOffsets is deprecated in this PR. This will allow us to get rid of Kafka server libraries in Flume. Compatibility testing. Modified the TestUtil to be able to use external servers. This way I could test against a variety of Kafka Server versions using the normal unit tests. Channel tests using 2.0.1 client: Kafka_2.11_0.9.0.0 Not compatible Kafka_2.11_0.10.0.0 Not compatible Kafka_2.11_0.10.1.0 passed with TestPartition timeouts (rerunning the single test passes so it is a test isolation issue) Kafka_2.11_0.10.2.0 passed with TestPartition timeouts (rerunning the single test passes so it is a test isolation issue) Kafka_2.11-0.11.0.3 - timeouts in TestPartitions when creating topics Kafka_2.11-1.0.2 - passed Kafka_2.11-1.1.1 - passed Kafka_2.11-2.0.1 - passed This closes #235 Reviewers: Tristan Stevens, Ferenc Szabo, Peter Turcsanyi (Endre Major via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5ec8bb6a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5ec8bb6a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5ec8bb6a Branch: refs/heads/trunk Commit: 5ec8bb6ad1f3d91355adcb3d44d63058d37dee23 Parents: 94b5ce4 Author: Endre Major <[email protected]> Authored: Tue Nov 20 17:16:51 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Tue Nov 20 17:16:51 2018 +0100 ---------------------------------------------------------------------- LICENSE | 2 +- 
flume-ng-channels/flume-kafka-channel/pom.xml | 2 +- .../flume/channel/kafka/KafkaChannel.java | 45 +++--- .../channel/kafka/TestKafkaChannelBase.java | 19 +-- .../channel/kafka/TestOffsetsAndMigration.java | 19 ++- .../src/test/resources/kafka-server.properties | 2 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 44 +++--- flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 2 +- .../apache/flume/sink/kafka/TestKafkaSink.java | 130 ++++++++--------- .../flume/sink/kafka/util/KafkaConsumer.java | 98 ------------- .../apache/flume/sink/kafka/util/TestUtil.java | 137 ++++++++++++++---- .../src/test/resources/kafka-server.properties | 2 + .../src/test/resources/testutil.properties | 3 + flume-ng-sources/flume-kafka-source/pom.xml | 4 +- .../apache/flume/source/kafka/KafkaSource.java | 91 ++++++------ .../source/kafka/KafkaSourceEmbeddedKafka.java | 46 ++++-- .../flume/source/kafka/TestKafkaSource.java | 144 ++++++++++++++----- flume-shared/flume-shared-kafka-test/pom.xml | 2 +- .../apache/flume/shared/kafka/KafkaSSLUtil.java | 2 +- .../flume/shared/kafka/KafkaSSLUtilTest.java | 2 +- pom.xml | 8 +- 21 files changed, 432 insertions(+), 372 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/LICENSE ---------------------------------------------------------------------- diff --git a/LICENSE b/LICENSE index 4331d40..40cf470 100644 --- a/LICENSE +++ b/LICENSE @@ -240,7 +240,7 @@ For jetty-<version>.jar: jetty-util-<version>.jar: joda-time-<version>.jar: - kafka_2.10-<version>.jar: + kafka_2.11-<version>.jar: kafka-clients-<version>.jar: kite-data-core-<version>.jar: kite-data-hbase-<version>.jar: http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-channels/flume-kafka-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml index be5bf74..b9fb0d1 
100644 --- a/flume-ng-channels/flume-kafka-channel/pom.xml +++ b/flume-ng-channels/flume-kafka-channel/pom.xml @@ -54,7 +54,7 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index d2ea7ae..694cf3f 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -20,8 +20,7 @@ package org.apache.flume.channel.kafka; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import kafka.utils.ZKGroupTopicDirs; -import kafka.utils.ZkUtils; +import kafka.zk.KafkaZkClient; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; @@ -55,6 +54,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -63,6 +63,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import 
java.util.Collection; import java.util.Collections; @@ -78,7 +79,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; -import static scala.collection.JavaConverters.asJavaListConverter; public class KafkaChannel extends BasicChannelSemantics { @@ -102,6 +102,7 @@ public class KafkaChannel extends BasicChannelSemantics { private String groupId = DEFAULT_GROUP_ID; private String partitionHeader = null; private Integer staticPartitionId; + @Deprecated private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS; // used to indicate if a rebalance has occurred during the current transaction @@ -312,10 +313,10 @@ public class KafkaChannel extends BasicChannelSemantics { } private void migrateOffsets() { - ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, - JaasUtils.isZkSecurityEnabled()); - KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); - try { + try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect, + JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10, + Time.SYSTEM, "kafka.server", "SessionExpireListener"); + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) { Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer); if (!kafkaOffsets.isEmpty()) { logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr); @@ -324,7 +325,8 @@ public class KafkaChannel extends BasicChannelSemantics { } logger.info("No Kafka offsets found. 
Migrating zookeeper offsets"); - Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(zkUtils); + Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = + getZookeeperOffsets(zkClient, consumer); if (zookeeperOffsets.isEmpty()) { logger.warn("No offsets to migrate found in Zookeeper"); return; @@ -339,12 +341,10 @@ public class KafkaChannel extends BasicChannelSemantics { if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) { throw new FlumeException("Offsets could not be committed"); } - } finally { - zkUtils.close(); - consumer.close(); } } + private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets( KafkaConsumer<String, byte[]> client) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); @@ -359,18 +359,17 @@ public class KafkaChannel extends BasicChannelSemantics { return offsets; } - private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) { + private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets( + KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); - List<String> partitions = asJavaListConverter( - client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); - for (String partition : partitions) { - TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); - Option<String> data = client.readDataMaybeNull( - topicDirs.consumerOffsetDir() + "/" + partition)._1(); - if (data.isDefined()) { - Long offset = Long.valueOf(data.get()); - offsets.put(key, new OffsetAndMetadata(offset)); + List<PartitionInfo> partitions = consumer.partitionsFor(topicStr); + for (PartitionInfo partition : partitions) { + TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition()); + Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition); + if 
(optionOffset.nonEmpty()) { + Long offset = (Long) optionOffset.get(); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset); + offsets.put(topicPartition, offsetAndMetadata); } } return offsets; @@ -690,7 +689,7 @@ public class KafkaChannel extends BasicChannelSemantics { private void poll() { logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName()); try { - records = consumer.poll(pollTimeout); + records = consumer.poll(Duration.ofMillis(pollTimeout)); recordIterator = records.iterator(); logger.debug("{} returned {} records from last poll", getName(), records.count()); } catch (WakeupException e) { http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java index 9f139d6..e1279a3 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java @@ -19,8 +19,6 @@ package org.apache.flume.channel.kafka; import com.google.common.collect.Lists; -import kafka.admin.AdminUtils; -import kafka.utils.ZkUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.flume.Context; import org.apache.flume.Event; @@ -39,7 +37,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; @@ -48,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicInteger; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG; @@ -91,21 +89,11 @@ public class TestKafkaChannelBase { } static void createTopic(String topicName, int numPartitions) { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - ZkUtils zkUtils = - ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); + testUtil.createTopics(Collections.singletonList(topicName), numPartitions); } static void deleteTopic(String topicName) { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - ZkUtils zkUtils = - ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); - AdminUtils.deleteTopic(zkUtils, topicName); + testUtil.deleteTopic(topicName); } KafkaChannel startChannel(boolean parseAsFlume) throws Exception { @@ -121,6 +109,7 @@ public class TestKafkaChannelBase { context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl()); context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume)); context.put(TOPIC_CONFIG, topic); + context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000"); return context; } http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java index 47c583a..2362c0d 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java @@ -18,8 +18,7 @@ */ package org.apache.flume.channel.kafka; -import kafka.utils.ZKGroupTopicDirs; -import kafka.utils.ZkUtils; +import kafka.zk.KafkaZkClient; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Transaction; @@ -30,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.utils.Time; import org.junit.Assert; import org.junit.Test; @@ -97,7 +97,7 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase { } private Event takeEventWithoutCommittingTxn(KafkaChannel channel) { - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { Transaction txn = channel.getTransaction(); txn.begin(); @@ -142,14 +142,13 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase { // Commit 10th offset to zookeeper if (hasZookeeperOffsets) { - ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), 30000, 30000, - JaasUtils.isZkSecurityEnabled()); - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic); - // we commit the tenth offset to ensure some data is missed. 
+ KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(), + JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM, + "kafka.server", "SessionExpireListener"); + zkClient.getConsumerOffset(group, new TopicPartition(topic, 0)); Long offset = tenthOffset + 1; - zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(), - zkUtils.updatePersistentPath$default$3()); - zkUtils.close(); + zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset); + zkClient.close(); } // Commit 5th offset to kafka http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties index 216bfd8..a2071fe 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties +++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties @@ -116,3 +116,5 @@ zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 + +offsets.topic.replication.factor=1 http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index a983089..c6d947a 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1448,7 +1448,7 @@ Kafka Source Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group -so each will read a unique set of partitions for the topics. 
+so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest available version at the time of the release. ================================== =========== =================================================== Property Name Default Description @@ -1481,12 +1481,6 @@ topicHeader topic Defines the name of the header from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same topic in a loop. -migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. - This should be true to support seamless Kafka client migration from older versions of Flume. - Once migrated this can be set to false, though that should generally not be required. - If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset - defines how offsets are handled. - Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. *more consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional properties that need to be set on consumer. 
@@ -1504,14 +1498,21 @@ Other Kafka Consumer Properties -- These properties are used to co Deprecated Properties -=============================== =================== ============================================================================================= +=============================== =================== ================================================================================================ Property Name Default Description -=============================== =================== ============================================================================================= +=============================== =================== ================================================================================================ topic -- Use kafka.topics groupId flume Use kafka.consumer.group.id zookeeperConnect -- Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish connection with kafka cluster -=============================== =================== ============================================================================================= +migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. + This should be true to support seamless Kafka client migration from older versions of Flume. + Once migrated this can be set to false, though that should generally not be required. + If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset + defines how offsets are handled. + Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ + for details +=============================== =================== ================================================================================================ Example for topic subscription by comma-separated topic list. 
@@ -3131,9 +3132,9 @@ Kafka Sink This is a Flume Sink implementation that can publish data to a `Kafka <http://kafka.apache.org/>`_ topic. One of the objective is to integrate Flume with Kafka so that pull based processing systems can process the data coming -through various Flume sources. This currently supports Kafka 0.9.x series of releases. +through various Flume sources. -This version of Flume no longer supports Older Versions (0.8.x) of Kafka. +This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest available version at the time of the release. Required properties are marked in bold font. @@ -3538,9 +3539,7 @@ The Kafka channel can be used for multiple scenarios: #. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps #. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr - -This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of -the channel has changed compared to previous flume versions. +This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest available version at the time of the release. The configuration parameters are organized as such: @@ -3570,10 +3569,6 @@ parseAsFlumeEvent true Expecting A This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact -migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. 
- This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set - to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset - configuration defines how offsets are handled. pollTimeout 500 The amount of time(in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless @@ -3598,17 +3593,20 @@ kafka.consumer.security.protocol PLAINTEXT Same as kaf Deprecated Properties -================================ ========================== =============================================================================================================== +================================ ========================== ============================================================================================================================ Property Name Default Description -================================ ========================== =============================================================================================================== +================================ ========================== ============================================================================================================================ brokerList -- List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. 
The format is comma separated list of hostname:port topic flume-channel Use kafka.topic groupId flume Use kafka.consumer.group.id readSmallestOffset false Use kafka.consumer.auto.offset.reset - -================================ ========================== =============================================================================================================== +migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. + This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set + to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset + configuration defines how offsets are handled. +================================ ========================== ============================================================================================================================ .. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml index f9f10ae..eb65500 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml @@ -84,7 +84,7 @@ <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 92151cb..5a94c82 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -19,11 +19,6 @@ package org.apache.flume.sink.kafka; import com.google.common.base.Charsets; - -import kafka.admin.AdminUtils; -import kafka.message.MessageAndMetadata; -import kafka.utils.ZkUtils; - import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -45,6 +40,8 @@ import org.apache.flume.shared.kafka.test.PartitionTestScenario; import org.apache.flume.sink.kafka.util.TestUtil; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.AfterClass; import org.junit.Assert; @@ -54,8 +51,8 @@ import org.mockito.internal.util.reflection.Whitebox; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,7 +72,8 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE; import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY; import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import 
static org.junit.Assert.fail; /** @@ -83,7 +81,7 @@ import static org.junit.Assert.fail; */ public class TestKafkaSink { - private static TestUtil testUtil = TestUtil.getInstance(); + private static final TestUtil testUtil = TestUtil.getInstance(); private final Set<String> usedTopics = new HashSet<String>(); @BeforeClass @@ -178,9 +176,15 @@ public class TestKafkaSink { // ignore } - String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC) - .message()); - assertEquals(msg, fetchedMsg); + checkMessageArrived(msg, DEFAULT_TOPIC); + } + + private void checkMessageArrived(String msg, String topic) { + ConsumerRecords recs = pollConsumerRecords(topic); + assertNotNull(recs); + assertTrue(recs.count() > 0); + ConsumerRecord consumerRecord = (ConsumerRecord) recs.iterator().next(); + assertEquals(msg, consumerRecord.value()); } @Test @@ -199,13 +203,11 @@ public class TestKafkaSink { // ignore } - String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer( - TestConstants.STATIC_TOPIC).message()); - assertEquals(msg, fetchedMsg); + checkMessageArrived(msg, TestConstants.STATIC_TOPIC); } @Test - public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException { + public void testTopicAndKeyFromHeader() { Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); Configurables.configure(kafkaSink, context); @@ -234,21 +236,15 @@ public class TestKafkaSink { // ignore } - MessageAndMetadata fetchedMsg = - testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); - - assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8")); - assertEquals(TestConstants.CUSTOM_KEY, - new String((byte[]) fetchedMsg.key(), "UTF-8")); + checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC); } /** * Tests that a message will be produced to a topic as specified by a * custom topicHeader parameter (FLUME-3046). 
- * @throws UnsupportedEncodingException */ @Test - public void testTopicFromConfHeader() throws UnsupportedEncodingException { + public void testTopicFromConfHeader() { String customTopicHeader = "customTopicHeader"; Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); @@ -278,19 +274,15 @@ public class TestKafkaSink { // ignore } - MessageAndMetadata<?, ?> fetchedMsg = - testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); - - assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8")); + checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC); } /** * Tests that the topicHeader parameter will be ignored if the allowTopicHeader * parameter is set to false (FLUME-3046). - * @throws UnsupportedEncodingException */ @Test - public void testTopicNotFromConfHeader() throws UnsupportedEncodingException { + public void testTopicNotFromConfHeader() { Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false"); @@ -322,14 +314,12 @@ public class TestKafkaSink { // ignore } - MessageAndMetadata<?, ?> fetchedMsg = - testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC); - - assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8")); + checkMessageArrived(msg, DEFAULT_TOPIC); } @Test - public void testReplaceSubStringOfTopicWithHeaders() throws UnsupportedEncodingException { + public void testReplaceSubStringOfTopicWithHeaders() { + String topic = TestConstants.HEADER_1_VALUE + "-topic"; Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC); @@ -358,10 +348,7 @@ public class TestKafkaSink { // ignore } - String fetchedMsg = new String((byte[]) - testUtil.getNextMessageFromConsumer(TestConstants.HEADER_1_VALUE + "-topic").message()); - - assertEquals(msg, fetchedMsg); + checkMessageArrived(msg, topic); } @SuppressWarnings("rawtypes") @@ -398,12 +385,15 @@ public 
class TestKafkaSink { // ignore } - MessageAndMetadata fetchedMsg = testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); + String topic = TestConstants.CUSTOM_TOPIC; - ByteArrayInputStream in = new ByteArrayInputStream((byte[]) fetchedMsg.message()); + ConsumerRecords<String, String> recs = pollConsumerRecords(topic); + assertNotNull(recs); + assertTrue(recs.count() > 0); + ConsumerRecord<String, String> consumerRecord = recs.iterator().next(); + ByteArrayInputStream in = new ByteArrayInputStream(consumerRecord.value().getBytes()); BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); - SpecificDatumReader<AvroFlumeEvent> reader = - new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class); + SpecificDatumReader<AvroFlumeEvent> reader = new SpecificDatumReader<>(AvroFlumeEvent.class); AvroFlumeEvent avroevent = reader.read(null, decoder); @@ -411,15 +401,33 @@ public class TestKafkaSink { Map<CharSequence, CharSequence> eventHeaders = avroevent.getHeaders(); assertEquals(msg, eventBody); - assertEquals(TestConstants.CUSTOM_KEY, new String((byte[]) fetchedMsg.key(), "UTF-8")); + assertEquals(TestConstants.CUSTOM_KEY, consumerRecord.key()); assertEquals(TestConstants.HEADER_1_VALUE, eventHeaders.get(new Utf8(TestConstants.HEADER_1_KEY)).toString()); assertEquals(TestConstants.CUSTOM_KEY, eventHeaders.get(new Utf8("key")).toString()); } + private ConsumerRecords<String, String> pollConsumerRecords(String topic) { + return pollConsumerRecords(topic, 20); + } + + private ConsumerRecords<String, String> pollConsumerRecords(String topic, int maxIter) { + ConsumerRecords<String, String> recs = null; + for (int i = 0; i < maxIter; i++) { + recs = testUtil.getNextMessageFromConsumer(topic); + if (recs.count() > 0) break; + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + // + } + } + return recs; + } + @Test - public void testEmptyChannel() throws UnsupportedEncodingException, EventDeliveryException { + public void 
testEmptyChannel() throws EventDeliveryException { Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); Configurables.configure(kafkaSink, context); @@ -432,7 +440,9 @@ public class TestKafkaSink { if (status != Sink.Status.BACKOFF) { fail("Error Occurred"); } - assertNull(testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)); + ConsumerRecords recs = pollConsumerRecords(DEFAULT_TOPIC, 2); + assertNotNull(recs); + assertEquals(recs.count(), 0); } @Test @@ -481,10 +491,9 @@ public class TestKafkaSink { /** * Tests that sub-properties (kafka.producer.*) apply correctly across multiple invocations * of configure() (fix for FLUME-2857). - * @throws Exception */ @Test - public void testDefaultSettingsOnReConfigure() throws Exception { + public void testDefaultSettingsOnReConfigure() { String sampleProducerProp = "compression.type"; String sampleProducerVal = "snappy"; @@ -515,10 +524,8 @@ public class TestKafkaSink { * Expected behaviour: Exception is not thrown because the code avoids an NPE. * * 3. PartitionOption.NOTANUMBER: The partition header is set, but is not an Integer. - * Expected behaviour: ChannelExeption thrown. + * Expected behaviour: ChannelException thrown. * - * @param option - * @throws Exception */ private void doPartitionErrors(PartitionOption option) throws Exception { doPartitionErrors(option, new KafkaSink()); @@ -577,9 +584,6 @@ public class TestKafkaSink { * a large skew to some partitions and then verify that this actually happened * by reading messages directly using a Kafka Consumer. 
* - * @param usePartitionHeader - * @param staticPtn - * @throws Exception */ private void doPartitionHeader(PartitionTestScenario scenario) throws Exception { final int numPtns = 5; @@ -680,25 +684,15 @@ public class TestKafkaSink { return kafkaSink.process(); } - public static void createTopic(String topicName, int numPartitions) { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - ZkUtils zkUtils = - ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); + private void createTopic(String topicName, int numPartitions) { + testUtil.createTopics(Collections.singletonList(topicName), numPartitions); } - public static void deleteTopic(String topicName) { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - ZkUtils zkUtils = - ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); - AdminUtils.deleteTopic(zkUtils, topicName); + private void deleteTopic(String topicName) { + testUtil.deleteTopic(topicName); } - public String findUnusedTopic() { + private String findUnusedTopic() { String newTopic = null; boolean topicFound = false; while (!topicFound) { http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java deleted file mode 100644 index d5dfbd6..0000000 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - Licensed to the Apache Software 
Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - limitations under the License. - */ - -package org.apache.flume.sink.kafka.util; - -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * A Kafka Consumer implementation. This uses the current thread to fetch the - * next message from the queue and doesn't use a multi threaded implementation. - * So this implements a synchronous blocking call. - * To avoid infinite waiting, a timeout is implemented to wait only for - * 10 seconds before concluding that the message will not be available. 
- */ -public class KafkaConsumer { - - private static final Logger logger = LoggerFactory.getLogger( - KafkaConsumer.class); - - private final ConsumerConnector consumer; - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap; - - public KafkaConsumer() { - consumer = kafka.consumer.Consumer.createJavaConsumerConnector( - createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1")); - } - - private static ConsumerConfig createConsumerConfig(String zkUrl, - String groupId) { - Properties props = new Properties(); - props.put("zookeeper.connect", zkUrl); - props.put("group.id", groupId); - props.put("zookeeper.session.timeout.ms", "1000"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); - props.put("auto.offset.reset", "smallest"); - props.put("consumer.timeout.ms","1000"); - return new ConsumerConfig(props); - } - - public void initTopicList(List<String> topics) { - Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); - for (String topic : topics) { - // we need only single threaded consumers - topicCountMap.put(topic, new Integer(1)); - } - consumerMap = consumer.createMessageStreams(topicCountMap); - } - - public MessageAndMetadata getNextMessage(String topic) { - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); - // it has only a single stream, because there is only one consumer - KafkaStream stream = streams.get(0); - final ConsumerIterator<byte[], byte[]> it = stream.iterator(); - int counter = 0; - try { - if (it.hasNext()) { - return it.next(); - } else { - return null; - } - } catch (ConsumerTimeoutException e) { - logger.error("0 messages available to fetch for the topic " + topic); - return null; - } - } - - public void shutdown() { - consumer.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java 
---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java index 00780fd..cdf9bad 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java @@ -18,16 +18,26 @@ package org.apache.flume.sink.kafka.util; -import kafka.message.MessageAndMetadata; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; import java.net.ServerSocket; -import java.net.UnknownHostException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; /** * A utility class for starting/stopping Kafka Server. 
@@ -35,13 +45,17 @@ import java.util.Properties; public class TestUtil { private static final Logger logger = LoggerFactory.getLogger(TestUtil.class); - private static TestUtil instance = new TestUtil(); + private static final TestUtil instance = new TestUtil(); private KafkaLocal kafkaServer; - private KafkaConsumer kafkaConsumer; - private String hostname = "localhost"; + private boolean externalServers = true; + private String kafkaServerUrl; + private String zkServerUrl; private int kafkaLocalPort; + private Properties clientProps; private int zkLocalPort; + private KafkaConsumer<String, String> consumer; + private AdminClient adminClient; private TestUtil() { init(); @@ -52,16 +66,31 @@ public class TestUtil { } private void init() { - // get the localhost. try { - hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - logger.warn("Error getting the value of localhost. " + - "Proceeding with 'localhost'.", e); + Properties settings = new Properties(); + InputStream in = Class.class.getResourceAsStream("/testutil.properties"); + if (in != null) { + settings.load(in); + } + externalServers = "true".equalsIgnoreCase(settings.getProperty("external-servers")); + if (externalServers) { + kafkaServerUrl = settings.getProperty("kafka-server-url"); + zkServerUrl = settings.getProperty("zk-server-url"); + } else { + String hostname = InetAddress.getLocalHost().getHostName(); + zkLocalPort = getNextPort(); + kafkaLocalPort = getNextPort(); + kafkaServerUrl = hostname + ":" + kafkaLocalPort; + zkServerUrl = hostname + ":" + zkLocalPort; + } + clientProps = createClientProperties(); + } catch (Exception e) { + logger.error("Unexpected error", e); + throw new RuntimeException("Unexpected error", e); } } - private boolean startKafkaServer() { + private boolean startEmbeddedKafkaServer() { Properties kafkaProperties = new Properties(); Properties zkProperties = new Properties(); @@ -72,7 +101,6 @@ public class TestUtil { 
"/zookeeper.properties")); //start local Zookeeper - zkLocalPort = getNextPort(); // override the Zookeeper client port with the generated one. zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort)); new ZooKeeperLocal(zkProperties); @@ -84,12 +112,12 @@ public class TestUtil { "/kafka-server.properties")); // override the Zookeeper url. kafkaProperties.setProperty("zookeeper.connect", getZkUrl()); - kafkaLocalPort = getNextPort(); // override the Kafka server port kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort)); kafkaServer = new KafkaLocal(kafkaProperties); kafkaServer.start(); logger.info("Kafka Server is successfully started on port " + kafkaLocalPort); + return true; } catch (Exception e) { @@ -98,25 +126,69 @@ public class TestUtil { } } - private KafkaConsumer getKafkaConsumer() { - synchronized (this) { - if (kafkaConsumer == null) { - kafkaConsumer = new KafkaConsumer(); - } + private AdminClient getAdminClient() { + if (adminClient == null) { + Properties adminClientProps = createAdminClientProperties(); + adminClient = AdminClient.create(adminClientProps); } - return kafkaConsumer; + return adminClient; + } + + private Properties createClientProperties() { + final Properties props = createAdminClientProperties(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put("auto.commit.interval.ms", "1000"); + props.put("auto.offset.reset", "earliest"); + props.put("consumer.timeout.ms","10000"); + props.put("max.poll.interval.ms","10000"); + + // Create the consumer using props. 
+ return props; + } + + private Properties createAdminClientProperties() { + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerUrl()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); + return props; } public void initTopicList(List<String> topics) { - getKafkaConsumer().initTopicList(topics); + consumer = new KafkaConsumer<>(clientProps); + consumer.subscribe(topics); + } + + public void createTopics(List<String> topicNames, int numPartitions) { + List<NewTopic> newTopics = new ArrayList<>(); + for (String topicName: topicNames) { + NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1); + newTopics.add(newTopic); + } + getAdminClient().createTopics(newTopics); + + //the following lines are a bit of black magic to ensure the topic is ready when we return + DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames); + try { + dtr.all().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Error getting topic info", e); + } + } + public void deleteTopic(String topicName) { + getAdminClient().deleteTopics(Collections.singletonList(topicName)); } - public MessageAndMetadata getNextMessageFromConsumer(String topic) { - return getKafkaConsumer().getNextMessage(topic); + public ConsumerRecords<String, String> getNextMessageFromConsumer(String topic) { + return consumer.poll(Duration.ofMillis(1000L)); } public void prepare() { - boolean startStatus = startKafkaServer(); + + if (externalServers) { + return; + } + boolean startStatus = startEmbeddedKafkaServer(); if (!startStatus) { throw new RuntimeException("Error starting the server!"); } @@ -126,21 +198,28 @@ public class TestUtil { } catch (InterruptedException e) { // ignore } - getKafkaConsumer(); logger.info("Completed the prepare phase."); } public void tearDown() { logger.info("Shutting down the Kafka Consumer."); - getKafkaConsumer().shutdown(); + if (consumer != null) { + consumer.close(); + 
} + if (adminClient != null) { + adminClient.close(); + adminClient = null; + } try { Thread.sleep(3 * 1000); // add this sleep time to // ensure that the server is fully started before proceeding with tests. } catch (InterruptedException e) { // ignore } - logger.info("Shutting down the kafka Server."); - kafkaServer.stop(); + if (kafkaServer != null) { + logger.info("Shutting down the kafka Server."); + kafkaServer.stop(); + } logger.info("Completed the tearDown phase."); } @@ -151,10 +230,10 @@ public class TestUtil { } public String getZkUrl() { - return hostname + ":" + zkLocalPort; + return zkServerUrl; } public String getKafkaServerUrl() { - return hostname + ":" + kafkaLocalPort; + return kafkaServerUrl; } } http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties index 02a81e2..2312247 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties @@ -116,3 +116,5 @@ zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=1000000 + +offsets.topic.replication.factor=1 http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties new file mode 100644 index 0000000..97bbf1b --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties @@ -0,0 +1,3 @@ 
+external-servers=false +kafka-server-url=localhost:9092 +zk-server-url=localhost:2181 http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sources/flume-kafka-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml index 9b5d697..8e6812e 100644 --- a/flume-ng-sources/flume-kafka-source/pom.xml +++ b/flume-ng-sources/flume-kafka-source/pom.xml @@ -56,7 +56,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> @@ -72,7 +72,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> <classifier>test</classifier> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 10b2cfb..ddffa87 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -17,6 +17,7 @@ package org.apache.flume.source.kafka; import java.io.ByteArrayInputStream; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -28,11 +29,12 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import 
java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import kafka.cluster.Broker; import kafka.cluster.BrokerEndPoint; -import kafka.utils.ZKGroupTopicDirs; -import kafka.utils.ZkUtils; +import kafka.zk.KafkaZkClient; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -59,16 +61,19 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; -import scala.Option; import static org.apache.flume.source.kafka.KafkaSourceConstants.*; -import static scala.collection.JavaConverters.asJavaListConverter; + +import scala.Option; +import scala.collection.JavaConverters; /** * A Source for Kafka which reads messages from kafka topics. 
@@ -128,6 +133,7 @@ public class KafkaSource extends AbstractPollableSource private String zookeeperConnect; private String bootstrapServers; private String groupId = DEFAULT_GROUP_ID; + @Deprecated private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS; private String topicHeader = null; private boolean setTopicHeader; @@ -189,7 +195,6 @@ public class KafkaSource extends AbstractPollableSource @Override protected Status doProcess() throws EventDeliveryException { final String batchUUID = UUID.randomUUID().toString(); - byte[] kafkaMessage; String kafkaKey; Event event; byte[] eventBody; @@ -206,22 +211,20 @@ public class KafkaSource extends AbstractPollableSource if (it == null || !it.hasNext()) { // Obtaining new records // Poll time is remainder time for current batch. - ConsumerRecords<String, byte[]> records = consumer.poll( - Math.max(0, maxBatchEndTime - System.currentTimeMillis())); + long durMs = Math.max(0L, maxBatchEndTime - System.currentTimeMillis()); + Duration duration = Duration.ofMillis(durMs); + ConsumerRecords<String, byte[]> records = consumer.poll(duration); it = records.iterator(); // this flag is set to true in a callback when some partitions are revoked. // If there are any records we commit them. - if (rebalanceFlag.get()) { - rebalanceFlag.set(false); + if (rebalanceFlag.compareAndSet(true, false)) { break; } // check records after poll if (!it.hasNext()) { - if (log.isDebugEnabled()) { - counter.incrementKafkaEmptyCount(); - log.debug("Returning with backoff. No more data to read"); - } + counter.incrementKafkaEmptyCount(); + log.debug("Returning with backoff. 
No more data to read"); // batch time exceeded break; } @@ -230,7 +233,6 @@ public class KafkaSource extends AbstractPollableSource // get next message ConsumerRecord<String, byte[]> message = it.next(); kafkaKey = message.key(); - kafkaMessage = message.value(); if (useAvroEventFormat) { //Assume the event is in Avro format using the AvroFlumeEvent schema @@ -471,18 +473,21 @@ public class KafkaSource extends AbstractPollableSource * Allows for backwards compatibility of the zookeeperConnect configuration. */ private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) { - ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, - JaasUtils.isZkSecurityEnabled()); - try { - List<BrokerEndPoint> endPoints = - asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava(); + try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect, + JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10, + Time.SYSTEM, "kafka.server", "SessionExpireListener")) { + List<Broker> brokerList = + JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava(); + List<BrokerEndPoint> endPoints = brokerList.stream() + .map(broker -> broker.brokerEndPoint( + ListenerName.forSecurityProtocol(securityProtocol)) + ) + .collect(Collectors.toList()); List<String> connections = new ArrayList<>(); for (BrokerEndPoint endPoint : endPoints) { connections.add(endPoint.connectionString()); } return StringUtils.join(connections, ','); - } finally { - zkUtils.close(); } } @@ -535,8 +540,6 @@ public class KafkaSource extends AbstractPollableSource // Subscribe for topics by already specified strategy subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag)); - // Connect to kafka. 1 second is optimal time. 
- it = consumer.poll(1000).iterator(); log.info("Kafka source {} started.", getName()); counter.start(); } @@ -547,15 +550,17 @@ public class KafkaSource extends AbstractPollableSource consumer.wakeup(); consumer.close(); } - counter.stop(); + if (counter != null) { + counter.stop(); + } log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter); } private void migrateOffsets(String topicStr) { - ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, - JaasUtils.isZkSecurityEnabled()); - KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps); - try { + try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect, + JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10, + Time.SYSTEM, "kafka.server", "SessionExpireListener"); + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) { Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer, topicStr); if (!kafkaOffsets.isEmpty()) { @@ -567,7 +572,7 @@ public class KafkaSource extends AbstractPollableSource log.info("No Kafka offsets found. 
Migrating zookeeper offsets"); Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = - getZookeeperOffsets(zkUtils, topicStr); + getZookeeperOffsets(zkClient, consumer, topicStr); if (zookeeperOffsets.isEmpty()) { log.warn("No offsets to migrate found in Zookeeper"); return; @@ -583,9 +588,6 @@ public class KafkaSource extends AbstractPollableSource if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) { throw new FlumeException("Offsets could not be committed"); } - } finally { - zkUtils.close(); - consumer.close(); } } @@ -603,19 +605,18 @@ public class KafkaSource extends AbstractPollableSource return offsets; } - private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client, - String topicStr) { + private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets( + KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer, String topicStr) { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); - List<String> partitions = asJavaListConverter( - client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); - for (String partition : partitions) { - TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); - Option<String> data = client.readDataMaybeNull( - topicDirs.consumerOffsetDir() + "/" + partition)._1(); - if (data.isDefined()) { - Long offset = Long.valueOf(data.get()); - offsets.put(key, new OffsetAndMetadata(offset)); + List<PartitionInfo> partitions = consumer.partitionsFor(topicStr); + for (PartitionInfo partition : partitions) { + TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition()); + Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition); + if (optionOffset.nonEmpty()) { + Long offset = (Long) optionOffset.get(); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset); + offsets.put(topicPartition, 
offsetAndMetadata); } } return offsets; http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 1186f6d..f4fe57d 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -16,12 +16,13 @@ */ package org.apache.flume.source.kafka; -import kafka.admin.AdminUtils; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -31,9 +32,12 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; public class KafkaSourceEmbeddedKafka { @@ -41,6 +45,7 @@ public class KafkaSourceEmbeddedKafka { KafkaServerStartable kafkaServer; KafkaSourceEmbeddedZookeeper zookeeper; + private AdminClient adminClient; private static int 
findFreePort() { try (ServerSocket socket = new ServerSocket(0)) { @@ -70,6 +75,7 @@ public class KafkaSourceEmbeddedKafka { props.put("host.name", "localhost"); props.put("port", String.valueOf(serverPort)); props.put("log.dir", dir.getAbsolutePath()); + props.put("offsets.topic.replication.factor", "1"); if (properties != null) { props.putAll(properties); } @@ -132,15 +138,31 @@ public class KafkaSourceEmbeddedKafka { } public void createTopic(String topicName, int numPartitions) { - // Create a ZooKeeper client - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - ZkClient zkClient = - ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs); - ZkUtils zkUtils = ZkUtils.apply(zkClient, false); - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); + AdminClient adminClient = getAdminClient(); + NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1); + adminClient.createTopics(Collections.singletonList(newTopic)); + + //the following lines are a bit of black magic to ensure the topic is ready when we return + DescribeTopicsResult dtr = adminClient.describeTopics(Collections.singletonList(topicName)); + try { + dtr.all().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Error getting topic info", e); + } + } + + private AdminClient getAdminClient() { + if (adminClient == null) { + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST + ":" + serverPort); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); + adminClient = AdminClient.create(props); + } + return adminClient; + } + + public void deleteTopics(List<String> topic) { + getAdminClient().deleteTopics(topic); } } 
http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index bb20e35..a82c972 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -20,9 +20,8 @@ package org.apache.flume.source.kafka; import com.google.common.base.Charsets; import com.google.common.collect.Lists; import junit.framework.Assert; -import kafka.common.TopicExistsException; -import kafka.utils.ZKGroupTopicDirs; -import kafka.utils.ZkUtils; + +import kafka.zk.KafkaZkClient; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; @@ -37,6 +36,8 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -45,8 +46,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.StringDeserializer; +import 
org.apache.kafka.common.utils.Time; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; @@ -58,13 +63,13 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.regex.Pattern; import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; @@ -83,6 +88,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_H import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -92,22 +98,30 @@ public class TestKafkaSource { private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class); private KafkaSource kafkaSource; - private KafkaSourceEmbeddedKafka kafkaServer; + private static KafkaSourceEmbeddedKafka kafkaServer; private Context context; private List<Event> events; - private final Set<String> usedTopics = new HashSet<String>(); - private String topic0 = "test1"; - private String topic1 = "topic1"; + private final List<String> usedTopics = new ArrayList<>(); + private String topic0; + private String topic1; + + + @BeforeClass + public static void startKafkaServer() { + kafkaServer = new KafkaSourceEmbeddedKafka(null); + startupCheck(); + } @SuppressWarnings("unchecked") @Before public void setup() throws Exception { kafkaSource = new 
KafkaSource(); - kafkaServer = new KafkaSourceEmbeddedKafka(null); try { + topic0 = findUnusedTopic(); kafkaServer.createTopic(topic0, 1); usedTopics.add(topic0); + topic1 = findUnusedTopic(); kafkaServer.createTopic(topic1, 3); usedTopics.add(topic1); } catch (TopicExistsException e) { @@ -118,6 +132,35 @@ public class TestKafkaSource { kafkaSource.setChannelProcessor(createGoodChannel()); } + private static void startupCheck() { + String startupTopic = "startupCheck"; + KafkaConsumer<String, String> startupConsumer; + kafkaServer.createTopic(startupTopic, 1); + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(startupTopic)); + log.info("Checking Startup"); + boolean success = false; + for (int i = 0; i < 20; i++) { + kafkaServer.produce(startupTopic, "", "record"); + ConsumerRecords recs = consumer.poll(Duration.ofMillis(1000L)); + if (!recs.isEmpty()) { + success = true; + break; + } + } + if (!success) { + fail("Kafka server startup failed"); + } + log.info("Kafka server startup success"); + consumer.close(); + kafkaServer.deleteTopics(Collections.singletonList(startupTopic)); + } + private Context prepareDefaultContext(String groupId) { Context context = new Context(); context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapServers()); @@ -127,10 +170,32 @@ public class TestKafkaSource { @After public void tearDown() throws Exception { - kafkaSource.stop(); + try { + kafkaSource.stop(); + } catch (Exception e) { + log.warn("Error stopping kafkaSource", e); + } + topic0 = null; + topic1 = null; + 
kafkaServer.deleteTopics(usedTopics); + usedTopics.clear(); + } + + @AfterClass + public static void stopKafkaServer() throws Exception { kafkaServer.stop(); } + private void startKafkaSource() throws EventDeliveryException, InterruptedException { + kafkaSource.start(); + /* Timing magic: We call the process method, that executes a consumer.poll() + A thread.sleep(10000L) does not work even though it takes longer */ + for (int i = 0; i < 3; i++) { + kafkaSource.process(); + Thread.sleep(1000); + } + } + @SuppressWarnings("unchecked") @Test public void testOffsets() throws InterruptedException, EventDeliveryException { @@ -140,7 +205,7 @@ public class TestKafkaSource { String.valueOf(batchDuration)); context.put(BATCH_SIZE, "3"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); Status status = kafkaSource.process(); assertEquals(Status.BACKOFF, status); @@ -190,7 +255,7 @@ public class TestKafkaSource { kafkaSource = new KafkaSource(); kafkaSource.setChannelProcessor(createGoodChannel()); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); kafkaServer.produce(topic1, "", "record14"); Thread.sleep(1000L); assertEquals(Status.READY, kafkaSource.process()); @@ -209,7 +274,7 @@ public class TestKafkaSource { context.put(TOPICS, topic0); context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -232,7 +297,7 @@ public class TestKafkaSource { context.put(TOPICS, topic0); context.put(BATCH_SIZE,"2"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -258,7 +323,7 @@ public class TestKafkaSource { IllegalAccessException, InterruptedException { context.put(TOPICS, topic0); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); Status status = kafkaSource.process(); @@ -272,7 +337,7 @@ public class TestKafkaSource { IllegalAccessException, 
InterruptedException { context.put(TOPICS,"faketopic"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); Status status = kafkaSource.process(); @@ -287,7 +352,7 @@ public class TestKafkaSource { context.put(TOPICS, topic0); context.put(BOOTSTRAP_SERVERS,"blabla:666"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); Status status = kafkaSource.process(); @@ -299,7 +364,8 @@ public class TestKafkaSource { context.put(TOPICS, topic0); context.put(BATCH_DURATION_MS, "250"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); + kafkaSource.process(); // timing magic Thread.sleep(500L); @@ -323,7 +389,7 @@ public class TestKafkaSource { context.put(TOPICS, topic0); context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -334,7 +400,7 @@ public class TestKafkaSource { Assert.assertEquals(Status.READY, kafkaSource.process()); kafkaSource.stop(); Thread.sleep(500L); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); } @@ -346,7 +412,7 @@ public class TestKafkaSource { context.put(BATCH_SIZE,"1"); context.put(BATCH_DURATION_MS,"30000"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); kafkaServer.produce(topic0, "", "hello, world"); @@ -370,7 +436,7 @@ public class TestKafkaSource { context.put(BATCH_SIZE,"1"); context.put(BATCH_DURATION_MS, "30000"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); kafkaServer.produce(topic0, "", "event 1"); @@ -393,7 +459,7 @@ public class TestKafkaSource { context.put(BATCH_DURATION_MS,"30000"); context.put(KAFKA_CONSUMER_PREFIX + "enable.auto.commit", "true"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); 
kafkaServer.produce(topic0, "", "event 1"); @@ -417,7 +483,7 @@ public class TestKafkaSource { context.put(TOPICS, topic0); context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -443,7 +509,7 @@ public class TestKafkaSource { .when(cp).processEventBatch(any(List.class)); kafkaSource.setChannelProcessor(cp); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -567,7 +633,7 @@ public class TestKafkaSource { context.put(BATCH_SIZE, "1"); context.put(AVRO_EVENT, "true"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -699,7 +765,7 @@ public class TestKafkaSource { public void testTopicHeaderSet() throws InterruptedException, EventDeliveryException { context.put(TOPICS, topic0); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -730,7 +796,7 @@ public class TestKafkaSource { context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -761,7 +827,7 @@ public class TestKafkaSource { context.put(KafkaSourceConstants.SET_TOPIC_HEADER, "false"); kafkaSource.configure(context); - kafkaSource.start(); + startKafkaSource(); Thread.sleep(500L); @@ -810,14 +876,13 @@ public class TestKafkaSource { // Commit 10th offset to zookeeper if (hasZookeeperOffsets) { - ZkUtils zkUtils = ZkUtils.apply(kafkaServer.getZkConnectString(), 30000, 30000, - JaasUtils.isZkSecurityEnabled()); - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic); - // we commit the tenth offset to ensure some data is missed. 
+ KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(), + JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM, + "kafka.server", "SessionExpireListener"); + zkClient.getConsumerOffset(group, new TopicPartition(topic, 0)); Long offset = tenthOffset + 1; - zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(), - zkUtils.updatePersistentPath$default$3()); - zkUtils.close(); + zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset); + zkClient.close(); } // Commit 5th offset to kafka @@ -832,6 +897,11 @@ public class TestKafkaSource { // Start the source and read some data source.setChannelProcessor(createGoodChannel()); source.start(); + for (int i = 0; i < 3; i++) { + source.process(); + Thread.sleep(1000); + } + Thread.sleep(500L); source.process(); List<Integer> finals = new ArrayList<Integer>(40); http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-shared/flume-shared-kafka-test/pom.xml ---------------------------------------------------------------------- diff --git a/flume-shared/flume-shared-kafka-test/pom.xml b/flume-shared/flume-shared-kafka-test/pom.xml index 49751c2..9227315 100644 --- a/flume-shared/flume-shared-kafka-test/pom.xml +++ b/flume-shared/flume-shared-kafka-test/pom.xml @@ -72,7 +72,7 @@ <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java ---------------------------------------------------------------------- diff --git a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java index b4adcd3..78e6f63 100644 --- 
a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java +++ b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java @@ -21,7 +21,7 @@ package org.apache.flume.shared.kafka; import org.apache.flume.util.SSLUtil; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.auth.SecurityProtocol; import java.util.Properties; http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java ---------------------------------------------------------------------- diff --git a/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java b/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java index 6096bcf..1119152 100644 --- a/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java +++ b/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java @@ -20,7 +20,7 @@ package org.apache.flume.shared.kafka; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flume/blob/5ec8bb6a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 687c471..b6b35fb 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ limitations under the License. 
<jetty.version>9.4.6.v20170531</jetty.version> <joda-time.version>2.9.9</joda-time.version> <junit.version>4.10</junit.version> - <kafka.version>0.9.0.1</kafka.version> + <kafka.version>2.0.1</kafka.version> <kite.version>1.0.0</kite.version> <hive.version>1.0.0</hive.version> <lifecycle-mapping.version>1.0.0</lifecycle-mapping.version> @@ -106,7 +106,7 @@ limitations under the License. <rat.version>0.12</rat.version> <snappy-java.version>1.1.4</snappy-java.version> <solr-global.version>4.3.0</solr-global.version> - <slf4j.version>1.6.1</slf4j.version> + <slf4j.version>1.7.25</slf4j.version> <system-rules.version>1.17.0</system-rules.version> <thrift.version>0.9.3</thrift.version> <twitter4j.version>3.0.3</twitter4j.version> @@ -1809,7 +1809,7 @@ limitations under the License. <!-- Dependencies of Kafka source --> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> @@ -1824,7 +1824,7 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> <classifier>test</classifier> <scope>test</scope>
