This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 49354c6a9 SAMZA-2734: [Elasticity] Update last processed offset after 
an envelope is finished processing when elasticity is enabled
     new 7dc3de5c6 Merge pull request #1607 from 
lakshmi-manasa-g/elasticity-checkpoint-rw
49354c6a9 is described below

commit 49354c6a99a08916b7c009505f0aec0dfa9283cd
Author: Manasa <[email protected]>
AuthorDate: Tue May 17 15:28:32 2022 -0700

    SAMZA-2734: [Elasticity] Update last processed offset after an envelope is 
finished processing when elasticity is enabled
---
 .../apache/samza/checkpoint/OffsetManager.scala    | 12 ++++++-
 .../org/apache/samza/system/SystemConsumers.scala  |  4 +--
 .../samza/checkpoint/TestOffsetManager.scala       | 41 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 3 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index eb5179222..bcb59e082 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -210,9 +210,19 @@ class OffsetManager(
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, 
offset: String) {
+    // without elasticity enabled, there is exactly one entry of an ssp in the 
systemStreamPartitions map for a taskName
+    // with elasticity enabled, there is exactly one of the keyBuckets of an 
ssp that a task consumes
+    // and hence exactly one entry of an ssp with keyBucket in the 
systemStreamPartitions map for a taskName
+    // hence from the given ssp, find its sspWithKeybucket for the task and 
use that for updating lastProcessedOffsets
+    val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,
+      throw new SamzaException("No SSPs registered for task: " + taskName))
+      .filter(ssp => 
ssp.getSystemStream.equals(systemStreamPartition.getSystemStream)
+        && ssp.getPartition.equals(systemStreamPartition.getPartition))
+      .toIterator.next()
+
     lastProcessedOffsets.putIfAbsent(taskName, new 
ConcurrentHashMap[SystemStreamPartition, String]())
     if (offset != null && 
!offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
-      lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+      lastProcessedOffsets.get(taskName).put(sspWithKeyBucket, offset)
     }
   }
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 9367cd73a..4a24303cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -188,6 +188,8 @@ class SystemConsumers (
       // but the actual systemConsumer which consumes from the input does not 
know about KeyBucket.
       // hence, use an SSP without KeyBucket
       consumer.register(removeKeyBucket(systemStreamPartition), offset)
+      chooser.register(removeKeyBucket(systemStreamPartition), offset)
+      debug("consumer.register and chooser.register for ssp: %s with offset 
%s" format (systemStreamPartition, offset))
     }
 
     debug("Starting consumers.")
@@ -244,8 +246,6 @@ class SystemConsumers (
     metrics.registerSystemStreamPartition(systemStreamPartition)
     unprocessedMessagesBySSP.put(systemStreamPartition, new 
ArrayDeque[IncomingMessageEnvelope]())
 
-    chooser.register(systemStreamPartition, offset)
-
     try {
       val consumer = consumers(systemStreamPartition.getSystem)
       val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 3949ecf18..3c226c4c6 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -579,6 +579,47 @@ class TestOffsetManager {
     assertEquals("60", modifiedOffsets.get(ssp6))
   }
 
+  @Test
+  def testElasticityUpdateWithoutKeyBucket: Unit = {
+    // When elasticity is enabled, task consumes a part of the full SSP 
represented by SSP With KeyBucket.
+    // OffsetManager tracks the set of SSP with KeyBucket consumed by a task.
+    // However, after an IME processing is complete, OffsetManager.update is 
called without KeyBucket.
+    // OffsetManager has to find and update the last processed offset for the 
task correctly for its SSP with KeyBucket.
+    val taskName = new TaskName("c")
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, 
partition)
+    val systemStreamPartitionWithKeyBucket = new 
SystemStreamPartition(systemStreamPartition, 0);
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, 
Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val checkpointManager = getCheckpointManager(systemStreamPartition, 
taskName)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, 
checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, 
Map(), new OffsetManagerMetrics)
+    // register task and its input SSP with KeyBucket
+    offsetManager.register(taskName, Set(systemStreamPartitionWithKeyBucket))
+
+    offsetManager.start
+
+    // update is called with only the full SSP and no keyBucket information.
+    offsetManager.update(taskName, systemStreamPartition, "46")
+    // Get checkpoint snapshot like we do at the beginning of 
TaskInstance.commit()
+    val checkpoint46 = offsetManager.getLastProcessedOffsets(taskName)
+    offsetManager.update(taskName, systemStreamPartition, "47") // Offset 
updated before checkpoint
+    offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint46))
+    // OffsetManager correctly updates the lastProcessedOffset and checkpoint 
for the task and input SSP with KeyBucket.
+    assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, 
systemStreamPartitionWithKeyBucket))
+    assertEquals("46", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
+
+    // Now write the checkpoint for the latest offset
+    val checkpoint47 = offsetManager.getLastProcessedOffsets(taskName)
+    offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint47))
+
+    assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, 
systemStreamPartitionWithKeyBucket))
+    assertEquals("47", 
offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
+  }
+
   // Utility method to create and write checkpoint in one statement
   def checkpoint(offsetManager: OffsetManager, taskName: TaskName): Unit = {
     offsetManager.writeCheckpoint(taskName, new 
CheckpointV1(offsetManager.getLastProcessedOffsets(taskName)))

Reply via email to