[ https://issues.apache.org/jira/browse/KAFKA-4897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339844#comment-16339844 ]
ASF GitHub Bot commented on KAFKA-4897:
---------------------------------------
hachikuji closed pull request #4393: KAFKA-4897: Add pause method to ShutdownableThread
URL: https://github.com/apache/kafka/pull/4393
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 0a6b82e5fc1..23f53569a6b 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -83,7 +83,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
} catch {
case t: Throwable => {
- if (!isRunning.get())
+ if (!isRunning)
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else
warn("Failed to find leader for
%s".format(noLeaderPartitionSet), t)
@@ -98,7 +98,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
)
} catch {
case t: Throwable =>
- if (!isRunning.get())
+ if (!isRunning)
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else {
warn("Failed to add leader for partitions %s; will
retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e389821ad3b..d5456bea9a8 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
package kafka.controller
import java.net.SocketTimeoutException
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import com.yammer.metrics.core.Gauge
import kafka.api._
@@ -213,13 +213,13 @@ class RequestSendThread(val controllerId: Int,
override def doWork(): Unit = {
- def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE)
+ def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
val QueueItem(apiKey, requestBuilder, callback) = queue.take()
var clientResponse: ClientResponse = null
try {
var isSendSuccessful = false
- while (isRunning.get() && !isSendSuccessful) {
+ while (isRunning && !isSendSuccessful) {
// if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
try {
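The backoff() change above is the behavioral core of the PR: Thread.sleep(100) always blocks the full interval, while awaiting the shutdownInitiated latch returns as soon as shutdown begins. A rough standalone sketch of the difference, with assumed names:

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    class PauseDemo {
      private val shutdownInitiated = new CountDownLatch(1)

      // Old style: sleeps the full 100 ms even if shutdown was requested.
      def backoffBySleep(): Unit = Thread.sleep(100)

      // New style: await() returns early once countDown() fires, so a
      // backoff in progress never delays shutdown.
      def pause(timeout: Long, unit: TimeUnit): Unit = {
        shutdownInitiated.await(timeout, unit)
      }

      def initiateShutdown(): Unit = shutdownInitiated.countDown()
    }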
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c5c0d497318..637e24cb01d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
import java.nio._
import java.nio.file.Files
import java.util.Date
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
import kafka.common._
@@ -233,10 +233,9 @@ class LogCleaner(val config: CleanerConfig,
checkDone = checkDone)
@volatile var lastStats: CleanerStats = new CleanerStats()
- private val backOffWaitLatch = new CountDownLatch(1)
private def checkDone(topicPartition: TopicPartition) {
- if (!isRunning.get())
+ if (!isRunning)
throw new ThreadShutdownException
cleanerManager.checkCleaningAborted(topicPartition)
}
@@ -248,12 +247,6 @@ class LogCleaner(val config: CleanerConfig,
cleanOrSleep()
}
- override def shutdown() = {
- initiateShutdown()
- backOffWaitLatch.countDown()
- awaitShutdown()
- }
-
/**
* Clean a log if there is a dirty log available, otherwise sleep for a bit
*/
@@ -289,7 +282,7 @@ class LogCleaner(val config: CleanerConfig,
}
}
if (!cleaned)
- backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
+ pause(config.backOffMs, TimeUnit.MILLISECONDS)
}
/**
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b078073ba03..925c33095a2 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -148,7 +148,7 @@ abstract class AbstractFetcherThread(name: String,
responseData = fetch(fetchRequest)
} catch {
case t: Throwable =>
- if (isRunning.get) {
+ if (isRunning) {
warn(s"Error in fetch to broker ${sourceBroker.id}, request
$fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionStates.partitionSet.asScala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition
}
case _ =>
- if (isRunning.get) {
+ if (isRunning) {
error(s"Error for partition $topicPartition from broker
${sourceBroker.id}", partitionData.exception.get)
partitionsWithError += topicPartition
}
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 71f33683dbe..0408e9212a3 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -368,7 +368,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t: Throwable =>
- if (!isRunning.get)
+ if (!isRunning)
throw t
}
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 0922d15e93a..13bbc90f2a5 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -17,8 +17,7 @@
package kafka.utils
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
@@ -26,8 +25,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
extends Thread(name) with Logging {
this.setDaemon(false)
this.logIdent = "[" + name + "]: "
- val isRunning: AtomicBoolean = new AtomicBoolean(true)
- private val shutdownLatch = new CountDownLatch(1)
+ private val shutdownInitiated = new CountDownLatch(1)
+ private val shutdownComplete = new CountDownLatch(1)
def shutdown(): Unit = {
initiateShutdown()
@@ -35,27 +34,42 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
}
def isShutdownComplete: Boolean = {
- shutdownLatch.getCount == 0
+ shutdownComplete.getCount == 0
}
def initiateShutdown(): Boolean = {
- if (isRunning.compareAndSet(true, false)) {
- info("Shutting down")
- if (isInterruptible)
- interrupt()
- true
- } else
- false
+ this.synchronized {
+ if (isRunning) {
+ info("Shutting down")
+ shutdownInitiated.countDown()
+ if (isInterruptible)
+ interrupt()
+ true
+ } else
+ false
+ }
}
- /**
+ /**
* After calling initiateShutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = {
- shutdownLatch.await()
+ shutdownComplete.await()
info("Shutdown completed")
}
+ /**
+ * Causes the current thread to wait until the shutdown is initiated,
+ * or the specified waiting time elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ */
+ def pause(timeout: Long, unit: TimeUnit): Unit = {
+ if (shutdownInitiated.await(timeout, unit))
+ trace("shutdownInitiated latch count reached zero. Shutdown called.")
+ }
+
/**
* This method is repeatedly invoked until the thread shuts down or this
method throws an exception
*/
@@ -64,19 +78,24 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
override def run(): Unit = {
info("Starting")
try {
- while (isRunning.get)
+ while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
- isRunning.set(false)
- shutdownLatch.countDown()
+ shutdownInitiated.countDown()
+ shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
- if (isRunning.get())
+ if (isRunning)
error("Error due to", e)
+ } finally {
+ shutdownComplete.countDown()
}
- shutdownLatch.countDown()
info("Stopped")
}
+
+ def isRunning: Boolean = {
+ shutdownInitiated.getCount() != 0
+ }
}
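Taken together, a subclass now gets an interruptible backoff for free. A hypothetical usage sketch (PollerThread is invented; ShutdownableThread, doWork, pause and shutdown are from the diff, assuming isInterruptible keeps its default of true):

    import java.util.concurrent.TimeUnit
    import kafka.utils.ShutdownableThread

    class PollerThread extends ShutdownableThread(name = "poller") {
      override def doWork(): Unit = {
        // ... perform one unit of work per invocation ...
        // Back off between iterations; returns immediately once
        // initiateShutdown() counts the shutdownInitiated latch down.
        pause(500, TimeUnit.MILLISECONDS)
      }
    }

    // Typical lifecycle: start(), let it run, then shutdown() blocks
    // until doWork() returns from its current iteration and the thread exits.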
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 8917921d85f..58d1be9ee5c 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -104,7 +104,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
- while (scheduler.isRunning.get()) {
+ while (scheduler.isRunning) {
val records = consumer.poll(100).asScala
assertEquals(Set(tp), consumer.assignment.asScala)
@@ -146,7 +146,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
- while(scheduler.isRunning.get()) {
+ while(scheduler.isRunning) {
val coin = TestUtils.random.nextInt(3)
if (coin == 0) {
info("Seeking to end of log")
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fc06b731034..a760d7d1d06 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -439,7 +439,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
@volatile var sent = 0
override def doWork(): Unit = {
try {
- while (isRunning.get) {
+ while (isRunning) {
sent += 1
val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
producer.send(record).get(10, TimeUnit.SECONDS)
@@ -456,7 +456,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
var received = 0
override def doWork(): Unit = {
try {
- while (isRunning.get || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+ while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
received += consumer.poll(50).count
}
} finally {
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 3bc3d39b339..66b4874e3f9 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -88,21 +88,6 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
</Or>
</Match>
- <Match>
- <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore failures to delete files.
- TODO: remove this suppression when KAFKA-4897 is fixed. -->
- <Package name="kafka.log"/>
- <Source name="LogCleaner.scala"/>
- <Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
- </Match>
-
- <Match>
- <!-- Add a suppression for ignoring the return value of CountDownLatch#await. -->
- <Class name="kafka.log.Cleaner"/>
- <Method name="cleanOrSleep"/>
- <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
- </Match>
-
<Match>
<!-- Add a suppression for having the thread start in the constructor of the old, deprecated consumer. -->
<Class name="kafka.producer.Producer"/>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> LogCleaner#cleanSegments should not ignore failures to delete files
> -------------------------------------------------------------------
>
> Key: KAFKA-4897
> URL: https://issues.apache.org/jira/browse/KAFKA-4897
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Colin P. McCabe
> Assignee: Manikumar
> Priority: Major
>
> LogCleaner#cleanSegments should not ignore failures to delete files.
> Currently, it ignores the failure and does not even log an error message.
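The complaint underlying the issue is that File#delete reports failure only through its Boolean result, which the cleaner discarded. A hedged sketch of the kind of check being asked for (the helper is hypothetical, not the committed fix):

    import java.io.File

    object DeleteCheck {
      def deleteOrWarn(file: File): Unit = {
        // delete() returns false on failure; the cleaner used to drop
        // this result without even logging it.
        if (!file.delete() && file.exists())
          System.err.println(s"Failed to delete ${file.getAbsolutePath}")
      }
    }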
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)