This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 605d55d KAFKA-6647: Do not delete the lock file while holding the
lock (#8267)
605d55d is described below
commit 605d55dc173156471fb05c40d715ab318ce74952
Author: Guozhang Wang <[email protected]>
AuthorDate: Sat Mar 14 13:49:08 2020 -0700
KAFKA-6647: Do not delete the lock file while holding the lock (#8267)
1. Inside StateDirectory#cleanRemovedTasks, skip deleting the lock file
(and hence the parent directory) until the lock has been released. After the
lock is released, delete the parent directory only if manualUserCall == true,
i.e. when the cleanup is triggered from KafkaStreams#cleanUp; in that case
users are responsible for making sure that the Streams instance is not started,
and hence that no other threads are trying to grab that lock.
2. As a result, during scheduled cleanup the corresponding task.dir would
not be removed but would be left with only the lock file, so effectively we
still achieve the goal of releasing disk space. For callers of
listTaskDirectories like KIP-441 (cc @ableegoldman to take a look) I've
introduced a new listNonEmptyTaskDirectories, which excludes such dummy
task.dirs that have only the lock file left.
3. Also fixed KAFKA-8999 along the way to expose the exception while
traversing the directory.
Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler
<[email protected]>
---
.../java/org/apache/kafka/common/utils/Utils.java | 41 ++++--
.../org/apache/kafka/streams/KafkaStreams.java | 4 +-
.../processor/internals/StateDirectory.java | 139 ++++++++++++---------
.../streams/processor/internals/TaskManager.java | 12 +-
.../processor/internals/StateDirectoryTest.java | 43 +++++--
.../processor/internals/TaskManagerTest.java | 6 +-
6 files changed, 160 insertions(+), 85 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0d16c0a..e9d4cc4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -736,29 +736,56 @@ public final class Utils {
/**
* Recursively delete the given file/directory and any subfiles (if any
exist)
*
- * @param file The root file at which to begin deleting
+ * @param rootFile The root file at which to begin deleting
*/
- public static void delete(final File file) throws IOException {
- if (file == null)
+ public static void delete(final File rootFile) throws IOException {
+ delete(rootFile, Collections.emptyList());
+ }
+
+ /**
+ * Recursively delete the subfiles (if any exist) of the passed in root
file that are not included
+ * in the list to keep
+ *
+ * @param rootFile The root file at which to begin deleting
+ * @param filesToKeep The subfiles to keep (note that if a subfile is to
be kept, so are all its parent
+ * files in its path); if empty we would also delete
the root file
+ */
+ public static void delete(final File rootFile, final List<File>
filesToKeep) throws IOException {
+ if (rootFile == null)
return;
- Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+ Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFileFailed(Path path, IOException exc)
throws IOException {
// If the root path did not exist, ignore the error; otherwise
throw it.
- if (exc instanceof NoSuchFileException &&
path.toFile().equals(file))
+ if (exc instanceof NoSuchFileException &&
path.toFile().equals(rootFile))
return FileVisitResult.TERMINATE;
throw exc;
}
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes
attrs) throws IOException {
- Files.delete(path);
+ if (!filesToKeep.contains(path.toFile())) {
+ Files.delete(path);
+ }
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path path, IOException
exc) throws IOException {
- Files.delete(path);
+ // KAFKA-8999: if there's an exception thrown previously
already, we should throw it
+ if (exc != null) {
+ throw exc;
+ }
+
+ if (rootFile.toPath().equals(path)) {
+ // only delete the parent directory if there's nothing to
keep
+ if (filesToKeep.isEmpty()) {
+ Files.delete(path);
+ }
+ } else if (!filesToKeep.contains(path.toFile())) {
+ Files.delete(path);
+ }
+
return FileVisitResult.CONTINUE;
}
});
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 45d6acb..2a901ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -716,11 +716,11 @@ public class KafkaStreams implements AutoCloseable {
}
final ProcessorTopology globalTaskTopology =
internalTopologyBuilder.buildGlobalStateTopology();
final long cacheSizePerThread = totalCacheSize / (threads.length +
(globalTaskTopology == null ? 0 : 1));
- final boolean createStateDirectory =
taskTopology.hasPersistentLocalStore() ||
+ final boolean hasPersistentStores =
taskTopology.hasPersistentLocalStore() ||
(globalTaskTopology != null &&
globalTaskTopology.hasPersistentGlobalStore());
try {
- stateDirectory = new StateDirectory(config, time,
createStateDirectory);
+ stateDirectory = new StateDirectory(config, time,
hasPersistentStores);
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 71b8b95..41c7c4b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -33,6 +33,7 @@ import java.nio.channels.OverlappingFileLockException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.util.Collections;
import java.util.HashMap;
import java.util.regex.Pattern;
@@ -50,11 +51,12 @@ public class StateDirectory {
static final String LOCK_FILE_NAME = ".lock";
private static final Logger log =
LoggerFactory.getLogger(StateDirectory.class);
+ private final Time time;
+ private final String appId;
private final File stateDir;
- private final boolean createStateDirectory;
+ private final boolean hasPersistentStores;
private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
- private final Time time;
private FileChannel globalStateChannel;
private FileLock globalStateLock;
@@ -72,22 +74,27 @@ public class StateDirectory {
/**
* Ensures that the state base directory as well as the application's
sub-directory are created.
*
+ * @param config streams application configuration to read
the root state directory path
+ * @param time system timer used to execute periodic
cleanup procedure
+ * @param hasPersistentStores only when the application's topology does
have stores persisted on local file
+ * system, we would go ahead and auto-create
the corresponding application / task / store
+ * directories whenever necessary; otherwise no
directories would be created.
+ *
* @throws ProcessorStateException if the base state directory or
application state directory does not exist
- * and could not be created when
createStateDirectory is enabled.
+ * and could not be created when
hasPersistentStores is enabled.
*/
- public StateDirectory(final StreamsConfig config,
- final Time time,
- final boolean createStateDirectory) {
+ public StateDirectory(final StreamsConfig config, final Time time, final
boolean hasPersistentStores) {
this.time = time;
- this.createStateDirectory = createStateDirectory;
+ this.hasPersistentStores = hasPersistentStores;
+ this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final String stateDirName =
config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
- if (this.createStateDirectory && !baseDir.exists() &&
!baseDir.mkdirs()) {
+ if (this.hasPersistentStores && !baseDir.exists() &&
!baseDir.mkdirs()) {
throw new ProcessorStateException(
String.format("base state directory [%s] doesn't exist and
couldn't be created", stateDirName));
}
- stateDir = new File(baseDir,
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
- if (this.createStateDirectory && !stateDir.exists() &&
!stateDir.mkdir()) {
+ stateDir = new File(baseDir, appId);
+ if (this.hasPersistentStores && !stateDir.exists() &&
!stateDir.mkdir()) {
throw new ProcessorStateException(
String.format("state directory [%s] doesn't exist and couldn't
be created", stateDir.getPath()));
}
@@ -100,7 +107,7 @@ public class StateDirectory {
*/
public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
- if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
+ if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
String.format("task directory [%s] doesn't exist and couldn't
be created", taskDir.getPath()));
}
@@ -120,9 +127,13 @@ public class StateDirectory {
boolean directoryForTaskIsEmpty(final TaskId taskId) {
final File taskDir = directoryForTask(taskId);
+ return taskDirEmpty(taskDir);
+ }
+
+ private boolean taskDirEmpty(final File taskDir) {
final File[] storeDirs = taskDir.listFiles(pathname ->
!pathname.getName().equals(LOCK_FILE_NAME) &&
- !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+ !pathname.getName().equals(CHECKPOINT_FILE_NAME));
// if the task is stateless, storeDirs would be null
return storeDirs == null || storeDirs.length == 0;
@@ -135,7 +146,7 @@ public class StateDirectory {
*/
File globalStateDir() {
final File dir = new File(stateDir, "global");
- if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
+ if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
throw new ProcessorStateException(
String.format("global state directory [%s] doesn't exist and
couldn't be created", dir.getPath()));
}
@@ -148,12 +159,12 @@ public class StateDirectory {
/**
* Get the lock for the {@link TaskId}s directory if it is available
- * @param taskId
+ * @param taskId task id
* @return true if successful
- * @throws IOException
+ * @throws IOException if the file cannot be created or file handle cannot
be grabbed, should be considered as fatal
*/
synchronized boolean lock(final TaskId taskId) throws IOException {
- if (!createStateDirectory) {
+ if (!hasPersistentStores) {
return true;
}
@@ -197,7 +208,7 @@ public class StateDirectory {
}
synchronized boolean lockGlobalState() throws IOException {
- if (!createStateDirectory) {
+ if (!hasPersistentStores) {
return true;
}
@@ -259,18 +270,21 @@ public class StateDirectory {
}
public synchronized void clean() {
+ // remove task dirs
try {
cleanRemovedTasks(0, true);
} catch (final Exception e) {
// this is already logged within cleanRemovedTasks
throw new StreamsException(e);
}
+ // remove global dir
try {
if (stateDir.exists()) {
Utils.delete(globalStateDir().getAbsoluteFile());
}
} catch (final IOException e) {
- log.error("{} Failed to delete global state directory due to an
unexpected exception", logPrefix(), e);
+ log.error("{} Failed to delete global state directory of {} due to
an unexpected exception",
+ appId, logPrefix(), e);
throw new StreamsException(e);
}
}
@@ -292,71 +306,82 @@ public class StateDirectory {
private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
final boolean manualUserCall)
throws Exception {
- for (final File taskDir : listTaskDirectories()) {
+ final File[] taskDirs = listAllTaskDirectories();
+ if (taskDirs == null || taskDirs.length == 0) {
+ return; // nothing to do
+ }
+
+ for (final File taskDir : taskDirs) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parse(dirName);
if (!locks.containsKey(id)) {
+ Exception exception = null;
try {
if (lock(id)) {
final long now = time.milliseconds();
final long lastModifiedMs = taskDir.lastModified();
- if (now > lastModifiedMs + cleanupDelayMs ||
manualUserCall) {
- if (!manualUserCall) {
- log.info(
- "{} Deleting obsolete state directory {}
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
- logPrefix(),
- dirName,
- id,
- now - lastModifiedMs,
- cleanupDelayMs);
- } else {
- log.info(
- "{} Deleting state directory {} for
task {} as user calling cleanup.",
- logPrefix(),
- dirName,
- id);
- }
- Utils.delete(taskDir);
+ if (now > lastModifiedMs + cleanupDelayMs) {
+ log.info("{} Deleting obsolete state directory {}
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
+ logPrefix(), dirName, id, now -
lastModifiedMs, cleanupDelayMs);
+
+ Utils.delete(taskDir,
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+ } else if (manualUserCall) {
+ log.info("{} Deleting state directory {} for task
{} as user calling cleanup.",
+ logPrefix(), dirName, id);
+
+ Utils.delete(taskDir,
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
}
}
- } catch (final OverlappingFileLockException e) {
- // locked by another thread
- if (manualUserCall) {
- log.error("{} Failed to get the state directory
lock.", logPrefix(), e);
- throw e;
- }
- } catch (final IOException e) {
- log.error("{} Failed to delete the state directory.",
logPrefix(), e);
- if (manualUserCall) {
- throw e;
- }
+ } catch (final OverlappingFileLockException | IOException e) {
+ exception = e;
} finally {
try {
unlock(id);
- } catch (final IOException e) {
- log.error("{} Failed to release the state directory
lock.", logPrefix());
+
+ // for manual user call, stream threads are not
running so it is safe to delete
+ // the whole directory
if (manualUserCall) {
- throw e;
+ Utils.delete(taskDir);
}
+ } catch (final IOException e) {
+ exception = e;
}
}
+
+ if (exception != null && manualUserCall) {
+ log.error("{} Failed to release the state directory
lock.", logPrefix());
+ throw exception;
+ }
}
}
}
/**
+ * List all of the task directories that are non-empty
+ * @return The list of all the non-empty local directories for stream tasks
+ */
+ File[] listNonEmptyTaskDirectories() {
+ final File[] taskDirectories = !stateDir.exists() ? new File[0] :
+ stateDir.listFiles(pathname -> {
+ if (!pathname.isDirectory() ||
!PATH_NAME.matcher(pathname.getName()).matches()) {
+ return false;
+ } else {
+ return !taskDirEmpty(pathname);
+ }
+ });
+
+ return taskDirectories == null ? new File[0] : taskDirectories;
+ }
+
+ /**
* List all of the task directories
* @return The list of all the existing local directories for stream tasks
*/
- File[] listTaskDirectories() {
- final File[] taskDirectories =
+ File[] listAllTaskDirectories() {
+ final File[] taskDirectories = !stateDir.exists() ? new File[0] :
stateDir.listFiles(pathname -> pathname.isDirectory() &&
PATH_NAME.matcher(pathname.getName()).matches());
- if (!stateDir.exists() || taskDirectories == null) {
- return new File[0];
- } else {
- return taskDirectories;
- }
+ return taskDirectories == null ? new File[0] : taskDirectories;
}
private FileChannel getOrCreateFileChannel(final TaskId taskId,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 2c80c34..b804a4a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -127,7 +127,7 @@ public class TaskManager {
void handleRebalanceStart(final Set<String> subscribedTopics) {
builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);
- tryToLockAllTaskDirectories();
+ tryToLockAllNonEmptyTaskDirectories();
rebalanceInProgress = true;
}
@@ -379,7 +379,7 @@ public class TaskManager {
/**
* Compute the offset total summed across all stores in a task. Includes
offset sum for any tasks we own the
- * lock for, which includes assigned and unassigned tasks we locked in
{@link #tryToLockAllTaskDirectories()}
+ * lock for, which includes assigned and unassigned tasks we locked in
{@link #tryToLockAllNonEmptyTaskDirectories()}
*
* @return Map from task id to its total offset summed across all state
stores
*/
@@ -410,13 +410,13 @@ public class TaskManager {
}
/**
- * Makes a weak attempt to lock all task directories in the state dir. We
are responsible for computing and
+ * Makes a weak attempt to lock all non-empty task directories in the
state dir. We are responsible for computing and
* reporting the offset sum for any unassigned tasks we obtain the lock
for in the upcoming rebalance. Tasks
* that we locked but didn't own will be released at the end of the
rebalance (unless of course we were
* assigned the task as a result of the rebalance). This method should be
idempotent.
*/
- private void tryToLockAllTaskDirectories() {
- for (final File dir : stateDirectory.listTaskDirectories()) {
+ private void tryToLockAllNonEmptyTaskDirectories() {
+ for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
try {
final TaskId id = TaskId.parse(dir.getName());
try {
@@ -438,7 +438,7 @@ public class TaskManager {
/**
* We must release the lock for any unassigned tasks that we temporarily
locked in preparation for a
- * rebalance in {@link #tryToLockAllTaskDirectories()}.
+ * rebalance in {@link #tryToLockAllNonEmptyTaskDirectories()}.
*/
private void releaseLockedUnassignedTaskDirectories() {
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e1ca9188..827557a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -249,18 +249,29 @@ public class StateDirectoryTest {
public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked()
throws Exception {
final TaskId task0 = new TaskId(0, 0);
final TaskId task1 = new TaskId(1, 0);
+ final TaskId task2 = new TaskId(2, 0);
try {
+ assertTrue(new File(directory.directoryForTask(task0),
"store").mkdir());
+ assertTrue(new File(directory.directoryForTask(task1),
"store").mkdir());
+ assertTrue(new File(directory.directoryForTask(task2),
"store").mkdir());
+
directory.lock(task0);
directory.lock(task1);
- directory.directoryForTask(new TaskId(2, 0));
- List<File> files =
Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+ List<File> files =
Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+ assertEquals(3, files.size());
+
+
+ files =
Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
assertEquals(3, files.size());
- time.sleep(1000);
+ time.sleep(5000);
directory.cleanRemovedTasks(0);
- files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+ files =
Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+ assertEquals(3, files.size());
+
+ files =
Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
assertEquals(2, files.size());
assertTrue(files.contains(new File(appDir, task0.toString())));
assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -273,13 +284,19 @@ public class StateDirectoryTest {
@Test
public void
shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
final File dir = directory.directoryForTask(new TaskId(2, 0));
+ assertTrue(new File(dir, "store").mkdir());
+
final int cleanupDelayMs = 60000;
directory.cleanRemovedTasks(cleanupDelayMs);
assertTrue(dir.exists());
+ assertEquals(1, directory.listAllTaskDirectories().length);
+ assertEquals(1, directory.listNonEmptyTaskDirectories().length);
time.sleep(cleanupDelayMs + 1000);
directory.cleanRemovedTasks(cleanupDelayMs);
- assertFalse(dir.exists());
+ assertTrue(dir.exists());
+ assertEquals(1, directory.listAllTaskDirectories().length);
+ assertEquals(0, directory.listNonEmptyTaskDirectories().length);
}
@Test
@@ -290,15 +307,21 @@ public class StateDirectoryTest {
}
@Test
- public void shouldListAllTaskDirectories() {
+ public void shouldOnlyListNonEmptyTaskDirectories() {
TestUtils.tempDirectory(stateDir.toPath(), "foo");
final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
- final List<File> dirs = Arrays.asList(directory.listTaskDirectories());
- assertEquals(2, dirs.size());
- assertTrue(dirs.contains(taskDir1));
- assertTrue(dirs.contains(taskDir2));
+ final File storeDir = new File(taskDir1, "store");
+ assertTrue(storeDir.mkdir());
+
+ assertEquals(Arrays.asList(taskDir1, taskDir2),
Arrays.asList(directory.listAllTaskDirectories()));
+ assertEquals(Collections.singletonList(taskDir1),
Arrays.asList(directory.listNonEmptyTaskDirectories()));
+
+ directory.cleanRemovedTasks(0L);
+
+ assertEquals(Arrays.asList(taskDir1, taskDir2),
Arrays.asList(directory.listAllTaskDirectories()));
+ assertEquals(Collections.emptyList(),
Arrays.asList(directory.listNonEmptyTaskDirectories()));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 62043e3..2f7681d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -168,7 +168,7 @@ public class TaskManagerTest {
@Test
public void shouldNotLockAnythingIfStateDirIsEmpty() {
- expect(stateDirectory.listTaskDirectories()).andReturn(new
File[0]).once();
+ expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new
File[0]).once();
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
@@ -999,7 +999,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andReturn(assignment);
consumer.pause(assignment);
expectLastCall();
- expect(stateDirectory.listTaskDirectories()).andReturn(new File[0]);
+ expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new
File[0]);
replay(consumer, stateDirectory);
assertThat(taskManager.isRebalanceInProgress(), is(false));
taskManager.handleRebalanceStart(emptySet());
@@ -1665,7 +1665,7 @@ public class TaskManagerTest {
for (int i = 0; i < names.length; ++i) {
taskFolders[i] = testFolder.newFolder(names[i]);
}
-
expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();
+
expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
}
private void writeCheckpointFile(final TaskId task, final
Map<TopicPartition, Long> offsets) throws IOException {