Repository: bigtop Updated Branches: refs/heads/branch-1.3 8488fb4c2 -> 5990156cd
Add missing scala patch for flume This commit adds the patch that fixes the flume/kafka build issue, which was missed in BIGTOP-3082. Signed-off-by: Jun He <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/5990156c Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/5990156c Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/5990156c Branch: refs/heads/branch-1.3 Commit: 5990156cd4816c89029c103b1f958dbb6855c021 Parents: 8488fb4 Author: Jun He <[email protected]> Authored: Thu Sep 20 07:41:52 2018 +0800 Committer: Jun He <[email protected]> Committed: Wed Sep 19 23:59:30 2018 +0000 ---------------------------------------------------------------------- .../src/common/flume/patch2-scala-symbol.diff | 96 ++++++++++++++++++++ 1 file changed, 96 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bigtop/blob/5990156c/bigtop-packages/src/common/flume/patch2-scala-symbol.diff ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/common/flume/patch2-scala-symbol.diff b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff new file mode 100644 index 0000000..17833d4 --- /dev/null +++ b/bigtop-packages/src/common/flume/patch2-scala-symbol.diff @@ -0,0 +1,96 @@ +From f809342685fcf1e1a2dc0fc227de84ccb26dad10 Mon Sep 17 00:00:00 2001 +From: Anton Chevychalov <[email protected]> +Date: Wed, 25 Oct 2017 15:47:48 +0300 +Subject: [PATCH] Fix kafka and Scala 2.11 trouble + +--- + .../org/apache/flume/channel/kafka/KafkaChannel.java | 4 ++-- + .../org/apache/flume/source/kafka/KafkaSource.java | 18 ++++++++++++++---- + 2 files changed, 16 insertions(+), 6 deletions(-) + +diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +index 5bd9be0..46494fd 100644 +--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ++++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +@@ -77,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicReference; + + import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; +-import static scala.collection.JavaConverters.asJavaListConverter; ++import scala.collection.JavaConverters; + + public class KafkaChannel extends BasicChannelSemantics { + +@@ -357,7 +357,7 @@ public class KafkaChannel extends BasicChannelSemantics { + private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); +- List<String> partitions = asJavaListConverter( ++ List<String> partitions = JavaConverters.seqAsJavaListConverter( + client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); + for (String partition : partitions) { + TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); +diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +index ffdc96e..960e9e8 100644 +--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ++++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +@@ -28,8 +28,10 @@ import java.util.Properties; + import java.util.UUID; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.regex.Pattern; ++import java.util.stream.Collectors; + 
+ import com.google.common.annotations.VisibleForTesting; ++import kafka.cluster.Broker; + import kafka.cluster.BrokerEndPoint; + import kafka.utils.ZKGroupTopicDirs; + import kafka.utils.ZkUtils; +@@ -57,6 +59,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; + import org.apache.kafka.common.PartitionInfo; + import org.apache.kafka.common.TopicPartition; ++import org.apache.kafka.common.network.ListenerName; + import org.apache.kafka.common.protocol.SecurityProtocol; + import org.apache.kafka.common.security.JaasUtils; + import org.slf4j.Logger; +@@ -64,9 +67,10 @@ import org.slf4j.LoggerFactory; + + import com.google.common.base.Optional; + import scala.Option; ++import scala.collection.Seq; + + import static org.apache.flume.source.kafka.KafkaSourceConstants.*; +-import static scala.collection.JavaConverters.asJavaListConverter; ++import scala.collection.JavaConverters; + + /** + * A Source for Kafka which reads messages from kafka topics. 
+@@ -464,8 +468,14 @@ public class KafkaSource extends AbstractPollableSource + ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, + JaasUtils.isZkSecurityEnabled()); + try { +- List<BrokerEndPoint> endPoints = +- asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava(); ++ Seq<Broker> allBrokersInCluster = zkUtils.getAllBrokersInCluster(); ++ List<Broker> brokerList = JavaConverters.seqAsJavaListConverter( ++ zkUtils.getAllBrokersInCluster()).asJava(); ++ List<BrokerEndPoint> endPoints = brokerList.stream() ++ .map(broker -> broker.getBrokerEndPoint( ++ ListenerName.forSecurityProtocol(securityProtocol)) ++ ) ++ .collect(Collectors.toList()); + List<String> connections = new ArrayList<>(); + for (BrokerEndPoint endPoint : endPoints) { + connections.add(endPoint.connectionString()); +@@ -597,7 +607,7 @@ public class KafkaSource extends AbstractPollableSource + String topicStr) { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr); +- List<String> partitions = asJavaListConverter( ++ List<String> partitions = JavaConverters.seqAsJavaListConverter( + client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava(); + for (String partition : partitions) { + TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition)); +-- +1.9.1 +
