Repository: flink
Updated Branches:
  refs/heads/master e2d3e1f86 -> 14bcac7b9


[hotfix][Kafka] Clean up getKafkaServer method


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14bcac7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14bcac7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14bcac7b

Branch: refs/heads/master
Commit: 14bcac7b92d9df36deca707f437177a7ce370c13
Parents: cd373ef
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Aug 3 11:35:26 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 17 ++++-------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 14 +++++---------
 2 files changed, 9 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14bcac7b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 5be802f..5a5caad 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -249,20 +249,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                        LOG.info("Starting KafkaServer");
                        brokers = new ArrayList<>(config.getKafkaServersNumber());
 
+                       ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
                        for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-                               brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-
-                               if (config.isSecureMode()) {
-                                       brokerConnectionString += hostAndPortToUrlString(
-                                                       KafkaTestEnvironment.KAFKA_HOST,
-                                                       brokers.get(i).socketServer().boundPort(
-                                                                       ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
-                               } else {
-                                       brokerConnectionString += hostAndPortToUrlString(
-                                                       KafkaTestEnvironment.KAFKA_HOST,
-                                                       brokers.get(i).socketServer().boundPort(
-                                                                       ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
-                               }
+                               KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+                               brokers.add(kafkaServer);
+                               brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
                                brokerConnectionString +=  ",";
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14bcac7b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 676e588..26b41e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,7 +30,6 @@ import org.apache.flink.util.NetUtils;
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.common.KafkaException;
-import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -236,15 +235,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                        LOG.info("Starting KafkaServer");
                        brokers = new ArrayList<>(config.getKafkaServersNumber());
 
+                       SecurityProtocol securityProtocol = config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
                        for (int i = 0; i < config.getKafkaServersNumber(); i++) {
-                               brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-
-                               SocketServer socketServer = brokers.get(i).socketServer();
-                               if (this.config.isSecureMode()) {
-                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
-                               } else {
-                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
-                               }
+                               KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+                               brokers.add(kafkaServer);
+                               brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(securityProtocol));
+                               brokerConnectionString +=  ",";
                        }
 
                        LOG.info("ZK and KafkaServer started.");

Reply via email to