This is an automated email from the ASF dual-hosted git repository.
szaboferee pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 606eabb FLUME-3314 Fixed NPE in Kafka source/channel during offset
migration (#276)
606eabb is described below
commit 606eabb09997fa939735b5600f526a99895c350c
Author: turcsanyip <[email protected]>
AuthorDate: Thu Feb 7 12:26:58 2019 +0100
FLUME-3314 Fixed NPE in Kafka source/channel during offset migration (#276)
* FLUME-3314 Fixed NPE in Kafka source/channel during offset migration
Kafka source/channel threw NPE when migrateOffsets() was called
on nonexistent topics.
It has been fixed by adding null check and logging a warning message
(topic not found, skipping offset migration).
After skipping the offset migration, the source/channel works the same way
as the "non offset migration" case:
- starts and prints warning messages about the non-existing topic
periodically
- can recover if the topic is created later
* FLUME-3314 Additional null checks.
Reviewers: Ferenc Szabo
(Peter Turcsanyi via Ferenc Szabo)
---
flume-ng-channels/flume-kafka-channel/pom.xml | 6 ++++-
.../apache/flume/channel/kafka/KafkaChannel.java | 22 ++++++++++++------
.../channel/kafka/TestOffsetsAndMigration.java | 17 ++++++++++++++
.../src/test/resources/kafka-server.properties | 3 ++-
.../src/test/resources/kafka-server.properties | 1 -
flume-ng-sources/flume-kafka-source/pom.xml | 5 +++++
.../org/apache/flume/source/kafka/KafkaSource.java | 22 ++++++++++++------
.../source/kafka/KafkaSourceEmbeddedKafka.java | 1 +
.../apache/flume/source/kafka/TestKafkaSource.java | 26 +++++++++++++++++++++-
9 files changed, 85 insertions(+), 18 deletions(-)
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml
b/flume-ng-channels/flume-kafka-channel/pom.xml
index 7819826..9bf1e27 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -71,7 +71,11 @@ limitations under the License.
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 40494d4..852b4bd 100644
---
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -318,6 +318,10 @@ public class KafkaChannel extends BasicChannelSemantics {
Time.SYSTEM, "kafka.server", "SessionExpireListener");
KafkaConsumer<String, byte[]> consumer = new
KafkaConsumer<>(consumerProps)) {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
getKafkaOffsets(consumer);
+ if (kafkaOffsets == null) {
+ logger.warn("Topic " + topicStr + " not found in Kafka. Offset
migration will be skipped.");
+ return;
+ }
if (!kafkaOffsets.isEmpty()) {
logger.info("Found Kafka offsets for topic {}. Will not migrate from
zookeeper", topicStr);
logger.debug("Offsets found: {}", kafkaOffsets);
@@ -338,7 +342,8 @@ public class KafkaChannel extends BasicChannelSemantics {
// Read the offsets to verify they were committed
Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
getKafkaOffsets(consumer);
logger.debug("Offsets committed: {}", newKafkaOffsets);
- if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
+ if (newKafkaOffsets == null
+ || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet()))
{
throw new FlumeException("Offsets could not be committed");
}
}
@@ -347,13 +352,16 @@ public class KafkaChannel extends BasicChannelSemantics {
private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
KafkaConsumer<String, byte[]> client) {
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ Map<TopicPartition, OffsetAndMetadata> offsets = null;
List<PartitionInfo> partitions = client.partitionsFor(topicStr);
- for (PartitionInfo partition : partitions) {
- TopicPartition key = new TopicPartition(topicStr, partition.partition());
- OffsetAndMetadata offsetAndMetadata = client.committed(key);
- if (offsetAndMetadata != null) {
- offsets.put(key, offsetAndMetadata);
+ if (partitions != null) {
+ offsets = new HashMap<>();
+ for (PartitionInfo partition : partitions) {
+ TopicPartition key = new TopicPartition(topicStr,
partition.partition());
+ OffsetAndMetadata offsetAndMetadata = client.committed(key);
+ if (offsetAndMetadata != null) {
+ offsets.put(key, offsetAndMetadata);
+ }
}
}
return offsets;
diff --git
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
index 2362c0d..7657aa6 100644
---
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
+++
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -190,4 +191,20 @@ public class TestOffsetsAndMigration extends
TestKafkaChannelBase {
Assert.assertTrue("Channel should read the 11th message",
finals.contains(11));
}
}
+
+ @Test
+ public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception
{
+ topic = findUnusedTopic();
+
+ Context context = prepareDefaultContext(false);
+ context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
+ context.put(GROUP_ID_FLUME, "testMigrateOffsets-nonExistingTopic");
+ KafkaChannel channel = createChannel(context);
+
+ channel.start();
+
+ Assert.assertEquals(LifecycleState.START, channel.getLifecycleState());
+
+ channel.stop();
+ }
}
diff --git
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
index a2071fe..55fa20d 100644
---
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
+++
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -1,5 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
-# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
@@ -118,3 +117,5 @@ zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000
offsets.topic.replication.factor=1
+
+auto.create.topics.enable=false
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
index 2312247..b6e1207 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -1,5 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
-# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml
b/flume-ng-sources/flume-kafka-source/pom.xml
index 0ec9d72..affc999 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -76,5 +76,10 @@
<classifier>test</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 20f7c7d..b02285d 100644
---
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -567,6 +567,10 @@ public class KafkaSource extends AbstractPollableSource
KafkaConsumer<String, byte[]> consumer = new
KafkaConsumer<>(kafkaProps)) {
Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
getKafkaOffsets(consumer, topicStr);
+ if (kafkaOffsets == null) {
+ log.warn("Topic " + topicStr + " not found in Kafka. Offset migration
will be skipped.");
+ return;
+ }
if (!kafkaOffsets.isEmpty()) {
log.info("Found Kafka offsets for topic " + topicStr +
". Will not migrate from zookeeper");
@@ -589,7 +593,8 @@ public class KafkaSource extends AbstractPollableSource
Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
getKafkaOffsets(consumer, topicStr);
log.debug("Offsets committed: {}", newKafkaOffsets);
- if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
+ if (newKafkaOffsets == null
+ || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet()))
{
throw new FlumeException("Offsets could not be committed");
}
}
@@ -597,13 +602,16 @@ public class KafkaSource extends AbstractPollableSource
private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
KafkaConsumer<String, byte[]> client, String topicStr) {
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ Map<TopicPartition, OffsetAndMetadata> offsets = null;
List<PartitionInfo> partitions = client.partitionsFor(topicStr);
- for (PartitionInfo partition : partitions) {
- TopicPartition key = new TopicPartition(topicStr, partition.partition());
- OffsetAndMetadata offsetAndMetadata = client.committed(key);
- if (offsetAndMetadata != null) {
- offsets.put(key, offsetAndMetadata);
+ if (partitions != null) {
+ offsets = new HashMap<>();
+ for (PartitionInfo partition : partitions) {
+ TopicPartition key = new TopicPartition(topicStr,
partition.partition());
+ OffsetAndMetadata offsetAndMetadata = client.committed(key);
+ if (offsetAndMetadata != null) {
+ offsets.put(key, offsetAndMetadata);
+ }
}
}
return offsets;
diff --git
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index f4fe57d..56a582a 100644
---
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -76,6 +76,7 @@ public class KafkaSourceEmbeddedKafka {
props.put("port", String.valueOf(serverPort));
props.put("log.dir", dir.getAbsolutePath());
props.put("offsets.topic.replication.factor", "1");
+ props.put("auto.create.topics.enable", "false");
if (properties != null) {
props.putAll(properties);
}
diff --git
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index a82c972..d866c98 100644
---
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource.Status;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -340,6 +341,8 @@ public class TestKafkaSource {
startKafkaSource();
Thread.sleep(500L);
+ assertEquals(LifecycleState.START, kafkaSource.getLifecycleState());
+
Status status = kafkaSource.process();
assertEquals(Status.BACKOFF, status);
}
@@ -845,7 +848,7 @@ public class TestKafkaSource {
kafkaSource.stop();
}
- public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets,
boolean hasKafkaOffsets,
+ private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets,
boolean hasKafkaOffsets,
String group) throws Exception {
// create a topic with 1 partition for simplicity
String topic = findUnusedTopic();
@@ -928,6 +931,27 @@ public class TestKafkaSource {
}
}
+ @Test
+ public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception
{
+ String topic = findUnusedTopic();
+
+ Context context =
prepareDefaultContext("testMigrateOffsets-nonExistingTopic");
+ context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
+ context.put(TOPIC, topic);
+ KafkaSource source = new KafkaSource();
+ source.doConfigure(context);
+
+ source.setChannelProcessor(createGoodChannel());
+ source.start();
+
+ assertEquals(LifecycleState.START, source.getLifecycleState());
+
+ Status status = source.process();
+ assertEquals(Status.BACKOFF, status);
+
+ source.stop();
+ }
+
ChannelProcessor createGoodChannel() {
ChannelProcessor channelProcessor = mock(ChannelProcessor.class);