Repository: samza
Updated Branches:
  refs/heads/master 032a16079 -> 01a1795e5


SAMZA-1768: Handle corrupted OFFSET file

This patch addresses the following tickets:

SAMZA-1778: SIGSEGV when reading properties (metrics) on a closed RocksDB store
SAMZA-1777: Logged store OFFSET file write during flush should be atomic
SAMZA-1768: Handle corrupted OFFSET file elegantly

Author: xinyuiscool <[email protected]>

Reviewers: Prateek M <[email protected]>

Closes #588 from xinyuiscool/SAMZA-1768


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/01a1795e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/01a1795e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/01a1795e

Branch: refs/heads/master
Commit: 01a1795e55b9d5547e1a23575f5a39ceb1954915
Parents: 032a160
Author: Xinyu Liu <[email protected]>
Authored: Mon Jul 30 11:46:39 2018 -0700
Committer: xiliu <[email protected]>
Committed: Mon Jul 30 11:46:39 2018 -0700

----------------------------------------------------------------------
 .../samza/storage/StorageManagerUtil.java       |  6 +++++-
 .../scala/org/apache/samza/util/FileUtil.scala  | 13 +++++++++++-
 .../org/apache/samza/util/TestFileUtil.scala    | 22 ++++++++++++++++++++
 .../samza/storage/kv/RocksDbKeyValueStore.scala |  9 +++++++-
 4 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 731a84d..e7301ea 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -128,7 +128,11 @@ public class StorageManagerUtil {
 
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", 
storePath);
-      offset = FileUtil.readWithChecksum(offsetFileRef);
+      try {
+        offset = FileUtil.readWithChecksum(offsetFileRef);
+      } catch (Exception e) {
+        LOG.warn("Failed to read offset file in storage partition directory: 
{}", storePath, e);
+      }
     } else {
       LOG.info("No offset file found in storage partition directory: {}", 
storePath);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index 46a2089..0845b5c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -22,6 +22,7 @@
 package org.apache.samza.util
 
 import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, 
ObjectOutputStream}
+import java.nio.file._
 import java.util.zip.CRC32
 
 import org.apache.samza.util.Util.info
@@ -35,10 +36,12 @@ object FileUtil {
     * */
   def writeWithChecksum(file: File, data: String): Unit = {
     val checksum = getChecksum(data)
+    val tmpFilePath = file.getAbsolutePath + ".tmp"
+    val tmpFile = new File(tmpFilePath)
     var oos: ObjectOutputStream = null
     var fos: FileOutputStream = null
     try {
-      fos = new FileOutputStream(file)
+      fos = new FileOutputStream(tmpFile)
       oos = new ObjectOutputStream(fos)
       oos.writeLong(checksum)
       oos.writeUTF(data)
@@ -46,6 +49,14 @@ object FileUtil {
       oos.close()
       fos.close()
     }
+
+    //atomic swap of tmp and real offset file
+    try {
+      Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, 
StandardCopyOption.REPLACE_EXISTING)
+    } catch {
+      case e: AtomicMoveNotSupportedException =>
+        Files.move(tmpFile.toPath, file.toPath, 
StandardCopyOption.REPLACE_EXISTING)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
index 5bb6da7..ddda268 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
@@ -49,6 +49,28 @@ class TestFileUtil {
   }
 
   @Test
+  def testWriteDataToFileWithExistingOffsetFile() {
+    // Invoke test
+    val file = new File(System.getProperty("java.io.tmpdir"), "test2")
+    // write the same file three times
+    FileUtil.writeWithChecksum(file, data)
+    FileUtil.writeWithChecksum(file, data)
+    FileUtil.writeWithChecksum(file, data)
+
+    // Check that file exists
+    assertTrue("File was not created!", file.exists())
+    val fis = new FileInputStream(file)
+    val ois = new ObjectInputStream(fis)
+
+    // Check content of the file is as expected
+    assertEquals(checksum, ois.readLong())
+    assertEquals(data, ois.readUTF())
+    ois.close()
+    fis.close()
+  }
+
+
+  @Test
   def testReadDataFromFile() {
     // Setup
     val fos = new FileOutputStream(file)

http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index f25097c..836dab4 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -91,7 +91,14 @@ object RocksDbKeyValueStore extends Logging {
         .toSet
 
       (configuredMetrics ++ rocksDbMetrics)
-        .foreach(property => metrics.newGauge(property, () => 
rocksDb.getProperty(property)))
+        .foreach(property => metrics.newGauge(property, () =>
+          // Check isOwningHandle flag. The db is open iff the flag is true.
+          if (rocksDb.isOwningHandle) {
+            rocksDb.getProperty(property)
+          } else {
+            "0"
+          }
+        ))
 
       rocksDb
     } catch {

Reply via email to