Hi Anton,

With the Kafka upgrade there comes a scala-library update as well, and for
Scala version 2.11 there were backward-compatibility-breaking changes.

In this case, scala.collection.JavaConverters.asJavaListConverter has been
removed.

I have attached part of my solution for this upgrade.
You may not be able to apply the diff file itself, but the solution is in
it.
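
In short, this is the relevant fragment of the patch (a rough sketch, not a
drop-in replacement: zkUtils and securityProtocol are the ones already set up
in KafkaSource, and the imports are the ones added at the top of the diff):

    // Instead of the old zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)
    // call, fetch all brokers from ZooKeeper, bridge the Scala Seq to a Java List
    // with JavaConverters.seqAsJavaListConverter (the static asJavaListConverter
    // import is gone), and resolve each broker's endpoint for the configured
    // security protocol.
    Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster();
    List<Broker> brokerList =
        JavaConverters.seqAsJavaListConverter(allBrokersInCluster).asJava();
    List<BrokerEndPoint> endPoints = brokerList.stream()
        .map(broker -> broker.getBrokerEndPoint(
            ListenerName.forSecurityProtocol(securityProtocol)))
        .collect(Collectors.toList());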

Let me know if you need more help with it.

On Wed, Oct 25, 2017 at 10:28 AM, Антон Чевычалов <c...@arenadata.io> wrote:

> Hi to everyone!
>
> I am trying to compile flume with latest kafka 0.11.0.1 and got the
> following
>
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:2.3.2:compile (default-compile) on project flume-kafka-source: Compilation failure: Compilation failure:
> [ERROR] /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java:[69,0] error: cannot find symbol
> [ERROR] class
> [ERROR] /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java:[468,37] error: cannot find symbol
> [ERROR] variable zkUtils of type ZkUtils
> [ERROR] /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java:[600,30] error: cannot find symbol
> [ERROR] -> [Help 1]
>
>
> That is a strange problem because we have that class in the imports. It looks
> to me like an environment issue (maybe the Maven version). Has anyone compiled
> Flume with Kafka 0.11.0.1?
>
>
> ---
> Anton B Chevychalov
> CI/CD Engineer of ArenaData
>
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -28,8 +28,10 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import kafka.cluster.Broker;
 import kafka.cluster.BrokerEndPoint;
 import kafka.utils.ZKGroupTopicDirs;
 import kafka.utils.ZkUtils;
@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.JaasUtils;
 import org.slf4j.Logger;
@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import scala.Option;
+import scala.collection.Seq;
 
 import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
+import scala.collection.JavaConverters;
 
 /**
  * A Source for Kafka which reads messages from kafka topics.
@@ -455,8 +459,13 @@ public class KafkaSource extends AbstractPollableSource
     ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
         JaasUtils.isZkSecurityEnabled());
     try {
-      List<BrokerEndPoint> endPoints =
-          asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
+      Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster();
+      List<Broker> brokerList = JavaConverters.seqAsJavaListConverter(allBrokersInCluster).asJava();
+      List<BrokerEndPoint> endPoints = brokerList.stream()
+              .map(broker -> broker.getBrokerEndPoint(
+                  ListenerName.forSecurityProtocol(securityProtocol))
+              )
+              .collect(Collectors.toList());
       List<String> connections = new ArrayList<>();
       for (BrokerEndPoint endPoint : endPoints) {
         connections.add(endPoint.connectionString());
@@ -588,7 +597,7 @@ public class KafkaSource extends AbstractPollableSource
                                                                      String topicStr) {
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
     ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
-    List<String> partitions = asJavaListConverter(
+    List<String> partitions = JavaConverters.seqAsJavaListConverter(
         client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
     for (String partition : partitions) {
       TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
diff --git a/flume-ng-tests/pom.xml b/flume-ng-tests/pom.xml
