This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a2aa4b KAFKA-6499: Do not write offset checkpoint file with empty
offset map (#4492)
4a2aa4b is described below
commit 4a2aa4bb6700c02b573c3ae71a40bdb8df5ab3a7
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Feb 1 10:11:29 2018 -0800
KAFKA-6499: Do not write offset checkpoint file with empty offset map
(#4492)
* In Checkpoint.write(), if the offset map passed in is empty, skip the
writing of the file which would only contain version number and the empty size.
From the reading point of view, it is the same as if no file existed.
* Add related unit tests.
* Minor fixes on log4j messages.
Reviewers: Bill Bejeck <[email protected]>, Damian Guy
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../processor/internals/AbstractStateManager.java | 2 +-
.../processor/internals/GlobalStateManagerImpl.java | 2 +-
.../processor/internals/ProcessorStateManager.java | 13 +++++++------
.../streams/state/internals/OffsetCheckpoint.java | 5 +++++
.../internals/ProcessorStateManagerTest.java | 6 +++---
.../state/internals/OffsetCheckpointTest.java | 21 +++++++++++++++++++--
6 files changed, 36 insertions(+), 13 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index d387762..b270e03 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -65,7 +65,7 @@ abstract class AbstractStateManager implements StateManager {
try {
checkpoint.write(checkpointableOffsets);
} catch (final IOException fatalException) {
- log.error("Failed to update checkpoint file for global stores.",
fatalException);
+ log.error("Failed to write offset checkpoint file to {} while
re-initializing {}: {}", checkpoint, stateStores, fatalException);
throw new StreamsException("Failed to reinitialize global store.",
fatalException);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2d4ee8f..56e6bed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,7 +339,7 @@ public class GlobalStateManagerImpl extends
AbstractStateManager implements Glob
try {
checkpoint.write(checkpointableOffsets);
} catch (IOException e) {
- log.warn("Failed to write offsets checkpoint for global
globalStores", e);
+ log.warn("Failed to write offset checkpoint file to {} for
global stores: {}", checkpoint, e);
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1ee0e14..e7a23bd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -294,7 +294,6 @@ public class ProcessorStateManager extends
AbstractStateManager {
// write the checkpoint
@Override
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
- log.trace("Writing checkpoint: {}", ackedOffsets);
checkpointableOffsets.putAll(changelogReader.restoredOffsets());
for (final StateStore store : stores.values()) {
final String storeName = store.name();
@@ -311,14 +310,16 @@ public class ProcessorStateManager extends
AbstractStateManager {
}
}
}
- // write the checkpoint file before closing, to indicate clean shutdown
+ // write the checkpoint file before closing
+ if (checkpoint == null) {
+ checkpoint = new OffsetCheckpoint(new File(baseDir,
CHECKPOINT_FILE_NAME));
+ }
+
+ log.trace("Writing checkpoint: {}", checkpointableOffsets);
try {
- if (checkpoint == null) {
- checkpoint = new OffsetCheckpoint(new File(baseDir,
CHECKPOINT_FILE_NAME));
- }
checkpoint.write(checkpointableOffsets);
} catch (final IOException e) {
- log.warn("Failed to write checkpoint file to {}:", new
File(baseDir, CHECKPOINT_FILE_NAME), e);
+ log.warn("Failed to write offset checkpoint file to {}: {}",
checkpoint, e);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c14737..9f0e1f8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -66,6 +66,11 @@ public class OffsetCheckpoint {
* @throws IOException if any file operation fails with an IO exception
*/
public void write(final Map<TopicPartition, Long> offsets) throws
IOException {
+ // if there is no offsets, skip writing the file to save disk IOs
+ if (offsets.isEmpty()) {
+ return;
+ }
+
synchronized (lock) {
// write to temp file and then swap with the existing file
final File temp = new File(file.getAbsolutePath() + ".tmp");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ab9abc3..31f07cc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -309,8 +309,8 @@ public class ProcessorStateManagerTest {
false,
logContext);
try {
- // make sure the checkpoint file isn't deleted
- assertTrue(checkpointFile.exists());
+ // make sure the checkpoint file is not written yet
+ assertFalse(checkpointFile.exists());
stateMgr.register(persistentStore,
persistentStore.stateRestoreCallback);
stateMgr.register(nonPersistentStore,
nonPersistentStore.stateRestoreCallback);
@@ -630,7 +630,7 @@ public class ProcessorStateManagerTest {
@Test
public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws
IOException {
- checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+ checkpoint.write(Collections.singletonMap(new
TopicPartition(persistentStoreTopicName, 1), 123L));
assertTrue(checkpointFile.exists());
ProcessorStateManager stateManager = null;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 3b78d05..54cd3df 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -34,8 +35,8 @@ public class OffsetCheckpointTest {
@Test
public void testReadWrite() throws IOException {
- File f = TestUtils.tempFile();
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+ final File f = TestUtils.tempFile();
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
try {
Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -56,4 +57,20 @@ public class OffsetCheckpointTest {
checkpoint.delete();
}
}
+
+ @Test
+ public void shouldNotWriteCheckpointWhenNoOffsets() throws IOException {
+ // we do not need to worry about file name uniqueness since this file
should not be created
+ final File f = new File(TestUtils.tempDirectory().getAbsolutePath(),
"kafka.tmp");
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+ checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+ assertFalse(f.exists());
+
+ assertEquals(Collections.<TopicPartition, Long>emptyMap(),
checkpoint.read());
+
+ // deleting a non-exist checkpoint file should be fine
+ checkpoint.delete();
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].