Oh! I should learn Scala. I never expected such breaking changes in a minor version. Thanks for the help. I made a patch based on yours and have attached it here. Should I do anything more? Maybe create a ticket in Jira?
On 25 October 2017 at 15:22, Ferenc Szabo <fsz...@cloudera.com> wrote: > Hi Anton, > > with the kafka upgrade, there comes a scala-library update as well and for > the 2.11 scala version there were backward compatibility breaking changes > > in this case scala.collection.JavaConverters.asJavaListConverter has > been removed > > I have attached part of my solution of this upgrade. > You may not be able to apply the diff file itself, but the solution is in > it. > > Let me know if you need more help with it. > > > > > > > On Wed, Oct 25, 2017 at 10:28 AM, Антон Чевычалов <c...@arenadata.io> > wrote: > >> Hi to everyone! >> >> I am trying to compile flume with latest kafka 0.11.0.1 and got the >> following >> >> [ERROR] Failed to execute goal >> org.apache.maven.plugins:maven-compiler-plugin:2.3.2:compile >> (default-compile) on project flume-kafka-source: Compilation failure: >> Compilation failure: >> [ERROR] >> /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng- >> sources/flume-kafka-source/src/main/java/org/apache/flume/ >> source/kafka/KafkaSource.java:[69,0] >> error: cannot find symbol >> [ERROR] class >> [ERROR] >> /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng- >> sources/flume-kafka-source/src/main/java/org/apache/flume/ >> source/kafka/KafkaSource.java:[468,37] >> error: cannot find symbol >> [ERROR] variable zkUtils of type ZkUtils >> [ERROR] >> /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng- >> sources/flume-kafka-source/src/main/java/org/apache/flume/ >> source/kafka/KafkaSource.java:[600,30] >> error: cannot find symbol >> [ERROR] -> [Help 1] >> >> >> That is strange trouble because we have that class in imports. That looks >> for me like environment trouble (Maven version maybe). Does someone >> compile >> flume with kafka 0.11.0.1? >> >> >> --- >> Anton B Chevychalov >> CI/CD Engineer of ArenaData >> > > -- Anton B Chevychalov CI/CD Engineer of ArenaData
From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001 From: Anton Chevychalov <c...@arenadata.io> Date: Wed, 25 Oct 2017 15:47:48 +0300 Subject: [PATCH] Fix kafka and Scala 2.11 trouble --- .../org/apache/flume/channel/kafka/KafkaChannel.java | 4 ++-- .../org/apache/flume/source/kafka/KafkaSource.java | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 5bd9be0..46494fd 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; -import static scala.collection.JavaConverters.asJavaListConverter; +import scala.collection.JavaConverters; public class KafkaChannel extends BasicChannelSemantics { @@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics { private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); - List<String> partitions = asJavaListConverter( + List<String> partitions = JavaConverters.seqAsJavaListConverter( client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); for (String partition : partitions) { TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index ffdc96e..960e9e8 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -28,8 +28,10 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import kafka.cluster.Broker; import kafka.cluster.BrokerEndPoint; import kafka.utils.ZKGroupTopicDirs; import kafka.utils.ZkUtils; @@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.JaasUtils; import org.slf4j.Logger; @@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import scala.Option; +import scala.collection.Seq; import static org.apache.flume.source.kafka.KafkaSourceConstants.*; -import static scala.collection.JavaConverters.asJavaListConverter; +import scala.collection.JavaConverters; /** * A Source for Kafka which reads messages from kafka topics. 
@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, JaasUtils.isZkSecurityEnabled()); try { - List<BrokerEndPoint> endPoints = - asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava(); + Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster(); + List<Broker> brokerList = JavaConverters.seqAsJavaListConverter( + zkUtils.getAllBrokersInCluster()).asJava(); + List<BrokerEndPoint> endPoints = brokerList.stream() + .map(broker -> broker.getBrokerEndPoint( + ListenerName.forSecurityProtocol(securityProtocol)) + ) + .collect(Collectors.toList()); List<String> connections = new ArrayList<>(); for (BrokerEndPoint endPoint : endPoints) { connections.add(endPoint.connectionString()); @@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource String topicStr) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); - List<String> partitions = asJavaListConverter( + List<String> partitions = JavaConverters.seqAsJavaListConverter( client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); for (String partition : partitions) { TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); -- 1.9.1