This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af3f9e3a5ad KAFKA-18426 Remove FinalizedFeatureChangeListener (#18441)
af3f9e3a5ad is described below

commit af3f9e3a5adf3379b178b85878160752e8b72457
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Jan 9 01:24:03 2025 +0800

    KAFKA-18426 Remove FinalizedFeatureChangeListener (#18441)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../server/FinalizedFeatureChangeListener.scala    | 265 ---------------------
 1 file changed, 265 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala 
b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
deleted file mode 100644
index 1668118d63e..00000000000
--- a/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.server.metadata.{FeatureCacheUpdateException, ZkMetadataCache}
-
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
-import kafka.utils.Logging
-import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
-import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.server.util.ShutdownableThread
-
-import scala.concurrent.TimeoutException
-
-/**
- * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
- * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
- * to the latest features read from ZK. The cache updates are serialized 
through a single
- * notification processor thread.
- *
- * This updates the features cached in ZkMetadataCache
- *
- * @param finalizedFeatureCache   the finalized feature cache
- * @param zkClient                the Zookeeper client
- */
-class FinalizedFeatureChangeListener(private val finalizedFeatureCache: 
ZkMetadataCache,
-                                     private val zkClient: KafkaZkClient) 
extends Logging {
-
-  /**
-   * Helper class used to update the FinalizedFeatureCache.
-   *
-   * @param featureZkNodePath   the path to the ZK feature node to be read
-   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
-   *                            updateOrThrow() operation is over
-   */
-  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
-
-    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
-
-    /**
-     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
-     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
-     * exception is raised.
-     *
-     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
-     * once successfully. A subsequent invocation will raise an exception.
-     *
-     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
-     *           this method is called again after a successful previous 
invocation.
-     * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
-     *           FinalizedFeatureCache.
-     */
-    def updateLatestOrThrow(): Unit = {
-      maybeNotifyOnce.foreach(notifier => {
-        if (notifier.getCount != 1) {
-          throw new IllegalStateException(
-            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
-        }
-      })
-
-      debug(s"Reading feature ZK node at path: $featureZkNodePath")
-      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
-
-      // There are 4 cases:
-      //
-      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
-      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
-      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
-      //                                           if dataBytes contains valid 
data.
-      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
-      //                                           does not exist in ZK.
-      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
-      //                                           API ensures that unknown 
version is returned only when the
-      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
-      //                                           a case.
-      if (version == ZkVersion.UnknownVersion) {
-        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
-        finalizedFeatureCache.clearFeatures()
-      } else {
-        var maybeFeatureZNode: Option[FeatureZNode] = Option.empty
-        try {
-          maybeFeatureZNode = 
Some(FeatureZNode.decode(mayBeFeatureZNodeBytes.get))
-        } catch {
-          case e: IllegalArgumentException => {
-            error(s"Unable to deserialize feature ZK node at path: 
$featureZkNodePath", e)
-            finalizedFeatureCache.clearFeatures()
-          }
-        }
-        maybeFeatureZNode.foreach(featureZNode => {
-          featureZNode.status match {
-            case FeatureZNodeStatus.Disabled => {
-              info(s"Feature ZK node at path: $featureZkNodePath is in 
disabled status.")
-              finalizedFeatureCache.clearFeatures()
-            }
-            case FeatureZNodeStatus.Enabled => {
-              
finalizedFeatureCache.updateFeaturesOrThrow(featureZNode.features.toMap, 
version)
-            }
-            case _ => throw new IllegalStateException(s"Unexpected 
FeatureZNodeStatus found in $featureZNode")
-          }
-        })
-      }
-
-      maybeNotifyOnce.foreach(notifier => notifier.countDown())
-    }
-
-    /**
-     * Waits until at least a single updateLatestOrThrow completes 
successfully. This method returns
-     * immediately if an updateLatestOrThrow call had already completed 
successfully.
-     *
-     * @param waitTimeMs   the timeout for the wait operation
-     *
-     * @throws             TimeoutException if the wait can not be completed 
in waitTimeMs
-     *                     milli seconds
-     */
-    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
-      maybeNotifyOnce.foreach(notifier => {
-        if (!notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)) {
-          throw new TimeoutException(
-            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
-        }
-      })
-    }
-  }
-
-  /**
-   * A shutdownable thread to process feature node change notifications that 
are populated into the
-   * queue. If any change notification can not be processed successfully 
(unless it is due to an
-   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
-   *
-   * @param name   name of the thread
-   */
-  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name) with Logging {
-
-    this.logIdent = logPrefix
-
-    override def doWork(): Unit = {
-      try {
-        queue.take.updateLatestOrThrow()
-      } catch {
-        case ie: InterruptedException =>
-          // While the queue is empty and this thread is blocking on taking an 
item from the queue,
-          // a concurrent call to FinalizedFeatureChangeListener.close() could 
interrupt the thread
-          // and cause an InterruptedException to be raised from queue.take(). 
In such a case, it is
-          // safe to ignore the exception if the thread is being shutdown. We 
raise the exception
-          // here again, because, it is ignored by ShutdownableThread if it is 
shutting down.
-          throw ie
-        case cacheUpdateException: FeatureCacheUpdateException =>
-          error("Failed to process feature ZK node change event. The broker 
will eventually exit.", cacheUpdateException)
-          throw new FatalExitError(1)
-        case e: Exception =>
-          // do not exit for exceptions unrelated to cache change processing 
(e.g. ZK session expiration)
-          warn("Unexpected exception in feature ZK node change event 
processing; will continue processing.", e)
-      }
-    }
-  }
-
-  // Feature ZK node change handler.
-  private object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
-    override val path: String = FeatureZNode.path
-
-    override def handleCreation(): Unit = {
-      info(s"Feature ZK node created at path: $path")
-      queue.add(new FeatureCacheUpdater(path))
-    }
-
-    override def handleDataChange(): Unit = {
-      info(s"Feature ZK node updated at path: $path")
-      queue.add(new FeatureCacheUpdater(path))
-    }
-
-    override def handleDeletion(): Unit = {
-      warn(s"Feature ZK node deleted at path: $path")
-      // This event may happen, rarely (ex: ZK corruption or operational 
error).
-      // In such a case, we prefer to just log a warning and treat the case as 
if the node is absent,
-      // and populate the FinalizedFeatureCache with empty finalized features.
-      queue.add(new FeatureCacheUpdater(path))
-    }
-  }
-
-  object ZkStateChangeHandler extends StateChangeHandler {
-    val path: String = FeatureZNode.path
-
-    override val name: String = path
-
-    override def afterInitializingSession(): Unit = {
-      queue.add(new FeatureCacheUpdater(path))
-    }
-  }
-
-  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
-
-  private val thread = new 
ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
-
-  /**
-   * This method initializes the feature ZK node change listener. Optionally, 
it also ensures to
-   * update the FinalizedFeatureCache once with the latest contents of the 
feature ZK node
-   * (if the node exists). This step helps ensure that feature 
incompatibilities (if any) in brokers
-   * are conveniently detected before the initOrThrow() method returns to the 
caller. If feature
-   * incompatibilities are detected, this method will throw an Exception to 
the caller, and the Broker
-   * will exit eventually.
-   *
-   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature 
cache to be updated once.
-   *                                   (should be > 0)
-   *
-   * @throws Exception if feature incompatibility check could not be finished 
in a timely manner
-   */
-  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
-    if (waitOnceForCacheUpdateMs <= 0) {
-      throw new IllegalArgumentException(
-        s"Expected waitOnceForCacheUpdateMs > 0, but provided: 
$waitOnceForCacheUpdateMs")
-    }
-
-    thread.start()
-    zkClient.registerStateChangeHandler(ZkStateChangeHandler)
-    
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
-    val ensureCacheUpdateOnce = new FeatureCacheUpdater(
-      FeatureZNodeChangeHandler.path, Some(new CountDownLatch(1)))
-    queue.add(ensureCacheUpdateOnce)
-    try {
-      ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
-    } catch {
-      case e: Exception => {
-        close()
-        throw e
-      }
-    }
-  }
-
-  /**
-   * Closes the feature ZK node change listener by unregistering the listener 
from ZK client,
-   * clearing the queue and shutting down the 
ChangeNotificationProcessorThread.
-   */
-  def close(): Unit = {
-    zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name)
-    zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)
-    queue.clear()
-    thread.shutdown()
-  }
-
-  // For testing only.
-  def isListenerInitiated: Boolean = {
-    thread.isRunning && thread.isAlive
-  }
-
-  // For testing only.
-  def isListenerDead: Boolean = {
-    !thread.isRunning && !thread.isAlive
-  }
-}

Reply via email to