This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 964252aed6fb16b7952b93e56c3a0cd8f1072861
Author: bkonold <[email protected]>
AuthorDate: Tue Jan 28 15:43:59 2020 -0800

    SAMZA-2446: Invoke onCheckpoint only for registered SSPs (#1260)
---
 .../org/apache/samza/checkpoint/OffsetManager.scala     | 17 +++++++++++------
 .../org/apache/samza/checkpoint/TestOffsetManager.scala | 10 +++++++++-
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 33fca8f..442d83f 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -353,13 +353,18 @@ class OffsetManager(
         }
       }
 
-      // invoke checkpoint listeners
-      checkpoint.getOffsets.asScala.groupBy { case (ssp, _) => ssp.getSystem }.foreach {
-        case (systemName:String, offsets: Map[SystemStreamPartition, String]) => {
-          // Option is empty if there is no checkpointListener for this systemName
-          checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava))
+      // Invoke checkpoint listeners only for SSPs that are registered with the OffsetManager. For example,
+      // changelog SSPs are not registered but may be present in the Checkpoint if transactional state checkpointing
+      // is enabled.
+      val registeredSSPs = systemStreamPartitions.getOrElse(taskName, Set[SystemStreamPartition]())
+      checkpoint.getOffsets.asScala
+        .filterKeys(registeredSSPs.contains)
+        .groupBy { case (ssp, _) => ssp.getSystem }.foreach {
+          case (systemName:String, offsets: Map[SystemStreamPartition, String]) => {
+            // Option is empty if there is no checkpointListener for this systemName
+            checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava))
+          }
         }
-      }
     }
 
     // delete corresponding startpoints after checkpoint is supposed to be committed
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 50c793c..677504d 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -388,9 +388,11 @@ class TestOffsetManager {
     val systemName2 = "test-system2"
     val systemStream = new SystemStream(systemName, "test-stream")
     val systemStream2 = new SystemStream(systemName2, "test-stream2")
+    val systemStream3 = new SystemStream(systemName, "test-stream3")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val systemStreamPartition2 = new SystemStreamPartition(systemStream2, partition)
+    val unregisteredSystemStreamPartition = new SystemStreamPartition(systemStream3, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val testStreamMetadata2 = new SystemStreamMetadata(systemStream2.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata, systemStream2->testStreamMetadata2)
@@ -420,7 +422,10 @@ class TestOffsetManager {
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
     assertTrue(startpointManagerUtil.getStartpointManager.getFanOutForTask(taskName).containsKey(systemStreamPartition))
-    checkpoint(offsetManager, taskName)
+    val offsetsToCheckpoint = new java.util.HashMap[SystemStreamPartition, String]()
+    offsetsToCheckpoint.putAll(offsetManager.buildCheckpoint(taskName).getOffsets)
+    offsetsToCheckpoint.put(unregisteredSystemStreamPartition, "50")
+    offsetManager.writeCheckpoint(taskName, new Checkpoint(offsetsToCheckpoint))
 
     intercept[IllegalStateException] {
       // StartpointManager should stop after last fan out is removed
@@ -434,6 +439,9 @@ class TestOffsetManager {
     assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition))
     // make sure only the system with the callbacks gets them
     assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
+    // even though systemStream and systemStream3 share the same checkpointListener, callback should not execute for
+    // systemStream3 since it is not registered with the OffsetManager
+    assertNull(consumer.recentCheckpoint.get(unregisteredSystemStreamPartition))
 
     offsetManager.update(taskName, systemStreamPartition, "46")
     offsetManager.update(taskName, systemStreamPartition, "47")

Reply via email to