This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a2aa4b  KAFKA-6499: Do not write offset checkpoint file with empty 
offset map (#4492)
4a2aa4b is described below

commit 4a2aa4bb6700c02b573c3ae71a40bdb8df5ab3a7
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Feb 1 10:11:29 2018 -0800

    KAFKA-6499: Do not write offset checkpoint file with empty offset map 
(#4492)
    
    * In OffsetCheckpoint.write(), if the offset map passed in is empty, skip
      writing the file, which would otherwise contain only the version number
      and a size of zero. From the reader's point of view, this is the same as
      the file not existing.
    * Add related unit tests.
    * Minor fixes on log4j messages.
    
    Reviewers: Bill Bejeck <[email protected]>, Damian Guy 
<[email protected]>, Matthias J. Sax <[email protected]>
---
 .../processor/internals/AbstractStateManager.java   |  2 +-
 .../processor/internals/GlobalStateManagerImpl.java |  2 +-
 .../processor/internals/ProcessorStateManager.java  | 13 +++++++------
 .../streams/state/internals/OffsetCheckpoint.java   |  5 +++++
 .../internals/ProcessorStateManagerTest.java        |  6 +++---
 .../state/internals/OffsetCheckpointTest.java       | 21 +++++++++++++++++++--
 6 files changed, 36 insertions(+), 13 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index d387762..b270e03 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -65,7 +65,7 @@ abstract class AbstractStateManager implements StateManager {
         try {
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException fatalException) {
-            log.error("Failed to update checkpoint file for global stores.", 
fatalException);
+            log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
             throw new StreamsException("Failed to reinitialize global store.", 
fatalException);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2d4ee8f..56e6bed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,7 +339,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {
-                log.warn("Failed to write offsets checkpoint for global 
globalStores", e);
+                log.warn("Failed to write offset checkpoint file to {} for 
global stores: {}", checkpoint, e);
             }
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1ee0e14..e7a23bd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -294,7 +294,6 @@ public class ProcessorStateManager extends 
AbstractStateManager {
     // write the checkpoint
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        log.trace("Writing checkpoint: {}", ackedOffsets);
         checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
@@ -311,14 +310,16 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                 }
             }
         }
-        // write the checkpoint file before closing, to indicate clean shutdown
+        // write the checkpoint file before closing
+        if (checkpoint == null) {
+            checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
+        }
+
+        log.trace("Writing checkpoint: {}", checkpointableOffsets);
         try {
-            if (checkpoint == null) {
-                checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-            }
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write checkpoint file to {}:", new 
File(baseDir, CHECKPOINT_FILE_NAME), e);
+            log.warn("Failed to write offset checkpoint file to {}: {}", 
checkpoint, e);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c14737..9f0e1f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -66,6 +66,11 @@ public class OffsetCheckpoint {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws 
IOException {
+        // if there is no offsets, skip writing the file to save disk IOs
+        if (offsets.isEmpty()) {
+            return;
+        }
+
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             final File temp = new File(file.getAbsolutePath() + ".tmp");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ab9abc3..31f07cc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -309,8 +309,8 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
         try {
-            // make sure the checkpoint file isn't deleted
-            assertTrue(checkpointFile.exists());
+            // make sure the checkpoint file is not written yet
+            assertFalse(checkpointFile.exists());
 
             stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
             stateMgr.register(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback);
@@ -630,7 +630,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws 
IOException {
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.singletonMap(new 
TopicPartition(persistentStoreTopicName, 1), 123L));
         assertTrue(checkpointFile.exists());
 
         ProcessorStateManager stateManager = null;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 3b78d05..54cd3df 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,8 +35,8 @@ public class OffsetCheckpointTest {
 
     @Test
     public void testReadWrite() throws IOException {
-        File f = TestUtils.tempFile();
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
         try {
             Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -56,4 +57,20 @@ public class OffsetCheckpointTest {
             checkpoint.delete();
         }
     }
+
+    @Test
+    public void shouldNotWriteCheckpointWhenNoOffsets() throws IOException {
+        // we do not need to worry about file name uniqueness since this file 
should not be created
+        final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), 
"kafka.tmp");
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+        assertFalse(f.exists());
+
+        assertEquals(Collections.<TopicPartition, Long>emptyMap(), 
checkpoint.read());
+
+        // deleting a non-exist checkpoint file should be fine
+        checkpoint.delete();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to