Repository: kafka
Updated Branches:
  refs/heads/trunk 2adeb214b -> 3e5afbfa0


KAFKA-3078: Add ducktape tests for KafkaLog4jAppender producing to a SASL-enabled Kafka cluster

Note that KAFKA-3077 will be required to run these tests.

Author: Ashish Singh <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #747 from SinghAsDev/KAFKA-3078


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e5afbfa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e5afbfa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e5afbfa

Branch: refs/heads/trunk
Commit: 3e5afbfa0dd4ddfca65fae1f3b2a268ae1ed2025
Parents: 2adeb21
Author: Ashish Singh <[email protected]>
Authored: Mon Jan 11 23:15:42 2016 -0800
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Jan 11 23:15:42 2016 -0800

----------------------------------------------------------------------
 .../kafkatest/services/kafka_log4j_appender.py  | 12 +++++--
 tests/kafkatest/tests/log4j_appender_test.py    | 17 ++++++---
 .../kafka/tools/VerifiableLog4jAppender.java    | 36 ++++++++++++++++++--
 3 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 0cc39c0..3732bb0 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -49,10 +49,18 @@ class KafkaLog4jAppender(BackgroundThreadService):
 
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
-        if self.security_protocol == SecurityConfig.SSL:
-            cmd += " --security-protocol SSL"
+        if self.security_protocol != SecurityConfig.PLAINTEXT:
+            cmd += " --security-protocol %s" % str(self.security_protocol)
+        if self.security_protocol == SecurityConfig.SSL or 
self.security_protocol == SecurityConfig.SASL_SSL:
             cmd += " --ssl-truststore-location %s" % 
str(SecurityConfig.TRUSTSTORE_PATH)
             cmd += " --ssl-truststore-password %s" % 
str(SecurityConfig.ssl_stores['ssl.truststore.password'])
+        if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
+                self.security_protocol == SecurityConfig.SASL_SSL or \
+                self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI 
or \
+                self.security_protocol == SecurityConfig.SASL_MECHANISM_PLAIN:
+            cmd += " --sasl-kerberos-service-name %s" % str('kafka')
+            cmd += " --client-jaas-conf-path %s" % 
str(SecurityConfig.JAAS_CONF_PATH)
+            cmd += " --kerb5-conf-path %s" % str(SecurityConfig.KRB5CONF_PATH)
 
         cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a 
/mnt/kafka_log4j_appender.log &"
         return cmd

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tests/kafkatest/tests/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py
index db33d76..42cfeea 100644
--- a/tests/kafkatest/tests/log4j_appender_test.py
+++ b/tests/kafkatest/tests/log4j_appender_test.py
@@ -35,6 +35,7 @@ class Log4jAppenderTest(Test):
         super(Log4jAppenderTest, self).__init__(test_context)
         self.num_zk = 1
         self.num_brokers = 1
+        self.messages_received_count = 0
         self.topics = {
             TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
@@ -56,13 +57,20 @@ class Log4jAppenderTest(Test):
                                            security_protocol=security_protocol)
         self.appender.start()
 
+    def custom_message_validator(self, msg):
+        if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in 
msg:
+            self.logger.debug("Received message: %s" % msg)
+            self.messages_received_count += 1
+
+
     def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol == SecurityConfig.SSL
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
         self.consumer = ConsoleConsumer(self.test_context, 
num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=1000, 
new_consumer=enable_new_consumer)
+                                        consumer_timeout_ms=1000, 
new_consumer=enable_new_consumer,
+                                        
message_validator=self.custom_message_validator)
         self.consumer.start()
 
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 
'SASL_SSL'])
     def test_log4j_appender(self, security_protocol='PLAINTEXT'):
         """
         Tests if KafkaLog4jAppender is producing to Kafka topic
@@ -79,8 +87,7 @@ class Log4jAppenderTest(Test):
             timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to 
start")
 
         # Verify consumed messages count
-        expected_lines_count = MAX_MESSAGES * 2  # two times to account for 
new lines introduced by log4j
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) == 
expected_lines_count, timeout_sec=10,
+        wait_until(lambda: self.messages_received_count == MAX_MESSAGES, 
timeout_sec=10,
                    err_msg="Timed out waiting to consume expected number of 
messages.")
 
         self.consumer.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index a48b301..ffbf7dc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -21,6 +21,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 
@@ -96,7 +97,7 @@ public class VerifiableLog4jAppender {
             .required(false)
             .setDefault("PLAINTEXT")
             .type(String.class)
-            .choices("PLAINTEXT", "SSL")
+            .choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")
             .metavar("SECURITY-PROTOCOL")
             .dest("securityProtocol")
             .help("Security protocol to be used while communicating with Kafka 
brokers.");
@@ -124,6 +125,30 @@ public class VerifiableLog4jAppender {
             .metavar("CONFIG_FILE")
             .help("Log4jAppender config properties file.");
 
+        parser.addArgument("--sasl-kerberos-service-name")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("SASL-KERBEROS-SERVICE-NAME")
+            .dest("saslKerberosServiceName")
+            .help("Name of sasl kerberos service.");
+
+        parser.addArgument("--client-jaas-conf-path")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("CLIENT-JAAS-CONF-PATH")
+            .dest("clientJaasConfPath")
+            .help("Path of JAAS config file of Kafka client.");
+
+        parser.addArgument("--kerb5-conf-path")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("KERB5-CONF-PATH")
+            .dest("kerb5ConfPath")
+            .help("Path of Kerb5 config file.");
+
         return parser;
     }
 
@@ -171,11 +196,18 @@ public class VerifiableLog4jAppender {
             props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", 
res.getString("acks"));
             props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
             final String securityProtocol = res.getString("securityProtocol");
-            if (securityProtocol != null && securityProtocol.equals("SSL")) {
+            if (securityProtocol != null && 
!securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) {
                 props.setProperty("log4j.appender.KAFKA.SecurityProtocol", 
securityProtocol);
+            }
+            if (securityProtocol != null && securityProtocol.contains("SSL")) {
                 
props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", 
res.getString("sslTruststoreLocation"));
                 
props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", 
res.getString("sslTruststorePassword"));
             }
+            if (securityProtocol != null && securityProtocol.contains("SASL")) 
{
+                
props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", 
res.getString("saslKerberosServiceName"));
+                props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", 
res.getString("clientJaasConfPath"));
+                props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", 
res.getString("kerb5ConfPath"));
+            }
             props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
 
             if (configFile != null) {

Reply via email to