Repository: eagle
Updated Branches:
  refs/heads/master 167ec0815 -> 051dc69c5


[MINOR] Add kafka log4j appender integration test cases

* Add kafka log4j appender integration test cases `KafkaLog4jAppenderIT`

Author: Hao Chen <[email protected]>

Closes #838 from haoch/AddKafkaLog4jAppenderIT.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/051dc69c
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/051dc69c
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/051dc69c

Branch: refs/heads/master
Commit: 051dc69c5cc50bc354e72a4951472ae2c9a25524
Parents: 167ec08
Author: Hao Chen <[email protected]>
Authored: Thu Feb 23 11:06:23 2017 +0800
Committer: Hao Chen <[email protected]>
Committed: Thu Feb 23 11:06:23 2017 +0800

----------------------------------------------------------------------
 eagle-external/eagle-log4jkafka/pom.xml         | 15 ++++
 .../src/test/resources/log4j.properties         |  6 +-
 .../log4j/kafka/KafkaLog4jAppenderIT.scala      | 80 ++++++++++++++++++++
 .../eagle/log4j/kafka/KafkaTestBase.scala       | 66 ++++++++++++++++
 4 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/pom.xml b/eagle-external/eagle-log4jkafka/pom.xml
index 9bd2983..d05279f 100644
--- a/eagle-external/eagle-log4jkafka/pom.xml
+++ b/eagle-external/eagle-log4jkafka/pom.xml
@@ -51,6 +51,21 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_${scala.version}</artifactId>
             <scope>test</scope>


http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties b/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
index 3cb8290..ffd4a21 100644
--- a/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
+++ b/eagle-external/eagle-log4jkafka/src/test/resources/log4j.properties
@@ -23,8 +23,8 @@ log4j.appender.KAFKA.BrokerList=sandbox.hortonworks.com:6667
 log4j.appender.KAFKA.KeyClass=org.apache.eagle.log4j.kafka.hadoop.AuditLogKeyer
 log4j.appender.KAFKA.Layout=org.apache.log4j.PatternLayout
 log4j.appender.KAFKA.Layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
-log4j.appender.KAFKA.ProducerType=async
+log4j.appender.KAFKA.ProducerType=sync
 log4j.appender.KAFKA.BatchSize=1
 log4j.appender.KAFKA.QueueSize=1
-log4j.logger.eagle.kafka.producer.TestKafkaAppender$=console,KAFKA
-#log4j.logger.kafka.utils.VerifiableProperties=INFO,console
+# log4j.logger.org.apache.eagle.log4j.kafka.KafkaLog4jAppenderIT=DEBUG, console,KAFKA
+# log4j.logger.kafka.utils.VerifiableProperties=DEBUG,console
\ No newline at end of file
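
The log4j.properties change above switches the appender to synchronous delivery and leaves the KAFKA appender defined but detached from any logger (both logger lines are now commented out); the integration test introduced below attaches the appender programmatically instead. For reference, a minimal sketch of the property-driven path, assuming the commented-out KafkaLog4jAppenderIT logger line were re-enabled and the file loaded from the test classpath (the object name below is illustrative only, not part of this commit):

import org.apache.log4j.{Logger, PropertyConfigurator}

object PropertyConfiguredAppenderSketch extends App {
  // Assumes log4j.properties (with its KAFKA logger line uncommented) is on the classpath.
  PropertyConfigurator.configure(getClass.getResource("/log4j.properties"))

  // The logger name must match the re-enabled log4j.logger.* entry above.
  val log = Logger.getLogger("org.apache.eagle.log4j.kafka.KafkaLog4jAppenderIT")

  // With ProducerType=sync and BatchSize=1, each event is pushed to the broker immediately.
  log.info("audit event routed through the KAFKA appender")
}
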
http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala
new file mode 100644
index 0000000..46f6c21
--- /dev/null
+++ b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaLog4jAppenderIT.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log4j.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.log4j.{Level, Logger}
+import org.junit.Test
+
+class KafkaLog4jAppenderIT extends KafkaTestBase {
+  val KafkaLog4jAppenderTopic = "KafkaLog4jAppender"
+  val JKafkaLog4jAppenderTopic = "JKafkaLog4jAppender"
+  val kafkaBrokerList = "localhost:" + kafkaPort
+
+  Logger.getRootLogger.setLevel(Level.ALL)
+
+  val KafkaLog4jAppenderLogger = Logger.getLogger(classOf[KafkaLog4jAppender])
+  val JKafkaLog4jAppenderLogger = Logger.getLogger(classOf[JKafkaLog4jAppender])
+
+  def createConsumerConfig(): Properties = {
+    val props = new Properties()
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList)
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "kafka.consumer.RoundRobinAssignor")
+    props
+  }
+
+  @Test def testKafkaLog4jAppender(): Unit = {
+    val kafkaLog4jAppender = new KafkaLog4jAppender()
+    try {
+      kafkaLog4jAppender.setName(classOf[KafkaLog4jAppender].getName)
+      kafkaLog4jAppender.setTopic(KafkaLog4jAppenderTopic)
+      kafkaLog4jAppender.setBrokerList(kafkaBrokerList)
+      kafkaLog4jAppender.setBatchSize(1)
+      kafkaLog4jAppender.setQueueSize("1")
+
+      kafkaLog4jAppender.activateOptions()
+      KafkaLog4jAppenderLogger.addAppender(kafkaLog4jAppender)
+      KafkaLog4jAppenderLogger.info("message to KafkaLog4jAppender")
+    } finally {
+      kafkaLog4jAppender.close()
+    }
+  }
+
+  @Test def testJKafkaLog4jAppender(): Unit = {
+    val jKafkaLog4jAppender = new JKafkaLog4jAppender()
+    try {
+      jKafkaLog4jAppender.setName(classOf[JKafkaLog4jAppender].getName)
+      jKafkaLog4jAppender.setTopic(JKafkaLog4jAppenderTopic)
+      jKafkaLog4jAppender.setBrokerList(kafkaBrokerList)
+      jKafkaLog4jAppender.setSyncSend(true)
+      jKafkaLog4jAppender.setRequiredNumAcks(0)
+      jKafkaLog4jAppender.activateOptions()
+      JKafkaLog4jAppenderLogger.addAppender(jKafkaLog4jAppender)
+      JKafkaLog4jAppenderLogger.info("message to JKafkaLog4jAppender")
+    } finally {
+      jKafkaLog4jAppender.close()
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/eagle/blob/051dc69c/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala
new file mode 100644
index 0000000..b77a43d
--- /dev/null
+++ b/eagle-external/eagle-log4jkafka/src/test/scala/org/apache/eagle/log4j/kafka/KafkaTestBase.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log4j.kafka
+
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, KafkaServerStartable}
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.{InstanceSpec, TestingServer}
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+
+class KafkaTestBase extends JUnitSuite {
+  val _tempFolder = new TemporaryFolder()
+
+  @Rule def tempFolder = _tempFolder
+
+  var zkServer:TestingServer = _
+  var curatorClient:CuratorFramework = _
+  var kafkaServer:KafkaServerStartable = _
+  val kafkaPort = InstanceSpec.getRandomPort
+  val zookeeperPort = InstanceSpec.getRandomPort
+
+  @Before
+  def before(): Unit = {
+    val logDir = tempFolder.newFolder()
+    this.zkServer = new TestingServer(zookeeperPort, logDir)
+    val retryPolicy = new ExponentialBackoffRetry(1000, 3)
+    this.curatorClient = CuratorFrameworkFactory.newClient(zkServer.getConnectString, retryPolicy)
+    this.curatorClient.start()
+
+    val p: Properties = new Properties
+    p.setProperty("zookeeper.connect", zkServer.getConnectString)
+    p.setProperty("broker.id", "0")
+    p.setProperty("port", "" + kafkaPort)
+    p.setProperty("log.dirs", logDir.getAbsolutePath)
+    p.setProperty("auto.create.topics.enable", "true")
+
+    this.kafkaServer = new KafkaServerStartable(new KafkaConfig(p))
+    this.kafkaServer.startup()
+  }
+
+  @After
+  def after(): Unit = {
+    kafkaServer.shutdown()
+    curatorClient.close()
+    zkServer.close()
+  }
+}
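
Both test cases above assert only that an appender can be activated and handed a message without throwing; nothing is read back from the embedded broker that KafkaTestBase starts. A round-trip check could be added along the lines of the sketch below. It is not part of this commit: the class, method, and topic names are illustrative, and it assumes a kafka-clients version with a working new consumer (0.9 or later) on the test classpath.

package org.apache.eagle.log4j.kafka

import java.util.{Collections, Properties}

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.log4j.{Level, Logger}
import org.junit.Test

// Hypothetical companion test: log through KafkaLog4jAppender, then consume the
// record back from the embedded broker to verify end-to-end delivery.
class KafkaLog4jAppenderRoundTripIT extends KafkaTestBase {
  val topic = "KafkaLog4jAppenderRoundTrip"
  val brokerList = "localhost:" + kafkaPort

  @Test def testMessageReachesBroker(): Unit = {
    // Produce through the appender under test.
    val appender = new KafkaLog4jAppender()
    appender.setName("roundTrip")
    appender.setTopic(topic)
    appender.setBrokerList(brokerList)
    appender.setBatchSize(1)
    appender.setQueueSize("1")
    appender.activateOptions()

    val logger = Logger.getLogger(classOf[KafkaLog4jAppenderRoundTripIT])
    logger.setLevel(Level.ALL)
    logger.addAppender(appender)
    logger.info("round-trip message")
    appender.close()

    // Read the message back with a plain Kafka consumer.
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-consumer")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList(topic))
      // Poll a few times so the auto-created topic has time to become readable.
      val records = (1 to 10).flatMap(_ => consumer.poll(1000L).asScala)
      assert(records.exists(_.value().contains("round-trip message")))
    } finally {
      consumer.close()
    }
  }
}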
