dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108755031


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}
+
+  /**
+   * Called when the Partition is deleted.
+   */
+  def onDeleted(partition: TopicPartition): Unit = {}

Review Comment:
   I kind of like `onDeleted` because it goes well with `onFailed`. Isn't it 
implicit that we are talking about the local partition (== replica)? I could 
perhaps make this clearer in the javadoc.
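
For illustration only, here is a minimal sketch of a listener that follows the contract in the doc comment above (callbacks run on the thread that triggers the change, possibly with locks held, so they should only notify). The `TopicPartition` case class is a stand-in for `org.apache.kafka.common.TopicPartition` so the snippet compiles on its own, the trait is copied from the diff, and `PartitionEvent`, its subclasses, and `QueueingListener` are hypothetical names that are not part of the PR.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Stand-in for org.apache.kafka.common.TopicPartition (assumption, for self-containment).
final case class TopicPartition(topic: String, partition: Int)

// Copied from the diff above: every callback defaults to a no-op.
trait PartitionListener {
  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
  def onFailed(partition: TopicPartition): Unit = {}
  def onDeleted(partition: TopicPartition): Unit = {}
}

// Hypothetical event types, not part of the PR.
sealed trait PartitionEvent
final case class HighWatermarkUpdated(tp: TopicPartition, offset: Long) extends PartitionEvent
final case class Failed(tp: TopicPartition) extends PartitionEvent
final case class Deleted(tp: TopicPartition) extends PartitionEvent

// Hypothetical listener: each callback only enqueues an event and returns,
// so no real work happens on the calling thread or under its locks.
final class QueueingListener(queue: LinkedBlockingQueue[PartitionEvent])
    extends PartitionListener {

  override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {
    queue.offer(HighWatermarkUpdated(partition, offset)) // unbounded queue: never blocks
    ()
  }

  override def onFailed(partition: TopicPartition): Unit = {
    queue.offer(Failed(partition))
    ()
  }

  override def onDeleted(partition: TopicPartition): Unit = {
    queue.offer(Deleted(partition))
    ()
  }
}
```

A separate consumer thread would then drain the queue and do the actual processing. Whether an unbounded queue is acceptable depends on the caller, so treat that choice as an assumption of the sketch.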



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1843,8 +1866,11 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
-    allPartitions.put(tp, HostedPartition.Offline)
-    Partition.removeMetrics(tp)
+    allPartitions.put(tp, HostedPartition.Offline) match {
+      case HostedPartition.Online(partition) =>
+        partition.fail()

Review Comment:
   `markOffline` sounds good to me.



