Repository: kafka Updated Branches: refs/heads/trunk 80038e6d2 -> ed8b0315a
KAFKA-1044; Eliminate direct and non-optional log4j references from `core` Use slf4j (via scala-logging) instead. Also: - Log4jController is only initialised if log4j if in the classpath - Use FATAL marker to support log4j's FATAL level (as the log4j-slf4j bridge does) - Removed `Logging.swallow` in favour of CoreUtils.swallow, which logs to the correct logger Author: Viktor Somogyi <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3477 from viktorsomogyi/KAFKA-1044 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed8b0315 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed8b0315 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed8b0315 Branch: refs/heads/trunk Commit: ed8b0315a6c3705b2a163ce3ab4723234779264f Parents: 80038e6 Author: Viktor Somogyi <[email protected]> Authored: Wed Nov 22 12:44:15 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Wed Nov 22 15:06:56 2017 +0000 ---------------------------------------------------------------------- build.gradle | 1 + core/src/main/scala/kafka/Kafka.scala | 2 +- .../src/main/scala/kafka/admin/AclCommand.scala | 4 +- .../ZkNodeChangeNotificationListener.scala | 2 +- .../kafka/consumer/ConsumerFetcherManager.scala | 2 +- .../controller/ControllerChannelManager.scala | 3 +- .../kafka/controller/StateChangeLogger.scala | 6 +- .../main/scala/kafka/log/AbstractIndex.scala | 4 +- core/src/main/scala/kafka/log/LogManager.scala | 6 +- core/src/main/scala/kafka/log/LogSegment.scala | 22 +-- .../scala/kafka/network/BlockingChannel.scala | 8 +- .../scala/kafka/network/RequestChannel.scala | 9 +- .../main/scala/kafka/network/SocketServer.scala | 11 +- .../scala/kafka/producer/SyncProducer.scala | 2 +- .../producer/async/DefaultEventHandler.scala | 9 +- .../security/auth/SimpleAclAuthorizer.scala | 4 +- .../main/scala/kafka/server/AdminManager.scala | 4 +- .../scala/kafka/server/ClientQuotaManager.scala | 10 +- .../main/scala/kafka/server/ConfigHandler.scala | 4 +- .../kafka/server/DynamicConfigManager.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 6 +- .../main/scala/kafka/server/KafkaServer.scala | 42 +++--- .../kafka/server/ReplicaFetcherThread.scala | 6 +- .../scala/kafka/tools/ConsoleConsumer.scala | 8 +- .../scala/kafka/tools/ConsumerPerformance.scala | 6 +- .../main/scala/kafka/tools/MirrorMaker.scala | 8 +- .../scala/kafka/tools/ProducerPerformance.scala | 3 +- .../kafka/tools/SimpleConsumerPerformance.scala | 6 +- core/src/main/scala/kafka/utils/CoreUtils.scala | 21 ++- .../scala/kafka/utils/Log4jController.scala | 11 +- core/src/main/scala/kafka/utils/Logging.scala | 136 +++++++------------ .../scala/kafka/security/minikdc/MiniKdc.scala | 6 +- .../test/scala/kafka/utils/LoggingTest.scala | 37 +++++ .../unit/kafka/server/LogDirFailureTest.scala | 4 +- .../unit/kafka/server/ServerShutdownTest.scala | 2 +- .../scala/unit/kafka/utils/CoreUtilsTest.scala | 7 +- .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 9 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +- gradle/dependencies.gradle | 2 + 39 files changed, 218 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 03426cd..1223150 100644 --- a/build.gradle +++ b/build.gradle @@ -550,6 +550,7 @@ project(':core') { compile libs.metrics compile libs.scala compile libs.slf4jlog4j + compile libs.scalaLogging compile libs.zkclient compile libs.zookeeper http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/Kafka.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 9651038..25a7216 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -94,7 +94,7 @@ object Kafka extends Logging { } catch { case e: Throwable => - fatal(e) + fatal("Exiting Kafka due to fatal exception", e) Exit.exit(1) } Exit.exit(0) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/admin/AclCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 4522135..2732f6c 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ -object AclCommand { +object AclCommand extends Logging { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( @@ -77,7 +77,7 @@ object AclCommand { authZ.configure(authorizerProperties.asJava) f(authZ) } - finally CoreUtils.swallow(authZ.close()) + finally CoreUtils.swallow(authZ.close(), this) } private def addAcl(opts: AclCommandOptions) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index f0d4b1b..4cae80c 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -87,7 +87,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, val (data, _) = zkClient.getDataAndStat(changeZnode) data match { case Some(d) => notificationHandler.processNotification(d) - case None => logger.warn(s"read null data from $changeZnode when processing notification $notification") + case None => warn(s"read null data from $changeZnode when processing notification $notification") } lastExecutedChange = changeId } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 7cccfe1..0a6b82e 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -69,7 +69,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, config.clientId, config.socketTimeoutMs, correlationId.getAndIncrement).topicsMetadata - if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) + if(isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) topicsMetadata.foreach { tmd => val topic = tmd.topic tmd.partitionsMetadata.foreach { pmd => http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 7314679..d5bd4e6 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -36,6 +36,7 @@ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, TopicPartition} +import org.slf4j.event.Level import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap @@ -210,7 +211,7 @@ class RequestSendThread(val controllerId: Int, override def doWork(): Unit = { - def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100)) + def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE) val QueueItem(apiKey, requestBuilder, callback) = queue.take() var clientResponse: ClientResponse = null http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/controller/StateChangeLogger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala b/core/src/main/scala/kafka/controller/StateChangeLogger.scala index 21c70c3..a1d1bb2 100644 --- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala +++ b/core/src/main/scala/kafka/controller/StateChangeLogger.scala @@ -17,11 +17,11 @@ package kafka.controller +import com.typesafe.scalalogging.Logger import kafka.utils.Logging -import org.apache.log4j object StateChangeLogger { - private val Logger = log4j.Logger.getLogger("state.change.logger") + private val logger = Logger("state.change.logger") } /** @@ -34,7 +34,7 @@ class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerE if (controllerEpoch.isDefined && !inControllerContext) throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true") - override lazy val logger = StateChangeLogger.Logger + override lazy val logger = StateChangeLogger.logger locally { val prefix = if (inControllerContext) "Controller" else "Broker" http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 899c107..9696d8d 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -75,7 +75,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon idx.position(roundDownToExactMultiple(idx.limit(), entrySize)) idx } finally { - CoreUtils.swallow(raf.close()) + CoreUtils.swallow(raf.close(), this) } } @@ -130,7 +130,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon mmap.position(position) true } finally { - CoreUtils.swallow(raf.close()) + CoreUtils.swallow(raf.close(), this) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index f1e2fc2..9a61be3 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -206,7 +206,7 @@ class LogManager(logDirs: Seq[File], info(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} are offline and " + s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") - dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy())) + dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy(), this)) } } @@ -412,7 +412,7 @@ class LogManager(logDirs: Seq[File], // stop the cleaner first if (cleaner != null) { - CoreUtils.swallow(cleaner.shutdown()) + CoreUtils.swallow(cleaner.shutdown(), this) } // close logs in each dir @@ -448,7 +448,7 @@ class LogManager(logDirs: Seq[File], // mark that the shutdown was clean by creating marker file debug("Writing clean shutdown marker at " + dir) - CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath)) + CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this) } } catch { case e: ExecutionException => http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 845f08f..6db2a50 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -287,12 +287,12 @@ class LogSegment(val log: FileRecords, } } catch { case e: CorruptRecordException => - logger.warn("Found invalid messages in log segment %s at byte offset %d: %s." + warn("Found invalid messages in log segment %s at byte offset %d: %s." .format(log.file.getAbsolutePath, validBytes, e.getMessage)) } val truncated = log.sizeInBytes - validBytes if (truncated > 0) - logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery") + debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery") log.truncateTo(validBytes) index.trimToValidSize() @@ -467,21 +467,21 @@ class LogSegment(val log: FileRecords, * Close this log segment */ def close() { - CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)) - CoreUtils.swallow(index.close()) - CoreUtils.swallow(timeIndex.close()) - CoreUtils.swallow(log.close()) - CoreUtils.swallow(txnIndex.close()) + CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true), this) + CoreUtils.swallow(index.close(), this) + CoreUtils.swallow(timeIndex.close(), this) + CoreUtils.swallow(log.close(), this) + CoreUtils.swallow(txnIndex.close(), this) } /** * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed */ def closeHandlers() { - CoreUtils.swallow(index.closeHandler()) - CoreUtils.swallow(timeIndex.closeHandler()) - CoreUtils.swallow(log.closeHandlers()) - CoreUtils.swallow(txnIndex.close()) + CoreUtils.swallow(index.closeHandler(), this) + CoreUtils.swallow(timeIndex.closeHandler(), this) + CoreUtils.swallow(log.closeHandlers(), this) + CoreUtils.swallow(txnIndex.close(), this) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/BlockingChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 69fd054..3493ad3 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -21,7 +21,7 @@ import java.net.InetSocketAddress import java.nio.channels._ import kafka.api.RequestOrResponse -import kafka.utils.{Logging, nonthreadsafe} +import kafka.utils.{CoreUtils, Logging, nonthreadsafe} import org.apache.kafka.common.network.NetworkReceive @@ -91,15 +91,15 @@ class BlockingChannel( val host: String, def disconnect() = lock synchronized { if(channel != null) { - swallow(channel.close()) - swallow(channel.socket.close()) + CoreUtils.swallow(channel.close(), this) + CoreUtils.swallow(channel.socket.close(), this) channel = null writeChannel = null } // closing the main socket channel *should* close the read channel // but let's do it to be sure. if(readChannel != null) { - swallow(readChannel.close()) + CoreUtils.swallow(readChannel.close(), this) readChannel = null } connected = false http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a50af45..7cc8619 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -21,6 +21,7 @@ import java.net.InetAddress import java.nio.ByteBuffer import java.util.concurrent._ +import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.{Gauge, Meter} import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction} @@ -31,15 +32,14 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Sanitizer, Time} -import org.apache.log4j.Logger import scala.collection.mutable import scala.reflect.ClassTag object RequestChannel extends Logging { - private val requestLogger = Logger.getLogger("kafka.request.logger") + private val requestLogger = Logger("kafka.request.logger") - def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled + def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled sealed trait BaseRequest case object ShutdownRequest extends BaseRequest @@ -176,7 +176,7 @@ object RequestChannel extends Logging { recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos)) if (isRequestLoggingEnabled) { - val detailsEnabled = requestLogger.isTraceEnabled + val detailsEnabled = requestLogger.underlying.isTraceEnabled val responseString = response.responseAsString.getOrElse( throw new IllegalStateException("responseAsString should always be defined if request logging is enabled")) @@ -325,6 +325,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) + } object RequestMetrics { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 4366fea..200bfe2 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaCh import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time} +import org.slf4j.event.Level import scala.collection._ import JavaConverters._ @@ -252,8 +253,8 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ if (channel != null) { debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) connectionQuotas.dec(channel.socket.getInetAddress) - swallowError(channel.socket().close()) - swallowError(channel.close()) + CoreUtils.swallow(channel.socket().close(), this, Level.ERROR) + CoreUtils.swallow(channel.close(), this, Level.ERROR) } } } @@ -319,8 +320,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint, } } finally { debug("Closing server socket and selector.") - swallowError(serverChannel.close()) - swallowError(nioSelector.close()) + CoreUtils.swallow(serverChannel.close(), this, Level.ERROR) + CoreUtils.swallow(nioSelector.close(), this, Level.ERROR) shutdownComplete() } } @@ -481,7 +482,7 @@ private[kafka] class Processor(val id: Int, } } finally { debug("Closing selector - processor " + id) - swallowError(closeAll()) + CoreUtils.swallow(closeAll(), this, Level.ERROR) shutdownComplete() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/producer/SyncProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 04527c8..b132293 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -56,7 +56,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level */ - if (logger.isDebugEnabled) { + if (isDebugEnabled) { val buffer = new RequestOrResponseSend("", request).buffer trace("verifying sendbuffer of size " + buffer.limit()) val requestTypeId = buffer.getShort() http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 3e4eaa3..8c7465f 100755 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -32,6 +32,7 @@ import java.util.concurrent.atomic._ import kafka.api.{ProducerRequest, TopicMetadata} import org.apache.kafka.common.utils.{Time, Utils} +import org.slf4j.event.Level @deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class DefaultEventHandler[K,V](config: ProducerConfig, @@ -72,7 +73,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) if (topicMetadataRefreshInterval >= 0 && Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { - CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) + CoreUtils.swallow(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement), this, Level.ERROR) sendPartitionPerTopicCache.clear() topicMetadataToRefresh.clear lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds @@ -83,7 +84,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // back off and update the topic metadata cache before attempting another send operation Thread.sleep(config.retryBackoffMs) // get topics of the outstanding produce requests and refresh metadata for those - CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) + CoreUtils.swallow(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement), this, Level.ERROR) sendPartitionPerTopicCache.clear() remainingRetries -= 1 producerStats.resendRate.mark() @@ -105,7 +106,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, case Some(partitionedData) => val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]] for ((brokerid, messagesPerBrokerMap) <- partitionedData) { - if (logger.isTraceEnabled) { + if (isTraceEnabled) { messagesPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2))) } @@ -277,7 +278,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if(response != null) { if (response.status.size != producerRequest.data.size) throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest)) - if (logger.isTraceEnabled) { + if (isTraceEnabled) { val successfullySentData = response.status.filter(_._2.error == Errors.NONE) successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString())))) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index aa25653..e1befc7 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -19,6 +19,7 @@ package kafka.security.auth import java.util import java.util.concurrent.locks.ReentrantReadWriteLock +import com.typesafe.scalalogging.Logger import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} import kafka.network.RequestChannel.Session import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls @@ -29,7 +30,6 @@ import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, import kafka.zookeeper.ZooKeeperClient import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.SecurityUtils -import org.apache.log4j.Logger import scala.collection.JavaConverters._ import scala.util.Random @@ -51,7 +51,7 @@ object SimpleAclAuthorizer { } class SimpleAclAuthorizer extends Authorizer with Logging { - private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger") + private val authorizerLogger = Logger("kafka.authorizer.logger") private var superUsers = Set.empty[KafkaPrincipal] private var shouldAllowEveryoneIfNoAclIsFound = false private var zkClient: KafkaZkClient = null http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/AdminManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index aefdefd..935fade 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -387,7 +387,7 @@ class AdminManager(val config: KafkaConfig, def shutdown() { topicPurgatory.shutdown() - CoreUtils.swallow(createTopicPolicy.foreach(_.close())) - CoreUtils.swallow(alterConfigPolicy.foreach(_.close())) + CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this) + CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index a6efa92..8ec27a3 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -194,7 +194,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, // If delayed, add the element to the delayQueue delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) delayQueueSensor.record() - logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) + debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) } throttleTimeMs } @@ -434,7 +434,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } quota match { case Some(newQuota) => - logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}") + info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}") overriddenQuota.put(quotaId, newQuota) (sanitizedUser, clientId) match { case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled @@ -443,7 +443,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case (None, None) => } case None => - logger.info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}") + info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}") overriddenQuota.remove(quotaId) } @@ -463,7 +463,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, if (metric != null) { val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse("")) val newQuota = metricConfigEntity.quota - logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig") + info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig") metric.config(getQuotaMetricConfig(newQuota)) } } else { @@ -474,7 +474,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag)) if (metricConfigEntity.quota != metric.config.quota) { val newQuota = metricConfigEntity.quota - logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig") + info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig") metric.config(getQuotaMetricConfig(newQuota)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ConfigHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 0edd638..390222d 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -74,10 +74,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) { val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop) quotaManager.markThrottled(topic, partitions) - logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions") + debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions") } else { quotaManager.removeThrottle(topic) - logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic") + debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic") } } updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/DynamicConfigManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index 0f1abfc..6392723 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -121,7 +121,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils, } val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity) - logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig") + info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig") configHandlers(entityType).processConfigChanges(entity, entityConfig) } @@ -145,7 +145,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils, val loggableConfig = entityConfig.asScala.map { case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v) } - logger.info(s"Processing override for entityPath: $entityPath with config: $loggableConfig") + info(s"Processing override for entityPath: $entityPath with config: $loggableConfig") configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ced3f0b..de56986 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -205,7 +205,7 @@ class KafkaApis(val requestChannel: RequestChannel, new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) } - CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()) + CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads(), this) } def handleUpdateMetadataRequest(request: RequestChannel.Request) { @@ -1823,12 +1823,12 @@ class KafkaApis(val requestChannel: RequestChannel, throw new InvalidRequestException("Invalid empty resource name") auth.addAcls(immutable.Set(acl), resource) - logger.debug(s"Added acl $acl to $resource") + debug(s"Added acl $acl to $resource") new AclCreationResponse(ApiError.NONE) } catch { case throwable: Throwable => - logger.debug(s"Failed to add acl $acl to $resource", throwable) + debug(s"Failed to add acl $acl to $resource", throwable) new AclCreationResponse(ApiError.fromThrowable(throwable)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a13f5af..1812eb0 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -517,64 +517,64 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to // `true` at the end of this method. if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { - CoreUtils.swallow(controlledShutdown()) + CoreUtils.swallow(controlledShutdown(), this) brokerState.newState(BrokerShuttingDown) if (kafkaHealthcheck != null) - CoreUtils.swallow(kafkaHealthcheck.shutdown()) + CoreUtils.swallow(kafkaHealthcheck.shutdown(), this) if (dynamicConfigManager != null) - CoreUtils.swallow(dynamicConfigManager.shutdown()) + CoreUtils.swallow(dynamicConfigManager.shutdown(), this) // Stop socket server to stop accepting any more connections and requests. // Socket server will be shutdown towards the end of the sequence. if (socketServer != null) - CoreUtils.swallow(socketServer.stopProcessingRequests()) + CoreUtils.swallow(socketServer.stopProcessingRequests(), this) if (requestHandlerPool != null) - CoreUtils.swallow(requestHandlerPool.shutdown()) + CoreUtils.swallow(requestHandlerPool.shutdown(), this) - CoreUtils.swallow(kafkaScheduler.shutdown()) + CoreUtils.swallow(kafkaScheduler.shutdown(), this) if (apis != null) - CoreUtils.swallow(apis.close()) - CoreUtils.swallow(authorizer.foreach(_.close())) + CoreUtils.swallow(apis.close(), this) + CoreUtils.swallow(authorizer.foreach(_.close()), this) if (adminManager != null) - CoreUtils.swallow(adminManager.shutdown()) + CoreUtils.swallow(adminManager.shutdown(), this) if (transactionCoordinator != null) - CoreUtils.swallow(transactionCoordinator.shutdown()) + CoreUtils.swallow(transactionCoordinator.shutdown(), this) if (groupCoordinator != null) - CoreUtils.swallow(groupCoordinator.shutdown()) + CoreUtils.swallow(groupCoordinator.shutdown(), this) if (replicaManager != null) - CoreUtils.swallow(replicaManager.shutdown()) + CoreUtils.swallow(replicaManager.shutdown(), this) if (logManager != null) - CoreUtils.swallow(logManager.shutdown()) + CoreUtils.swallow(logManager.shutdown(), this) if (kafkaController != null) - CoreUtils.swallow(kafkaController.shutdown()) + CoreUtils.swallow(kafkaController.shutdown(), this) if (zkUtils != null) - CoreUtils.swallow(zkUtils.close()) + CoreUtils.swallow(zkUtils.close(), this) if (zkClient != null) - CoreUtils.swallow(zkClient.close()) + CoreUtils.swallow(zkClient.close(), this) if (quotaManagers != null) - CoreUtils.swallow(quotaManagers.shutdown()) + CoreUtils.swallow(quotaManagers.shutdown(), this) // Even though socket server is stopped much earlier, controller can generate // response for controlled shutdown request. Shutdown server at the end to // avoid any failures (e.g. when metrics are recorded) if (socketServer != null) - CoreUtils.swallow(socketServer.shutdown()) + CoreUtils.swallow(socketServer.shutdown(), this) if (metrics != null) - CoreUtils.swallow(metrics.close()) + CoreUtils.swallow(metrics.close(), this) if (brokerTopicStats != null) - CoreUtils.swallow(brokerTopicStats.close()) + CoreUtils.swallow(brokerTopicStats.close(), this) brokerState.newState(NotRunning) startupComplete.set(false) isShuttingDown.set(false) - CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics)) + CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics), this) shutdownLatch.countDown() info("shut down completed") } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 3bc68da..4413165 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -99,14 +99,14 @@ class ReplicaFetcherThread(name: String, throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format( topicPartition, fetchOffset, replica.logEndOffset.messageOffset)) - if (logger.isTraceEnabled) + if (isTraceEnabled) trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d" .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) // Append the leader's messages to the log partition.appendRecordsToFollower(records) - if (logger.isTraceEnabled) + if (isTraceEnabled) trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s" .format(replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition)) val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) @@ -116,7 +116,7 @@ class ReplicaFetcherThread(name: String, // these values will be computed upon making the leader replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) replica.maybeIncrementLogStartOffset(leaderLogStartOffset) - if (logger.isTraceEnabled) + if (isTraceEnabled) trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark") if (quota.isThrottled(topicPartition)) quota.record(records.sizeInBytes) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 02690f1..7d2d371 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.util.concurrent.CountDownLatch import java.util.{Locale, Properties, Random} +import com.typesafe.scalalogging.LazyLogging import joptsimple._ import kafka.api.OffsetRequest import kafka.common.{MessageFormatter, StreamEndException} @@ -35,7 +36,6 @@ import org.apache.kafka.common.errors.{AuthenticationException, WakeupException} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.utils.Utils -import org.apache.log4j.Logger import scala.collection.JavaConverters._ @@ -568,17 +568,15 @@ class DefaultMessageFormatter extends MessageFormatter { } } -class LoggingMessageFormatter extends MessageFormatter { +class LoggingMessageFormatter extends MessageFormatter with LazyLogging { private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter - val logger = Logger.getLogger(getClass().getName) override def init(props: Properties): Unit = defaultWriter.init(props) def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { import consumerRecord._ defaultWriter.writeTo(consumerRecord, output) - if (logger.isInfoEnabled) - logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} + + logger.info({if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) s"$timestampType:$timestamp, " else ""} + s"key:${if (key == null) "null" else new String(key, StandardCharsets.UTF_8)}, " + s"value:${if (value == null) "null" else new String(value, StandardCharsets.UTF_8)}") } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index bdec41f..a3e60e6 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import java.util.concurrent.atomic.AtomicLong import java.nio.channels.ClosedByInterruptException -import org.apache.log4j.Logger import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Utils @@ -38,13 +37,14 @@ import kafka.consumer.ConsumerTimeoutException import java.text.SimpleDateFormat import java.util.concurrent.atomic.AtomicBoolean +import com.typesafe.scalalogging.LazyLogging + import scala.collection.mutable /** * Performance test for the full zookeeper consumer */ -object ConsumerPerformance { - private val logger = Logger.getLogger(getClass()) +object ConsumerPerformance extends LazyLogging { def main(args: Array[String]): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b5b1540..618fd2a 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -444,18 +444,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { exitingOnSendFailure = true fatal("Mirror maker thread failure due to ", t) } finally { - CoreUtils.swallow { + CoreUtils.swallow ({ info("Flushing producer.") producer.flush() // note that this commit is skipped if flush() fails which ensures that we don't lose messages info("Committing consumer offsets.") commitOffsets(mirrorMakerConsumer) - } + }, this) info("Shutting down consumer connectors.") - CoreUtils.swallow(mirrorMakerConsumer.stop()) - CoreUtils.swallow(mirrorMakerConsumer.cleanup()) + CoreUtils.swallow(mirrorMakerConsumer.stop(), this) + CoreUtils.swallow(mirrorMakerConsumer.cleanup(), this) shutdownLatch.countDown() info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/ProducerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index 77f560b..365c9af 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -31,7 +31,7 @@ import java.math.BigInteger import java.nio.charset.StandardCharsets import org.apache.kafka.common.utils.Utils -import org.apache.log4j.Logger +import org.slf4j.LoggerFactory /** * Load test for the producer @@ -40,7 +40,6 @@ import org.apache.log4j.Logger object ProducerPerformance extends Logging { def main(args: Array[String]) { - val logger = Logger.getLogger(getClass) val config = new ProducerPerfConfig(args) if (!config.isFixedSize) logger.info("WARN: Throughput will be slower due to changing message size per request") http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala index 1d090b3..888d462 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -20,10 +20,10 @@ package kafka.tools import java.net.URI import java.text.SimpleDateFormat +import com.typesafe.scalalogging.LazyLogging import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo} import kafka.consumer.SimpleConsumer import kafka.utils._ -import org.apache.log4j.Logger import kafka.common.TopicAndPartition import org.apache.kafka.common.utils.Time @@ -32,9 +32,7 @@ import org.apache.kafka.common.utils.Time * Performance test for the simple consumer */ @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0") -object SimpleConsumerPerformance { - - private val logger = Logger.getLogger(getClass()) +object SimpleConsumerPerformance extends LazyLogging { def main(args: Array[String]) { logger.warn("WARNING: SimpleConsumerPerformance is deprecated and will be dropped in a future release following 0.11.0.0.") http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 7a853d5..efd4d1e 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -31,6 +31,7 @@ import kafka.cluster.EndPoint import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils} +import org.slf4j.event.Level /** * General helper functions! @@ -73,15 +74,23 @@ object CoreUtils extends Logging { new KafkaThread(name, runnable(fun), daemon) /** - * Do the given action and log any exceptions thrown without rethrowing them - * @param log The log method to use for logging. E.g. logger.warn - * @param action The action to execute - */ - def swallow(log: (Object, Throwable) => Unit, action: => Unit) { + * Do the given action and log any exceptions thrown without rethrowing them. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + * @param logLevel The log level to use for logging. + */ + def swallow(action: => Unit, logging: Logging, logLevel: Level = Level.WARN) { try { action } catch { - case e: Throwable => log(e.getMessage(), e) + case e: Throwable => logLevel match { + case Level.ERROR => logger.error(e.getMessage, e) + case Level.WARN => logger.warn(e.getMessage, e) + case Level.INFO => logger.info(e.getMessage, e) + case Level.DEBUG => logger.debug(e.getMessage, e) + case Level.TRACE => logger.trace(e.getMessage, e) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/Log4jController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala index 026fbae..95d0733 100755 --- a/core/src/main/scala/kafka/utils/Log4jController.scala +++ b/core/src/main/scala/kafka/utils/Log4jController.scala @@ -17,19 +17,10 @@ package kafka.utils - -import org.apache.log4j.{Logger, Level, LogManager} import java.util import java.util.Locale - -object Log4jController { - - private val controller = new Log4jController - - CoreUtils.registerMBean(controller, "kafka:type=kafka.Log4jController") - -} +import org.apache.log4j.{Level, LogManager, Logger} /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/main/scala/kafka/utils/Logging.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala index c2585ad..e409bba 100755 --- a/core/src/main/scala/kafka/utils/Logging.scala +++ b/core/src/main/scala/kafka/utils/Logging.scala @@ -17,106 +17,64 @@ package kafka.utils -import org.apache.log4j.Logger +import com.typesafe.scalalogging.{LazyLogging, Logger} +import org.slf4j.{Marker, MarkerFactory} -trait Logging { - val loggerName = this.getClass.getName - lazy val logger = Logger.getLogger(loggerName) - protected var logIdent: String = null +object Log4jControllerRegistration { + private val logger = Logger(this.getClass.getName) - // Force initialization to register Log4jControllerMBean - private val log4jController = Log4jController + try { + val log4jController = Class.forName("kafka.utils.Log4jController").asInstanceOf[Class[Object]] + val instance = log4jController.getDeclaredConstructor().newInstance() + CoreUtils.registerMBean(instance, "kafka:type=kafka.Log4jController") + logger.info("Registered kafka:type=kafka.Log4jController MBean") + } catch { + case _: Exception => logger.info("Couldn't register kafka:type=kafka.Log4jController MBean") + } +} + +private object Logging { + private val FatalMarker: Marker = MarkerFactory.getMarker("FATAL") +} + +trait Logging extends LazyLogging { + def loggerName: String = logger.underlying.getName - protected def msgWithLogIdent(msg: String) = + protected var logIdent: String = _ + + Log4jControllerRegistration + + protected def msgWithLogIdent(msg: String): String = if (logIdent == null) msg else logIdent + msg - def trace(msg: => String): Unit = { - if (logger.isTraceEnabled()) - logger.trace(msgWithLogIdent(msg)) - } - def trace(e: => Throwable): Any = { - if (logger.isTraceEnabled()) - logger.trace(logIdent,e) - } - def trace(msg: => String, e: => Throwable) = { - if (logger.isTraceEnabled()) - logger.trace(msgWithLogIdent(msg),e) - } - def swallowTrace(action: => Unit) { - CoreUtils.swallow(logger.trace, action) - } + def trace(msg: => String): Unit = logger.trace(msgWithLogIdent(msg)) - def isDebugEnabled: Boolean = logger.isDebugEnabled + def trace(msg: => String, e: => Throwable): Unit = logger.trace(msgWithLogIdent(msg),e) - def isTraceEnabled: Boolean = logger.isTraceEnabled + def isDebugEnabled: Boolean = logger.underlying.isDebugEnabled - def debug(msg: => String): Unit = { - if (logger.isDebugEnabled()) - logger.debug(msgWithLogIdent(msg)) - } - def debug(e: => Throwable): Any = { - if (logger.isDebugEnabled()) - logger.debug(logIdent,e) - } - def debug(msg: => String, e: => Throwable) = { - if (logger.isDebugEnabled()) - logger.debug(msgWithLogIdent(msg),e) - } - def swallowDebug(action: => Unit) { - CoreUtils.swallow(logger.debug, action) - } + def isTraceEnabled: Boolean = logger.underlying.isTraceEnabled - def info(msg: => String): Unit = { - if (logger.isInfoEnabled()) - logger.info(msgWithLogIdent(msg)) - } - def info(e: => Throwable): Any = { - if (logger.isInfoEnabled()) - logger.info(logIdent,e) - } - def info(msg: => String,e: => Throwable) = { - if (logger.isInfoEnabled()) - logger.info(msgWithLogIdent(msg),e) - } - def swallowInfo(action: => Unit) { - CoreUtils.swallow(logger.info, action) - } + def debug(msg: => String): Unit = logger.debug(msgWithLogIdent(msg)) - def warn(msg: => String): Unit = { - logger.warn(msgWithLogIdent(msg)) - } - def warn(e: => Throwable): Any = { - logger.warn(logIdent,e) - } - def warn(msg: => String, e: => Throwable) = { - logger.warn(msgWithLogIdent(msg),e) - } - def swallowWarn(action: => Unit) { - CoreUtils.swallow(logger.warn, action) - } - def swallow(action: => Unit) = swallowWarn(action) + def debug(msg: => String, e: => Throwable): Unit = logger.debug(msgWithLogIdent(msg),e) - def error(msg: => String): Unit = { - logger.error(msgWithLogIdent(msg)) - } - def error(e: => Throwable): Any = { - logger.error(logIdent,e) - } - def error(msg: => String, e: => Throwable) = { - logger.error(msgWithLogIdent(msg),e) - } - def swallowError(action: => Unit) { - CoreUtils.swallow(logger.error, action) - } + def info(msg: => String): Unit = logger.info(msgWithLogIdent(msg)) - def fatal(msg: => String): Unit = { - logger.fatal(msgWithLogIdent(msg)) - } - def fatal(e: => Throwable): Any = { - logger.fatal(logIdent,e) - } - def fatal(msg: => String, e: => Throwable) = { - logger.fatal(msgWithLogIdent(msg),e) - } + def info(msg: => String,e: => Throwable): Unit = logger.info(msgWithLogIdent(msg),e) + + def warn(msg: => String): Unit = logger.warn(msgWithLogIdent(msg)) + + def warn(msg: => String, e: => Throwable): Unit = logger.warn(msgWithLogIdent(msg),e) + + def error(msg: => String): Unit = logger.error(msgWithLogIdent(msg)) + + def error(msg: => String, e: => Throwable): Unit = logger.error(msgWithLogIdent(msg),e) + + def fatal(msg: => String): Unit = + logger.error(Logging.FatalMarker, msgWithLogIdent(msg)) + + def fatal(msg: => String, e: => Throwable): Unit = + logger.error(Logging.FatalMarker, msgWithLogIdent(msg), e) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index 6c964d7..9b30581 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -205,7 +205,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging { builder.append(line).append("\n") addEntriesToDirectoryService(StrSubstitutor.replace(builder, map.asJava)) } - finally CoreUtils.swallow(reader.close()) + finally CoreUtils.swallow(reader.close(), this) } val bindAddress = config.getProperty(MiniKdc.KdcBindAddress) @@ -254,7 +254,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging { while ({line = reader.readLine(); line != null}) { stringBuilder.append(line).append("{3}") } - } finally CoreUtils.swallow(reader.close()) + } finally CoreUtils.swallow(reader.close(), this) val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator()) Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8)) } @@ -337,7 +337,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging { try { for (ldifEntry <- reader.asScala) ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry)) - } finally CoreUtils.swallow(reader.close()) + } finally CoreUtils.swallow(reader.close(), this) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/kafka/utils/LoggingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala new file mode 100644 index 0000000..c0600f8 --- /dev/null +++ b/core/src/test/scala/kafka/utils/LoggingTest.scala @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.lang.management.ManagementFactory +import javax.management.ObjectName + +import org.junit.Test +import org.junit.Assert.{assertEquals, assertTrue} + + +class LoggingTest extends Logging { + + @Test + def testLog4jControllerIsRegistered(): Unit = { + val mbs = ManagementFactory.getPlatformMBeanServer() + val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController") + assertTrue("kafka.utils.Log4jController is not registered", mbs.isRegistered(log4jControllerName)) + val instance = mbs.getObjectInstance(log4jControllerName) + assertEquals("kafka.utils.Log4jController", instance.getClassName) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala index afd619d..bf1ee35 100644 --- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala @@ -79,7 +79,7 @@ class LogDirFailureTest extends IntegrationTestHarness { val kafkaConfig = KafkaConfig.fromProps(props) val logDir = new File(kafkaConfig.logDirs.head) // Make log directory of the partition on the leader broker inaccessible by replacing it with a file - CoreUtils.swallow(Utils.delete(logDir)) + CoreUtils.swallow(Utils.delete(logDir), this) logDir.createNewFile() assertTrue(logDir.isFile) @@ -144,7 +144,7 @@ class LogDirFailureTest extends IntegrationTestHarness { // Make log directory of the partition on the leader broker inaccessible by replacing it with a file val replica = leaderServer.replicaManager.getReplicaOrException(partition) val logDir = replica.log.get.dir.getParentFile - CoreUtils.swallow(Utils.delete(logDir)) + CoreUtils.swallow(Utils.delete(logDir), this) logDir.createNewFile() assertTrue(logDir.isFile) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 75adf55..bcddd40 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -135,7 +135,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { @Test def testCleanShutdownAfterFailedStartupDueToCorruptLogs() { - var server = new KafkaServer(config) + val server = new KafkaServer(config) server.startup() createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) server.shutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala index 5e607be..9ff47dd 100755 --- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala @@ -24,26 +24,25 @@ import java.util.concurrent.locks.ReentrantLock import java.nio.ByteBuffer import java.util.regex.Pattern -import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Assert._ import kafka.common.KafkaException import kafka.utils.CoreUtils.inLock import org.junit.Test import org.apache.kafka.common.utils.{Base64, Utils} +import org.slf4j.event.Level import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} -class CoreUtilsTest extends JUnitSuite { +class CoreUtilsTest extends JUnitSuite with Logging { - private val logger = Logger.getLogger(classOf[CoreUtilsTest]) val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+") @Test def testSwallow() { - CoreUtils.swallow(logger.info, throw new KafkaException("test")) + CoreUtils.swallow(throw new KafkaException("test"), this, Level.INFO) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index adc8d05..d4a829d 100755 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -19,10 +19,9 @@ package kafka.zk import org.apache.zookeeper.server.ZooKeeperServer import org.apache.zookeeper.server.NIOServerCnxnFactory -import kafka.utils.TestUtils +import kafka.utils.{CoreUtils, Logging, TestUtils} import java.net.InetSocketAddress -import kafka.utils.CoreUtils import org.apache.kafka.common.utils.Utils /** @@ -35,7 +34,7 @@ import org.apache.kafka.common.utils.Utils // This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other // projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for // others to use. -class EmbeddedZookeeper() { +class EmbeddedZookeeper() extends Logging { val snapshotDir = TestUtils.tempDir() val logDir = TestUtils.tempDir() @@ -48,8 +47,8 @@ class EmbeddedZookeeper() { val port = zookeeper.getClientPort def shutdown() { - CoreUtils.swallow(zookeeper.shutdown()) - CoreUtils.swallow(factory.shutdown()) + CoreUtils.swallow(zookeeper.shutdown(), this) + CoreUtils.swallow(factory.shutdown(), this) def isDown(): Boolean = { try { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 03741ef..cc1b9c1 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -63,11 +63,11 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { @After def tearDown() { if (zkUtils != null) - CoreUtils.swallow(zkUtils.close()) + CoreUtils.swallow(zkUtils.close(), this) if (zkClient != null) zkClient.close() if (zookeeper != null) - CoreUtils.swallow(zookeeper.shutdown()) + CoreUtils.swallow(zookeeper.shutdown(), this) Configuration.setConfiguration(null) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8b0315/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index cfb0b9b..2436241 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -57,6 +57,7 @@ versions += [ jersey: "2.25.1", jmh: "1.19", log4j: "1.2.17", + scalaLogging: "3.7.2", jopt: "5.0.4", junit: "4.12", lz4: "1.4", @@ -99,6 +100,7 @@ libs += [ jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh", junit: "junit:junit:$versions.junit", log4j: "log4j:log4j:$versions.log4j", + scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
