This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new eaf825279 SAMZA-2745: handle processing of watermark and end of stream 
when elasticity is enabled
     new 9f36e727b Merge pull request #1610 from 
lakshmi-manasa-g/elasticity-watermark
eaf825279 is described below

commit eaf8252795aeb6c91ccba107bbef629a1b565775
Author: Manasa <[email protected]>
AuthorDate: Tue May 31 12:26:58 2022 -0700

    SAMZA-2745: handle processing of watermark and end of stream when 
elasticity is enabled
---
 .../org/apache/samza/container/TaskInstance.scala  | 23 ++++++++++++++++++++--
 .../apache/samza/container/TestTaskInstance.scala  | 22 ++++++++++++++++++++-
 2 files changed, 42 insertions(+), 3 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 929864203..d75d91183 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -212,7 +212,7 @@ class TaskInstance(
     // if elasticity is enabled aka elasticity factor > 1
     // then this TaskInstance processes only those envelopes whose key falls
     // within the keyBucket of the SSP assigned to the task.
-    val incomingMessageSsp = 
envelope.getSystemStreamPartition(elasticityFactor)
+    val incomingMessageSsp = getIncomingMessageSSP(envelope)
 
     if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp,
       throw new SamzaException(incomingMessageSsp + " is not registered!"))) {
@@ -542,7 +542,7 @@ class TaskInstance(
     // if elasticity is enabled aka elasticity factor > 1
     // then this TaskInstance handles only those envelopes whose key falls
     // within the keyBucket of the SSP assigned to the task.
-    val incomingMessageSsp = 
envelope.getSystemStreamPartition(elasticityFactor)
+    var incomingMessageSsp = getIncomingMessageSSP(envelope)
 
     if 
(IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
       ssp2CaughtupMapping(incomingMessageSsp) = true
@@ -605,4 +605,23 @@ class TaskInstance(
 
     startingOffset
   }
+
+  /**
+   * Resolves the SSP that an incoming envelope should be attributed to by this task.
+   *
+   * When elasticity is disabled (elasticityFactor <= 1) this is simply the envelope's own SSP.
+   * When elasticity is enabled, regular messages resolve to the keyBucket-aware SSP returned by
+   * envelope.getSystemStreamPartition(elasticityFactor). End-of-stream and watermark envelopes
+   * are instead mapped to the entry of systemStreamPartitions that has the same system stream
+   * and partition, irrespective of keyBucket, because they need to be routed to all tasks
+   * consuming the ssp.
+   *
+   * @param envelope the incoming message envelope
+   * @return the SSP under which the envelope should be looked up and processed
+   * @throws SamzaException if elasticity is enabled, the envelope is an end-of-stream or
+   *                        watermark envelope, and systemStreamPartitions holds no SSP with
+   *                        the envelope's system stream and partition
+   */
+  private def getIncomingMessageSSP(envelope: IncomingMessageEnvelope): SystemStreamPartition = {
+    if (elasticityFactor <= 1) {
+      envelope.getSystemStreamPartition
+    } else {
+      // if elasticityFactor > 1, find the SSP with keyBucket
+      val keyBucketSsp = envelope.getSystemStreamPartition(elasticityFactor)
+      val messageType = MessageType.of(envelope.getMessage)
+      val isEndOfStreamOrWatermark = envelope.isEndOfStream() ||
+        MessageType.END_OF_STREAM == messageType ||
+        MessageType.WATERMARK == messageType
+
+      if (!isEndOfStreamOrWatermark) {
+        keyBucketSsp
+      } else {
+        // end of stream / watermark must be routed to all tasks consuming the ssp,
+        // irrespective of keyBucket, so match on system stream and partition only
+        val matchingSsp = systemStreamPartitions
+          .find(ssp => ssp.getSystemStream.equals(keyBucketSsp.getSystemStream)
+            && ssp.getPartition.equals(keyBucketSsp.getPartition))
+          .getOrElse(throw new SamzaException(
+            "No SSP with the system stream and partition of %s is registered!" format keyBucketSsp))
+        // `format` understands %s, not the SLF4J-style {} placeholder
+        debug("for watermark or end of stream envelope, found incoming ssp as %s" format matchingSsp)
+        matchingSsp
+      }
+    }
+  }
 }
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 08b082ca2..77b1ee35c 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -162,7 +162,7 @@ class TestTaskInstance extends AssertionsForJUnit with 
MockitoSugar {
     verify(processesCounter).inc()
     verify(messagesActuallyProcessedCounter).inc()
 
-    // case 1: taskInstance processes the keyBucket=0 of the ssp and envelope 
is NOT from same keyBucket
+    // case 2: taskInstance processes the keyBucket=0 of the ssp and envelope 
is NOT from same keyBucket
     // taskInstance.process should throw the exception ssp is not registered.
     
when(envelope.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
     val thrown = intercept[Exception] {
@@ -171,6 +171,26 @@ class TestTaskInstance extends AssertionsForJUnit with 
MockitoSugar {
     assert(thrown.isInstanceOf[SamzaException])
     assert(thrown.getMessage.contains(notProcessedSSPKeyBucket.toString))
     assert(thrown.getMessage.contains("is not registered!"))
+
+    // case 3: taskInstance processes the keyBucket=0 of the ssp and envelope 
is watermark NOT from same keyBucket
+    // regular processing should happen as Watermark and end of stream should 
be processed by all tasks
+    val watermark = 
spy(IncomingMessageEnvelope.buildWatermarkEnvelope(SYSTEM_STREAM_PARTITION, 
1234l))
+    
when(watermark.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
+    this.taskInstance.process(watermark, coordinator, callbackFactory)
+    assertEquals(2, this.taskInstanceExceptionHandler.numTimesCalled) // case 
1 and case 3
+    verify(this.task).processAsync(watermark, this.collector, coordinator, 
callback)
+    verify(processesCounter, times(3)).inc() // case 1, 2 and 3
+    verify(messagesActuallyProcessedCounter, times(2)).inc() // case 1 and 3
+
+    // case 4: taskInstance processes the keyBucket=0 of the ssp and envelope 
is EndOfStream NOT from same keyBucket
+    // regular processing should happen as Watermark and end of stream should 
be processed by all tasks
+    val endOfStream = 
spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SYSTEM_STREAM_PARTITION))
+    
when(endOfStream.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
+    this.taskInstance.process(endOfStream, coordinator, callbackFactory)
+    assertEquals(3, this.taskInstanceExceptionHandler.numTimesCalled) // case 
1 and case 3 and case 4
+    verify(this.task).processAsync(endOfStream, this.collector, coordinator, 
callback)
+    verify(processesCounter, times(4)).inc() // case 1, 2, 3 and 4
+    verify(messagesActuallyProcessedCounter, times(3)).inc() // case 1 and 3 
and 4
   }
 
   @Test

Reply via email to