This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5c2c4 SAMZA-2593: Update task callback to store only necessary
fields instead of the message envelope (#1433)
9d5c2c4 is described below
commit 9d5c2c44e6b8bcc608c4042cef382a8ceb706cf4
Author: mynameborat <[email protected]>
AuthorDate: Thu Sep 24 12:42:42 2020 -0700
SAMZA-2593: Update task callback to store only necessary fields instead of
the message envelope (#1433)
Problem: TaskCallbackImpl currently stores the entire message envelope. For
jobs that have thousands of pending callbacks, this adds up to significant
memory pressure. The message envelope is only used to track the SSP
information and the offset associated with it, so that we commit the right
offset during our commit cycle.
Changes: Modify TaskCallbackImpl to store only the SSP and offset information.
---
.../java/org/apache/samza/container/RunLoop.java | 9 ++---
.../org/apache/samza/task/TaskCallbackImpl.java | 29 ++++++++++-----
.../apache/samza/task/TestTaskCallbackManager.java | 42 +++++++++++-----------
3 files changed, 47 insertions(+), 33 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index f0968fc..57df840 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -621,16 +621,17 @@ public class RunLoop implements Runnable, Throttleable {
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
containerMetrics.processNs().update(clock.nanoTime() -
callbackImpl.getTimeCreatedNs());
log.trace("Got callback complete for task {}, ssp {}",
- callbackImpl.getTaskName(),
callbackImpl.getEnvelope().getSystemStreamPartition());
+ callbackImpl.getTaskName(),
callbackImpl.getSystemStreamPartition());
List<TaskCallbackImpl> callbacksToUpdate =
callbackManager.updateCallback(callbackImpl);
for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
- IncomingMessageEnvelope envelope =
callbackToUpdate.getEnvelope();
- log.trace("Update offset for ssp {}, offset {}",
envelope.getSystemStreamPartition(), envelope.getOffset());
+ log.trace("Update offset for ssp {}, offset {}",
callbackToUpdate.getSystemStreamPartition(),
+ callbackToUpdate.getOffset());
// update offset
if (task.offsetManager() != null) {
- task.offsetManager().update(task.taskName(),
envelope.getSystemStreamPartition(), envelope.getOffset());
+ task.offsetManager().update(task.taskName(),
callbackToUpdate.getSystemStreamPartition(),
+ callbackToUpdate.getOffset());
}
// update coordinator
diff --git
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
index 0ba2032..3be3eba 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -19,12 +19,14 @@
package org.apache.samza.task;
+import com.google.common.base.Preconditions;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,13 +40,16 @@ public class TaskCallbackImpl implements TaskCallback,
Comparable<TaskCallbackIm
private static final Logger log =
LoggerFactory.getLogger(TaskCallbackImpl.class);
final TaskName taskName;
- final IncomingMessageEnvelope envelope;
final ReadableCoordinator coordinator;
final long timeCreatedNs;
+
private final AtomicBoolean isComplete = new AtomicBoolean(false);
+ private final long seqNum;
private final TaskCallbackListener listener;
+ private final String offset;
+ private final SystemStreamPartition systemStreamPartition;
+
private ScheduledFuture scheduledFuture = null;
- private final long seqNum;
public TaskCallbackImpl(TaskCallbackListener listener,
TaskName taskName,
@@ -52,9 +57,11 @@ public class TaskCallbackImpl implements TaskCallback,
Comparable<TaskCallbackIm
ReadableCoordinator coordinator,
long seqNum,
long timeCreatedNs) {
+ Preconditions.checkNotNull(envelope, "Incoming message envelope cannot be
null");
this.listener = listener;
this.taskName = taskName;
- this.envelope = envelope;
+ this.offset = envelope.getOffset();
+ this.systemStreamPartition = envelope.getSystemStreamPartition();
this.coordinator = coordinator;
this.seqNum = seqNum;
this.timeCreatedNs = timeCreatedNs;
@@ -64,8 +71,12 @@ public class TaskCallbackImpl implements TaskCallback,
Comparable<TaskCallbackIm
return taskName;
}
- public IncomingMessageEnvelope getEnvelope() {
- return envelope;
+ public String getOffset() {
+ return offset;
+ }
+
+ public SystemStreamPartition getSystemStreamPartition() {
+ return systemStreamPartition;
}
public ReadableCoordinator getCoordinator() {
@@ -82,13 +93,13 @@ public class TaskCallbackImpl implements TaskCallback,
Comparable<TaskCallbackIm
scheduledFuture.cancel(true);
}
log.trace("Callback complete for task {}, ssp {}, offset {}.",
- new Object[] {taskName, envelope.getSystemStreamPartition(),
envelope.getOffset()});
+ new Object[] {taskName, systemStreamPartition, offset});
if (isComplete.compareAndSet(false, true)) {
listener.onComplete(this);
} else {
String msg = String.format("Callback complete was invoked after
completion for task %s, ssp %s, offset %s.",
- taskName, envelope.getSystemStreamPartition(), envelope.getOffset());
+ taskName, systemStreamPartition, offset);
listener.onFailure(this, new IllegalStateException(msg));
}
}
@@ -101,11 +112,11 @@ public class TaskCallbackImpl implements TaskCallback,
Comparable<TaskCallbackIm
if (isComplete.compareAndSet(false, true)) {
String msg = String.format("Callback failed for task %s, ssp %s, offset
%s.",
- taskName, envelope.getSystemStreamPartition(), envelope.getOffset());
+ taskName, systemStreamPartition, offset);
listener.onFailure(this, new SamzaException(msg, t));
} else {
String msg = String.format("Task callback failure was invoked after
completion for task %s, ssp %s, offset %s.",
- taskName, envelope.getSystemStreamPartition(), envelope.getOffset());
+ taskName, systemStreamPartition, offset);
listener.onFailure(this, new IllegalStateException(msg, t));
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index c7b3f47..405157a 100644
---
a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++
b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -31,6 +31,8 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
public class TestTaskCallbackManager {
TaskCallbackManager callbackManager = null;
@@ -52,10 +54,10 @@ public class TestTaskCallbackManager {
@Test
public void testCreateCallback() {
- TaskCallbackImpl callback = callbackManager.createCallback(new
TaskName("Partition 0"), null, null);
+ TaskCallbackImpl callback = callbackManager.createCallback(new
TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null);
assertTrue(callback.matchSeqNum(0));
- callback = callbackManager.createCallback(new TaskName("Partition 0"),
null, null);
+ callback = callbackManager.createCallback(new TaskName("Partition 0"),
mock(IncomingMessageEnvelope.class), null);
assertTrue(callback.matchSeqNum(1));
}
@@ -71,8 +73,8 @@ public class TestTaskCallbackManager {
assertEquals(1, callbacksToUpdate.size());
TaskCallbackImpl callback = callbacksToUpdate.get(0);
assertTrue(callback.matchSeqNum(0));
- assertEquals(ssp, callback.envelope.getSystemStreamPartition());
- assertEquals("0", callback.envelope.getOffset());
+ assertEquals(ssp, callback.getSystemStreamPartition());
+ assertEquals("0", callback.getOffset());
IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp, "1",
null, null);
TaskCallbackImpl callback1 = new TaskCallbackImpl(listener, taskName,
envelope1, coordinator, 1, 0);
@@ -80,8 +82,8 @@ public class TestTaskCallbackManager {
assertEquals(1, callbacksToUpdate.size());
callback = callbacksToUpdate.get(0);
assertTrue(callback.matchSeqNum(1));
- assertEquals(ssp, callback.envelope.getSystemStreamPartition());
- assertEquals("1", callback.envelope.getOffset());
+ assertEquals(ssp, callback.getSystemStreamPartition());
+ assertEquals("1", callback.getOffset());
}
@Test
@@ -107,18 +109,18 @@ public class TestTaskCallbackManager {
assertEquals(3, callbacksToUpdate.size());
TaskCallbackImpl callback = callbacksToUpdate.get(0);
assertTrue(callback.matchSeqNum(0));
- assertEquals(ssp, callback.envelope.getSystemStreamPartition());
- assertEquals("0", callback.envelope.getOffset());
+ assertEquals(ssp, callback.getSystemStreamPartition());
+ assertEquals("0", callback.getOffset());
callback = callbacksToUpdate.get(1);
assertTrue(callback.matchSeqNum(1));
- assertEquals(ssp, callback.envelope.getSystemStreamPartition());
- assertEquals("1", callback.envelope.getOffset());
+ assertEquals(ssp, callback.getSystemStreamPartition());
+ assertEquals("1", callback.getOffset());
callback = callbacksToUpdate.get(2);
assertTrue(callback.matchSeqNum(2));
- assertEquals(ssp, callback.envelope.getSystemStreamPartition());
- assertEquals("2", callback.envelope.getOffset());
+ assertEquals(ssp, callback.getSystemStreamPartition());
+ assertEquals("2", callback.getOffset());
}
@Test
@@ -150,14 +152,14 @@ public class TestTaskCallbackManager {
//Check for envelope0
TaskCallbackImpl taskCallback = callbacksToUpdate.get(0);
assertTrue(taskCallback.matchSeqNum(0));
- assertEquals(ssp, taskCallback.envelope.getSystemStreamPartition());
- assertEquals("0", taskCallback.envelope.getOffset());
+ assertEquals(ssp, taskCallback.getSystemStreamPartition());
+ assertEquals("0", taskCallback.getOffset());
//Check for envelope1
taskCallback = callbacksToUpdate.get(1);
assertTrue(taskCallback.matchSeqNum(1));
- assertEquals(ssp, taskCallback.envelope.getSystemStreamPartition());
- assertEquals("1", taskCallback.envelope.getOffset());
+ assertEquals(ssp, taskCallback.getSystemStreamPartition());
+ assertEquals("1", taskCallback.getOffset());
}
@Test
@@ -199,12 +201,12 @@ public class TestTaskCallbackManager {
assertEquals(2, callbacksToUpdate.size());
TaskCallbackImpl callback = callbacksToUpdate.get(0);
assertTrue(callback.matchSeqNum(0));
- assertEquals(envelope0.getSystemStreamPartition(),
callback.envelope.getSystemStreamPartition());
- assertEquals(envelope0.getOffset(), callback.envelope.getOffset());
+ assertEquals(envelope0.getSystemStreamPartition(),
callback.getSystemStreamPartition());
+ assertEquals(envelope0.getOffset(), callback.getOffset());
callback = callbacksToUpdate.get(1);
assertTrue(callback.matchSeqNum(1));
- assertEquals(envelope1.getSystemStreamPartition(),
callback.envelope.getSystemStreamPartition());
- assertEquals(envelope1.getOffset(), callback.envelope.getOffset());
+ assertEquals(envelope1.getSystemStreamPartition(),
callback.getSystemStreamPartition());
+ assertEquals(envelope1.getOffset(), callback.getOffset());
}
}