Oh!

I should learn Scala. I never suspect such changes in minor version. Thanks
for help. I made patch based on your  and attach that here. Should I do
something more? Create ticket in Jira maybe?



On 25 October 2017 at 15:22, Ferenc Szabo <fsz...@cloudera.com> wrote:

> Hi Anton,
>
> with the kafka upgrade, there comes a scala-library update as well and for
> the 2.11 scala version there were backward compatibility breaking changes
>
> in this case  scala.collection.JavaConverters.asJavaListConverter has
> been removed
>
> I have attached part of my solution of this upgrade.
> You may not be able to apply the diff file itself, but the solution is in
> it.
>
> Let me know if you need more help with it.
>
>
>
>
>
>
> On Wed, Oct 25, 2017 at 10:28 AM, Антон Чевычалов <c...@arenadata.io>
> wrote:
>
>> Hi to everyone!
>>
>> I am trying to compile flume with latest kafka 0.11.0.1 and got the
>> following
>>
>> [ERROR] Failed to execute goal
>> org.apache.maven.plugins:maven-compiler-plugin:2.3.2:compile
>> (default-compile) on project flume-kafka-source: Compilation failure:
>> Compilation failure:
>> [ERROR]
>> /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng-
>> sources/flume-kafka-source/src/main/java/org/apache/flume/
>> source/kafka/KafkaSource.java:[69,0]
>> error: cannot find symbol
>> [ERROR] class
>> [ERROR]
>> /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng-
>> sources/flume-kafka-source/src/main/java/org/apache/flume/
>> source/kafka/KafkaSource.java:[468,37]
>> error: cannot find symbol
>> [ERROR] variable zkUtils of type ZkUtils
>> [ERROR]
>> /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng-
>> sources/flume-kafka-source/src/main/java/org/apache/flume/
>> source/kafka/KafkaSource.java:[600,30]
>> error: cannot find symbol
>> [ERROR] -> [Help 1]
>>
>>
>> That is strange trouble because we have that class in imports. That looks
>> for me like environment trouble (Maven version maybe). Does someone
>> compile
>> flume with kafka 0.11.0.1?
>>
>>
>> ---
>> Anton B Chevychalov
>> CI/CD Engineer of ArenaData
>>
>
>


-- 
Anton B Chevychalov
CI/CD Engineer of ArenaData
From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001
From: Anton Chevychalov <c...@arenadata.io>
Date: Wed, 25 Oct 2017 15:47:48 +0300
Subject: [PATCH] Fix kafka and Scala 2.11 trouble

---
 .../org/apache/flume/channel/kafka/KafkaChannel.java   |  4 ++--
 .../org/apache/flume/source/kafka/KafkaSource.java     | 18 ++++++++++++++----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 5bd9be0..46494fd 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
+import scala.collection.JavaConverters;
 
 public class KafkaChannel extends BasicChannelSemantics {
 
@@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
-    List<String> partitions = asJavaListConverter(
+    List<String> partitions = JavaConverters.seqAsJavaListConverter(
         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
     for (String partition : partitions) {
       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index ffdc96e..960e9e8 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -28,8 +28,10 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import kafka.cluster.Broker;
 import kafka.cluster.BrokerEndPoint;
 import kafka.utils.ZKGroupTopicDirs;
 import kafka.utils.ZkUtils;
@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.JaasUtils;
 import org.slf4j.Logger;
@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import scala.Option;
+import scala.collection.Seq;
 
 import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
+import scala.collection.JavaConverters;
 
 /**
  * A Source for Kafka which reads messages from kafka topics.
@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource
     ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
         JaasUtils.isZkSecurityEnabled());
     try {
-      List<BrokerEndPoint> endPoints =
-          asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
+      Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster();
+      List<Broker> brokerList = JavaConverters.seqAsJavaListConverter(
+          zkUtils.getAllBrokersInCluster()).asJava();
+      List<BrokerEndPoint> endPoints = brokerList.stream()
+              .map(broker -> broker.getBrokerEndPoint(
+                  ListenerName.forSecurityProtocol(securityProtocol))
+              )
+              .collect(Collectors.toList());
       List<String> connections = new ArrayList<>();
       for (BrokerEndPoint endPoint : endPoints) {
         connections.add(endPoint.connectionString());
@@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource
                                                                      String topicStr) {
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
-    List<String> partitions = asJavaListConverter(
+    List<String> partitions = JavaConverters.seqAsJavaListConverter(
         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
     for (String partition : partitions) {
       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
-- 
1.9.1

Reply via email to