This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95291432fa3 KAFKA-14412: Use shared cache for Task offset sums (#20954)
95291432fa3 is described below
commit 95291432fa3f31262a07e58af2247c73a92af8d6
Author: Nick Telford <[email protected]>
AuthorDate: Wed Feb 25 19:00:36 2026 +0000
KAFKA-14412: Use shared cache for Task offset sums (#20954)
Instead of reading Task state offsets for non-open Tasks from the
`.checkpoint` file, we now maintain an in-memory cache of the latest
changelog offsets for every Task on the instance.
On start-up, this cache is seeded with the changelog offsets for every
on-disk StateStore. Running Active and Standby Tasks then update this
cache on every checkpoint to ensure it always reflects the offsets
on-disk.
This breaks the tight coupling between `TaskManager` and `.checkpoint`
files, which will enable us to remove `.checkpoint` files in a later
commit as part of KIP-1035.
Reviewers: Eduwer Camacaro<[email protected]>, Bill Bejeck
<[email protected]>
---
.../processor/internals/ProcessorStateManager.java | 9 ++++
.../processor/internals/StateDirectory.java | 49 +++++++++++++++++
.../processor/internals/StateManagerUtil.java | 1 +
.../streams/processor/internals/TaskManager.java | 61 +++-------------------
.../processor/internals/StateManagerUtilTest.java | 2 +
.../processor/internals/TaskManagerTest.java | 57 +++++++++++---------
6 files changed, 102 insertions(+), 77 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f77a1f9632b..86725805ce5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -176,6 +176,7 @@ public class ProcessorStateManager implements StateManager {
private final FixedOrderMap<String, StateStoreMetadata> stores = new
FixedOrderMap<>();
private final FixedOrderMap<String, StateStore> globalStores = new
FixedOrderMap<>();
+ private final StateDirectory stateDirectory;
private final File baseDir;
private final OffsetCheckpoint checkpointFile;
@@ -211,6 +212,7 @@ public class ProcessorStateManager implements StateManager {
this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
this.checkpointFile = new
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
+ this.stateDirectory = stateDirectory;
log.debug("Created state store manager for task {}", taskId);
}
@@ -300,6 +302,8 @@ public class ProcessorStateManager implements StateManager {
}
}
+ stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
+
if (!loadedCheckpoints.isEmpty()) {
log.warn("Some loaded checkpoint offsets cannot find their
corresponding state stores: {}", loadedCheckpoints);
}
@@ -462,10 +466,13 @@ public class ProcessorStateManager implements
StateManager {
}
storeMetadata.setOffset(batchEndOffset);
+
// If null means the lag for this partition is not known yet
if (optionalLag.isPresent()) {
storeMetadata.setEndOffset(optionalLag.getAsLong() +
batchEndOffset);
}
+
+ stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
}
}
@@ -647,6 +654,8 @@ public class ProcessorStateManager implements StateManager {
store.stateStore.name(), store.offset,
store.changelogPartition);
}
}
+
+ stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 6c4e97c101e..701ac8195d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -65,6 +66,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -124,6 +126,7 @@ public class StateDirectory implements AutoCloseable {
private final StreamsConfig config;
private final Set<TaskId> tasksInLocalState = new
ConcurrentSkipListSet<>();
+ private final Map<TaskId, Long> taskOffsetSums = new ConcurrentHashMap<>();
/**
* Ensures that the state base directory as well as the application's
sub-directory are created.
@@ -295,6 +298,44 @@ public class StateDirectory implements AutoCloseable {
}
}
+ public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
+ return taskOffsetSums.entrySet().stream()
+ .filter(e -> tasks.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
+ public void updateTaskOffsets(final TaskId taskId, final
Map<TopicPartition, Long> changelogOffsets) {
+ if (!changelogOffsets.isEmpty()) {
+ taskOffsetSums.put(taskId, sumOfChangelogOffsets(taskId,
changelogOffsets));
+ }
+ }
+
+ public void removeTaskOffsets(final TaskId taskId) {
+ taskOffsetSums.remove(taskId);
+ }
+
+ private long sumOfChangelogOffsets(final TaskId taskId, final
Map<TopicPartition, Long> changelogOffsets) {
+ long offsetSum = 0L;
+ for (final Map.Entry<TopicPartition, Long> changelogEntry :
changelogOffsets.entrySet()) {
+ final long offset = changelogEntry.getValue();
+
+ if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
+ if (offset < 0) {
+ throw new StreamsException(
+ new IllegalStateException("Expected not to get a
sentinel offset, but got: " + changelogEntry),
+ taskId);
+ }
+ offsetSum += offset;
+ if (offsetSum < 0) {
+ log.warn("Sum of changelog offsets for task {} overflowed,
pinning to Long.MAX_VALUE", taskId);
+ return Long.MAX_VALUE;
+ }
+ }
+ }
+
+ return offsetSum;
+ }
+
public UUID initializeProcessId() {
if (!hasPersistentStores) {
final UUID processId = UUID.randomUUID();
@@ -502,6 +543,7 @@ public class StateDirectory implements AutoCloseable {
public void close() {
if (hasPersistentStores) {
unlockStartupStores();
+ taskOffsetSums.clear();
try {
stateDirLock.release();
stateDirLockChannel.close();
@@ -582,6 +624,7 @@ public class StateDirectory implements AutoCloseable {
final long now = time.milliseconds();
final long lastModifiedMs =
taskDir.file().lastModified();
if (now - cleanupDelayMs > lastModifiedMs) {
+ removeTaskOffsets(id);
log.info("{} Deleting obsolete state directory {}
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
logPrefix(), dirName, id, now -
lastModifiedMs, cleanupDelayMs);
removeStartupState(id);
@@ -620,6 +663,9 @@ public class StateDirectory implements AutoCloseable {
);
if (namedTopologyDirs != null) {
for (final File namedTopologyDir : namedTopologyDirs) {
+ final String topologyName =
parseNamedTopologyFromDirectory(namedTopologyDir.getName());
+ final Set<TaskId> taskKeys = taskOffsetSums.keySet();
+ taskKeys.removeIf(taskId ->
taskId.topologyName().equals(topologyName));
final File[] contents = namedTopologyDir.listFiles();
if (contents != null && contents.length == 0) {
try {
@@ -657,6 +703,8 @@ public class StateDirectory implements AutoCloseable {
log.debug("Tried to clear out the local state for NamedTopology {}
but none was found", topologyName);
}
try {
+ final Set<TaskId> taskKeys = taskOffsetSums.keySet();
+ taskKeys.removeIf(taskId ->
taskId.topologyName().equals(topologyName));
Utils.delete(namedTopologyDir);
} catch (final IOException e) {
log.error("Hit an unexpected error while clearing local state for
topology " + topologyName, e);
@@ -670,6 +718,7 @@ public class StateDirectory implements AutoCloseable {
log.warn("Found some still-locked task directories when user
requested to cleaning up the state, "
+ "since Streams is not running any more these will be ignored
to complete the cleanup");
}
+ taskOffsetSums.clear();
final AtomicReference<Exception> firstException = new
AtomicReference<>();
for (final TaskDirectory taskDir : listAllTaskDirectories()) {
final String dirName = taskDir.file().getName();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 6706ed89543..e2b92352bf2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -144,6 +144,7 @@ final class StateManagerUtil {
try {
if (wipeStateStore) {
log.debug("Wiping state stores for {} task {}",
taskType, id);
+ stateDirectory.removeTaskOffsets(id);
// we can just delete the whole dir of the task,
including the state store images and the checkpoint files,
// and then we write an empty checkpoint file
indicating that the previous close is graceful and we just
// need to re-bootstrap the restoration from the
beginning
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3888e4384ba..1657b3a414b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -40,15 +40,12 @@ import
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
-import
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import java.io.File;
-import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -1242,7 +1239,6 @@ public class TaskManager {
* Does not include stateless or non-logged tasks.
*/
public Map<TaskId, Long> taskOffsetSums() {
- final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
// Not all tasks will create directories, and there may be directories
for tasks we don't currently own,
// so we consider all tasks that are either owned or on disk. This
includes stateless tasks, which should
@@ -1250,27 +1246,14 @@ public class TaskManager {
final Map<TaskId, Task> tasks = allTasks();
final Set<TaskId>
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =
union(HashSet::new, lockedTaskDirectories, tasks.keySet());
- for (final Task task : tasks.values()) {
- if (task.state() != State.CREATED && task.state() != State.CLOSED)
{
- final Map<TopicPartition, Long> changelogOffsets =
task.changelogOffsets();
- if (changelogOffsets.isEmpty()) {
- log.debug("Skipping to encode apparently stateless (or
non-logged) offset sum for task {}",
- task.id());
- } else {
- taskOffsetSums.put(task.id(),
sumOfChangelogOffsets(task.id(), changelogOffsets));
- }
-
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id());
- }
- }
- for (final TaskId id :
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) {
- final File checkpointFile = stateDirectory.checkpointFileFor(id);
- try {
- if (checkpointFile.exists()) {
- taskOffsetSums.put(id, sumOfChangelogOffsets(id, new
OffsetCheckpoint(checkpointFile).read()));
- }
- } catch (final IOException e) {
- log.warn(String.format("Exception caught while trying to read
checkpoint for task %s:", id), e);
+ final Map<TaskId, Long> taskOffsetSums =
stateDirectory.taskOffsetSums(lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks);
+
+ // overlay latest offsets from assigned tasks
+ for (final Task task : tasks.values()) {
+ // exclude stateless and non-logged tasks
+ if (task.isActive() && task.state() == State.RUNNING &&
!task.changelogPartitions().isEmpty()) {
+ taskOffsetSums.put(task.id(), Task.LATEST_OFFSET);
}
}
@@ -1289,7 +1272,7 @@ public class TaskManager {
lockedTaskDirectories.clear();
final Map<TaskId, Task> allTasks = allTasks();
- for (final TaskDirectory taskDir :
stateDirectory.listNonEmptyTaskDirectories()) {
+ for (final StateDirectory.TaskDirectory taskDir :
stateDirectory.listNonEmptyTaskDirectories()) {
final File dir = taskDir.file();
final String namedTopology = taskDir.namedTopology();
try {
@@ -1343,34 +1326,6 @@ public class TaskManager {
}
}
- private long sumOfChangelogOffsets(final TaskId id, final
Map<TopicPartition, Long> changelogOffsets) {
- long offsetSum = 0L;
- for (final Map.Entry<TopicPartition, Long> changelogEntry :
changelogOffsets.entrySet()) {
- final long offset = changelogEntry.getValue();
-
-
- if (offset == Task.LATEST_OFFSET) {
- // this condition can only be true for active tasks; never for
standby
- // for this case, the offset of all partitions is set to
`LATEST_OFFSET`
- // and we "forward" the sentinel value directly
- return Task.LATEST_OFFSET;
- } else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
- if (offset < 0) {
- throw new StreamsException(
- new IllegalStateException("Expected not to get a
sentinel offset, but got: " + changelogEntry),
- id);
- }
- offsetSum += offset;
- if (offsetSum < 0) {
- log.warn("Sum of changelog offsets for task {} overflowed,
pinning to Long.MAX_VALUE", id);
- return Long.MAX_VALUE;
- }
- }
- }
-
- return offsetSum;
- }
-
private void closeTaskDirty(final Task task, final boolean
removeFromTasksRegistry) {
try {
// we call this function only to flush the case if necessary
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 5d0acf46bd5..1ee6594d119 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -171,6 +171,7 @@ public class StateManagerUtilTest {
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
inOrder.verify(stateDirectory).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
@@ -211,6 +212,7 @@ public class StateManagerUtilTest {
}
inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
inOrder.verify(stateDirectory).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index ff2b76c5f5d..8f9d228c273 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -95,6 +95,7 @@ import static org.apache.kafka.common.utils.Utils.union;
import static
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
@@ -1836,6 +1837,19 @@ public class TaskManagerTest {
);
}
+ @Test
+ public void shouldNotComputeOffsetSumForRunningStatelessTask() {
+ final StreamTask runningStatelessTask =
statelessTask(taskId00).inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatelessTask)));
+
+ assertThat(
+ taskManager.taskOffsetSums(),
+ is(emptyMap())
+ );
+ }
+
@Test
public void shouldComputeOffsetSumForNonRunningActiveTask() throws
Exception {
final StreamTask restoringStatefulTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
@@ -1852,6 +1866,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
@@ -1874,6 +1889,7 @@ public class TaskManagerTest {
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
taskManager.handleRebalanceStart(singleton("topic"));
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
@@ -1885,12 +1901,11 @@ public class TaskManagerTest {
when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffset)));
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
- final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
- writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
taskManager.handleRebalanceStart(singleton("topic"));
+
when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(mkMap(mkEntry(taskId00,
changelogOffset)));
assertThat(taskManager.taskOffsetSums(), is(mkMap(mkEntry(taskId00,
changelogOffset))));
}
@@ -1916,6 +1931,12 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask,
restoringStatefulTask));
+ when(stateDirectory.taskOffsetSums(Set.of(taskId00, taskId01,
taskId02)))
+ .thenReturn(mkMap(
+ mkEntry(taskId00, changelogOffsetOfRunningTask),
+ mkEntry(taskId01,
changelogOffsetOfRestoringStatefulTask),
+ mkEntry(taskId02,
changelogOffsetOfRestoringStandbyTask)
+ ));
assertThat(
taskManager.taskOffsetSums(),
@@ -1942,12 +1963,12 @@ public class TaskManagerTest {
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
- assertThat(
- taskManager.taskOffsetSums(),
- is(mkMap(
+ final Map<TaskId, Long> expectedOffsetSums = mkMap(
mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
- ))
);
+
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
+ assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
@Test
@@ -1976,20 +1997,17 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(singleton("topic"));
taskManager.handleAssignment(emptyMap(), taskId00Assignment);
+
when(stateDirectory.taskOffsetSums(any())).thenReturn(expectedOffsetSums);
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
@Test
public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws
Exception {
- final Map<TopicPartition, Long> changelogOffsets = mkMap(
- mkEntry(new TopicPartition("changelog", 0), 5L),
- mkEntry(new TopicPartition("changelog", 1), 10L)
- );
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
15L));
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
- writeCheckpointFile(taskId00, changelogOffsets);
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
taskManager.handleRebalanceStart(singleton("topic"));
@@ -1999,10 +2017,6 @@ public class TaskManagerTest {
@ParameterizedTest
@EnumSource(value = State.class, names = {"CREATED", "CLOSED"})
public void
shouldComputeOffsetSumFromCheckpointFileForCreatedAndClosedTasks(final State
state) throws Exception {
- final Map<TopicPartition, Long> changelogOffsets = mkMap(
- mkEntry(new TopicPartition("changelog", 0), 5L),
- mkEntry(new TopicPartition("changelog", 1), 10L)
- );
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
15L));
final StreamTask task = statefulTask(taskId00,
taskId00ChangelogPartitions)
@@ -2012,15 +2026,17 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
task)));
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
- writeCheckpointFile(taskId00, changelogOffsets);
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
taskManager.handleRebalanceStart(singleton("topic"));
+
when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(expectedOffsetSums);
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
@@ -2039,7 +2055,6 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
expectDirectoryNotEmpty(taskId00);
-
when(stateDirectory.checkpointFileFor(taskId00)).thenReturn(getCheckpointFile(taskId00));
taskManager.handleRebalanceStart(singleton("topic"));
assertTrue(taskManager.taskOffsetSums().isEmpty());
@@ -2047,17 +2062,11 @@ public class TaskManagerTest {
@Test
public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws
Exception {
- final long largeOffset = Long.MAX_VALUE / 2;
- final Map<TopicPartition, Long> changelogOffsets = mkMap(
- mkEntry(new TopicPartition("changelog", 1), largeOffset),
- mkEntry(new TopicPartition("changelog", 2), largeOffset),
- mkEntry(new TopicPartition("changelog", 3), largeOffset)
- );
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
Long.MAX_VALUE));
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
- writeCheckpointFile(taskId00, changelogOffsets);
+
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));