Hi Anton, the Kafka upgrade also brings in a scala-library update, and the Scala 2.11 version introduced backward-incompatible changes;
in this case, scala.collection.JavaConverters.asJavaListConverter has been removed. I have attached part of my solution for this upgrade. You may not be able to apply the diff file itself, but the solution is in it. Let me know if you need more help with it. On Wed, Oct 25, 2017 at 10:28 AM, Антон Чевычалов <c...@arenadata.io> wrote: > Hi to everyone! > > I am trying to compile flume with latest kafka 0.11.0.1 and got the > following > > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:2.3.2:compile > (default-compile) on project flume-kafka-source: Compilation failure: > Compilation failure: > [ERROR] > /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume- > ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/ > KafkaSource.java:[69,0] > error: cannot find symbol > [ERROR] class > [ERROR] > /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume- > ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/ > KafkaSource.java:[468,37] > error: cannot find symbol > [ERROR] variable zkUtils of type ZkUtils > [ERROR] > /code/build/flume/rpm/BUILD/apache-flume-1.8.0-src/flume- > ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/ > KafkaSource.java:[600,30] > error: cannot find symbol > [ERROR] -> [Help 1] > > > That is strange trouble because we have that class in imports. That looks > for me like environment trouble (Maven version maybe). Does someone compile > flume with kafka 0.11.0.1? > > > --- > Anton B Chevychalov > CI/CD Engineer of ArenaData >
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -28,8 +28,10 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import kafka.cluster.Broker; import kafka.cluster.BrokerEndPoint; import kafka.utils.ZKGroupTopicDirs; import kafka.utils.ZkUtils; @@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.JaasUtils; import org.slf4j.Logger; @@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import scala.Option; +import scala.collection.Seq; import static org.apache.flume.source.kafka.KafkaSourceConstants.*; -import static scala.collection.JavaConverters.asJavaListConverter; +import scala.collection.JavaConverters; /** * A Source for Kafka which reads messages from kafka topics. 
@@ -455,8 +459,13 @@ public class KafkaSource extends AbstractPollableSource ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, JaasUtils.isZkSecurityEnabled()); try { - List<BrokerEndPoint> endPoints = - asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava(); + Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster(); + List<Broker> brokerList = JavaConverters.seqAsJavaListConverter(zkUtils.getAllBrokersInCluster()).asJava(); + List<BrokerEndPoint> endPoints = brokerList.stream() + .map(broker -> broker.getBrokerEndPoint( + ListenerName.forSecurityProtocol(securityProtocol)) + ) + .collect(Collectors.toList()); List<String> connections = new ArrayList<>(); for (BrokerEndPoint endPoint : endPoints) { connections.add(endPoint.connectionString()); @@ -588,7 +597,7 @@ public class KafkaSource extends AbstractPollableSource String topicStr) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); - List<String> partitions = asJavaListConverter( + List<String> partitions = JavaConverters.seqAsJavaListConverter( client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); for (String partition : partitions) { TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); diff --git a/flume-ng-tests/pom.xml b/flume-ng-tests/pom.xml