This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new eaf825279 SAMZA-2745: handle processing of watermark and end of stream
when elasticity is enabled
new 9f36e727b Merge pull request #1610 from
lakshmi-manasa-g/elasticity-watermark
eaf825279 is described below
commit eaf8252795aeb6c91ccba107bbef629a1b565775
Author: Manasa <[email protected]>
AuthorDate: Tue May 31 12:26:58 2022 -0700
SAMZA-2745: handle processing of watermark and end of stream when
elasticity is enabled
---
.../org/apache/samza/container/TaskInstance.scala | 23 ++++++++++++++++++++--
.../apache/samza/container/TestTaskInstance.scala | 22 ++++++++++++++++++++-
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 929864203..d75d91183 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -212,7 +212,7 @@ class TaskInstance(
// if elasticity is enabled aka elasticity factor > 1
// then this TaskInstance processes only those envelopes whose key falls
// within the keyBucket of the SSP assigned to the task.
- val incomingMessageSsp =
envelope.getSystemStreamPartition(elasticityFactor)
+ val incomingMessageSsp = getIncomingMessageSSP(envelope)
if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp,
throw new SamzaException(incomingMessageSsp + " is not registered!"))) {
@@ -542,7 +542,7 @@ class TaskInstance(
// if elasticity is enabled aka elasticity factor > 1
// then this TaskInstance handles only those envelopes whose key falls
// within the keyBucket of the SSP assigned to the task.
- val incomingMessageSsp =
envelope.getSystemStreamPartition(elasticityFactor)
+ var incomingMessageSsp = getIncomingMessageSSP(envelope)
if
(IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
ssp2CaughtupMapping(incomingMessageSsp) = true
@@ -605,4 +605,23 @@ class TaskInstance(
startingOffset
}
+
+  /**
+   * Resolves the SSP under which this task should look up an envelope.
+   *
+   * When elasticityFactor <= 1 the envelope's plain SSP is returned.
+   * Otherwise the SSP carrying the envelope's keyBucket is computed. For
+   * end-of-stream and watermark envelopes that keyBucket is ignored and
+   * the SSP registered with this task for the same system stream and
+   * partition is returned instead, because such envelopes must be
+   * handled by every task consuming the SSP.
+   *
+   * @param envelope the incoming envelope to resolve
+   * @return the SSP to use for lookups in this task
+   * @throws SamzaException if the envelope is end-of-stream or watermark
+   *                        and no SSP of this task matches its system
+   *                        stream and partition
+   */
+  private def getIncomingMessageSSP(
+      envelope: IncomingMessageEnvelope): SystemStreamPartition = {
+    if (elasticityFactor <= 1) {
+      envelope.getSystemStreamPartition
+    } else {
+      // if elasticityFactor > 1, find the SSP with keyBucket
+      val keyBucketSsp =
+        envelope.getSystemStreamPartition(elasticityFactor)
+      val messageType = MessageType.of(envelope.getMessage)
+      val routedToAllTasks = envelope.isEndOfStream() ||
+        MessageType.END_OF_STREAM == messageType ||
+        MessageType.WATERMARK == messageType
+      if (!routedToAllTasks) {
+        keyBucketSsp
+      } else {
+        // end of stream / watermark: match on system stream and
+        // partition only, irrespective of keyBucket
+        val registeredSsp = systemStreamPartitions
+          .find(ssp =>
+            ssp.getSystemStream.equals(keyBucketSsp.getSystemStream)
+              && ssp.getPartition.equals(keyBucketSsp.getPartition))
+          .getOrElse(throw new SamzaException(
+            keyBucketSsp + " is not registered!"))
+        // `format` needs a %s specifier; "{}" is never substituted
+        debug("for watermark or end of stream envelope, " +
+          "found incoming ssp as %s" format registeredSsp)
+        registeredSsp
+      }
+    }
+  }
}
diff --git
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 08b082ca2..77b1ee35c 100644
---
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -162,7 +162,7 @@ class TestTaskInstance extends AssertionsForJUnit with
MockitoSugar {
verify(processesCounter).inc()
verify(messagesActuallyProcessedCounter).inc()
- // case 1: taskInstance processes the keyBucket=0 of the ssp and envelope
is NOT from same keyBucket
+ // case 2: taskInstance processes the keyBucket=0 of the ssp and envelope
is NOT from same keyBucket
// taskInstance.process should throw the exception ssp is not registered.
when(envelope.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
val thrown = intercept[Exception] {
@@ -171,6 +171,26 @@ class TestTaskInstance extends AssertionsForJUnit with
MockitoSugar {
assert(thrown.isInstanceOf[SamzaException])
assert(thrown.getMessage.contains(notProcessedSSPKeyBucket.toString))
assert(thrown.getMessage.contains("is not registered!"))
+
+ // case 3: taskInstance processes the keyBucket=0 of the ssp and envelope
is watermark NOT from same keyBucket
+ // regular processing should happen as Watermark and end of stream should
be processed by all tasks
+ val watermark =
spy(IncomingMessageEnvelope.buildWatermarkEnvelope(SYSTEM_STREAM_PARTITION,
1234l))
+
when(watermark.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
+ this.taskInstance.process(watermark, coordinator, callbackFactory)
+ assertEquals(2, this.taskInstanceExceptionHandler.numTimesCalled) // case
1 and case 3
+ verify(this.task).processAsync(watermark, this.collector, coordinator,
callback)
+ verify(processesCounter, times(3)).inc() // case 1, 2 and 3
+ verify(messagesActuallyProcessedCounter, times(2)).inc() // case 1 and 3
+
+ // case 4: taskInstance processes the keyBucket=0 of the ssp and envelope
is EndOfStream NOT from same keyBucket
+ // regular processing should happen as Watermark and end of stream should
be processed by all tasks
+ val endOfStream =
spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SYSTEM_STREAM_PARTITION))
+
when(endOfStream.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
+ this.taskInstance.process(endOfStream, coordinator, callbackFactory)
+ assertEquals(3, this.taskInstanceExceptionHandler.numTimesCalled) // case
1 and case 3 and case 4
+ verify(this.task).processAsync(endOfStream, this.collector, coordinator,
callback)
+ verify(processesCounter, times(4)).inc() // case 1, 2, 3 and 4
+ verify(messagesActuallyProcessedCounter, times(3)).inc() // case 1 and 3
and 4
}
@Test