Repository: samza
Updated Branches:
  refs/heads/master 032a16079 -> 01a1795e5
SAMZA-1768: Handle corrupted OFFSET file

This patch addresses the following tickets:
SAMZA-1778: SIGSEGV when reading properties (metrics) on a closed RocksDB store
SAMZA-1777: Logged store OFFSET file write during flush should be atomic
SAMZA-1768: Handle corrupted OFFSET file elegantly

Author: xinyuiscool <[email protected]>

Reviewers: Prateek M <[email protected]>

Closes #588 from xinyuiscool/SAMZA-1768

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/01a1795e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/01a1795e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/01a1795e

Branch: refs/heads/master
Commit: 01a1795e55b9d5547e1a23575f5a39ceb1954915
Parents: 032a160
Author: Xinyu Liu <[email protected]>
Authored: Mon Jul 30 11:46:39 2018 -0700
Committer: xiliu <[email protected]>
Committed: Mon Jul 30 11:46:39 2018 -0700

----------------------------------------------------------------------
 .../samza/storage/StorageManagerUtil.java | 6 +++++-
 .../scala/org/apache/samza/util/FileUtil.scala | 13 +++++++++++-
 .../org/apache/samza/util/TestFileUtil.scala | 22 ++++++++++++++++++++
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 9 +++++++-
 4 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java index 731a84d..e7301ea 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -128,7 +128,11 @@ public class StorageManagerUtil { if (offsetFileRef.exists())
{ LOG.info("Found offset file in storage partition directory: {}", storePath); - offset = FileUtil.readWithChecksum(offsetFileRef); + try { + offset = FileUtil.readWithChecksum(offsetFileRef); + } catch (Exception e) { + LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e); + } } else { LOG.info("No offset file found in storage partition directory: {}", storePath); } http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index 46a2089..0845b5c 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -22,6 +22,7 @@ package org.apache.samza.util import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} +import java.nio.file._ import java.util.zip.CRC32 import org.apache.samza.util.Util.info @@ -35,10 +36,12 @@ object FileUtil { * */ def writeWithChecksum(file: File, data: String): Unit = { val checksum = getChecksum(data) + val tmpFilePath = file.getAbsolutePath + ".tmp" + val tmpFile = new File(tmpFilePath) var oos: ObjectOutputStream = null var fos: FileOutputStream = null try { - fos = new FileOutputStream(file) + fos = new FileOutputStream(tmpFile) oos = new ObjectOutputStream(fos) oos.writeLong(checksum) oos.writeUTF(data) @@ -46,6 +49,14 @@ object FileUtil { oos.close() fos.close() } + + //atomic swap of tmp and real offset file + try { + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING) + } catch { + case e: AtomicMoveNotSupportedException => + Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING) + } } /** 
http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala index 5bb6da7..ddda268 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala @@ -49,6 +49,28 @@ class TestFileUtil { } @Test + def testWriteDataToFileWithExistingOffsetFile() { + // Invoke test + val file = new File(System.getProperty("java.io.tmpdir"), "test2") + // write the same file three times + FileUtil.writeWithChecksum(file, data) + FileUtil.writeWithChecksum(file, data) + FileUtil.writeWithChecksum(file, data) + + // Check that file exists + assertTrue("File was not created!", file.exists()) + val fis = new FileInputStream(file) + val ois = new ObjectInputStream(fis) + + // Check content of the file is as expected + assertEquals(checksum, ois.readLong()) + assertEquals(data, ois.readUTF()) + ois.close() + fis.close() + } + + + @Test def testReadDataFromFile() { // Setup val fos = new FileOutputStream(file) http://git-wip-us.apache.org/repos/asf/samza/blob/01a1795e/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index f25097c..836dab4 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -91,7 +91,14 @@ object RocksDbKeyValueStore extends Logging { .toSet (configuredMetrics ++ 
rocksDbMetrics) - .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property))) + .foreach(property => metrics.newGauge(property, () => + // Check isOwningHandle flag. The db is open iff the flag is true. + if (rocksDb.isOwningHandle) { + rocksDb.getProperty(property) + } else { + "0" + } + )) rocksDb } catch {
