Repository: samza Updated Branches: refs/heads/master 9f30ef10b -> bb8a78a85
SAMZA-557: Reuse local state in SamzaContainer on clean shutdown Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bb8a78a8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bb8a78a8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bb8a78a8 Branch: refs/heads/master Commit: bb8a78a85666325641e75fe466476094de716555 Parents: 9f30ef1 Author: Navina Ramesh <[email protected]> Authored: Wed May 6 17:22:41 2015 -0700 Committer: Yan Fang <[email protected]> Committed: Wed May 6 17:22:41 2015 -0700 ---------------------------------------------------------------------- .../samza/config/ShellCommandConfig.scala | 8 + .../apache/samza/container/SamzaContainer.scala | 36 +++- .../samza/storage/TaskStorageManager.scala | 83 +++++++-- .../main/scala/org/apache/samza/util/Util.scala | 59 +++++- .../samza/storage/TestTaskStorageManager.scala | 186 +++++++++++++++++++ .../scala/org/apache/samza/util/TestUtil.scala | 54 ++++-- .../samza/storage/kv/RocksDbKeyValueStore.scala | 1 - 7 files changed, 391 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala index e94a473..f505322 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala @@ -46,6 +46,14 @@ object ShellCommandConfig { */ val ENV_JAVA_HOME = "JAVA_HOME" + /* + * The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza + * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers. + * If this environment variable is not set, the path defaults to current working directory (which is the same as the + * path for persisting non-logged data stores) + */ + val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR" + val COMMAND_SHELL_EXECUTE = "task.execute" val TASK_JVM_OPTS = "task.opts" val TASK_JAVA_HOME = "task.java.home" http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index ac4793a..e8e830e 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -369,12 +369,6 @@ object SamzaContainer extends Logging { metrics = systemProducersMetrics, dropSerializationError = dropSerializationError) - // TODO not sure how we should make this config based, or not. Kind of - // strange, since it has some dynamic directories when used with YARN. - val storeBaseDir = new File(System.getProperty("user.dir"), "state") - - info("Got storage engine base directory: %s" format storeBaseDir) - val storageEngineFactories = config .getStoreNames .map(storeName => { @@ -443,6 +437,24 @@ object SamzaContainer extends Logging { info("Got store consumers: %s" format storeConsumers) + // TODO not sure how we should make this config based, or not. Kind of + // strange, since it has some dynamic directories when used with YARN. + val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") + info("Got default storage engine base directory: %s" format defaultStoreBaseDir) + + var loggedStorageBaseDir: File = null + if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { + val jobNameAndId = Util.getJobNameAndId(config) + loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) + } else { + warn("No override was provided for logged store base directory. This disables local state re-use on " + + "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " + + "variable in all machines running the Samza container") + loggedStorageBaseDir = defaultStoreBaseDir + } + + info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) + val taskStores = storageEngineFactories .map { case (storeName, storageEngineFactory) => @@ -459,10 +471,15 @@ object SamzaContainer extends Logging { case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." format msgSerde)) case _ => null } - val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + val storeBaseDir = if(changeLogSystemStreamPartition != null) { + TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) + } + else { + TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName) + } val storageEngine = storageEngineFactory.getStorageEngine( storeName, - storePartitionDir, + storeBaseDir, keySerde, msgSerde, collector, @@ -481,7 +498,8 @@ object SamzaContainer extends Logging { changeLogSystemStreams = changeLogSystemStreams, maxChangeLogStreamPartitions, streamMetadataCache = streamMetadataCache, - storeBaseDir = storeBaseDir, + storeBaseDir = defaultStoreBaseDir, + loggedStoreBaseDir = loggedStorageBaseDir, partition = taskModel.getChangelogPartition, systemAdmins = systemAdmins) http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index f68a7fe..aeba61a 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -19,8 +19,9 @@ package org.apache.samza.storage -import java.io.File -import scala.collection.Map +import java.io._ +import java.util +import scala.collection.{JavaConversions, Map} import org.apache.samza.util.Logging import org.apache.samza.Partition import org.apache.samza.system._ @@ -50,31 +51,74 @@ class TaskStorageManager( changeLogStreamPartitions: Int, streamMetadataCache: StreamMetadataCache, storeBaseDir: File = new File(System.getProperty("user.dir"), "state"), + loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"), partition: Partition, systemAdmins: Map[String, SystemAdmin]) extends Logging { var taskStoresToRestore = taskStores var changeLogOldestOffsets: Map[SystemStream, String] = Map() + val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]() + val offsetFileName = "OFFSET" def apply(storageEngineName: String) = taskStores(storageEngineName) def init { cleanBaseDirs + setupBaseDirs createStreams startConsumers restoreStores stopConsumers } + private def setupBaseDirs { + debug("Setting up base directories for stores.") + + val loggedStores = changeLogSystemStreams.keySet + + (taskStores.keySet -- loggedStores) + .foreach(TaskStorageManager.getStorePartitionDir(storeBaseDir, _, taskName).mkdirs) + + loggedStores.foreach(storeName => { + val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + if(!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs + }) + } + private def cleanBaseDirs { debug("Cleaning base directories for stores.") + taskStores.keys.foreach(storeName => { val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + info("Got logged storage partition directory as %s" format storagePartitionDir.toPath.toString) - debug("Cleaning %s for store %s." format (storagePartitionDir, storeName)) + if(storagePartitionDir.exists()) { + debug("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString) + Util.rm(storagePartitionDir) + } - Util.rm(storagePartitionDir) - storagePartitionDir.mkdirs + val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString) + + var deleteLoggedStoragePartitionDir = false + + val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName) + if(offsetFileRef.exists()) { + debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString) + val offset = Util.readDataFromFile(offsetFileRef) + if(offset != null && !offset.isEmpty) { + fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset) + } else { + deleteLoggedStoragePartitionDir = true + } + offsetFileRef.delete() + } else { + info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString) + deleteLoggedStoragePartitionDir = true + } + if(deleteLoggedStoragePartitionDir && loggedStoragePartitionDir.exists()) { + Util.rm(loggedStoragePartitionDir) + } }) } @@ -82,7 +126,9 @@ class TaskStorageManager( info("Creating streams that are not present for changelog") for ((storeName, systemStream) <- changeLogSystemStreams) { - var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream)) + val systemAdmin = systemAdmins + .getOrElse(systemStream.getSystem, + throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream)) systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions) } @@ -99,15 +145,16 @@ class TaskStorageManager( for ((storeName, systemStream) <- changeLogSystemStreams) { val systemStreamPartition = new SystemStreamPartition(systemStream, partition) val consumer = storeConsumers(storeName) - val offset = changeLogOldestOffsets.getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)) + val offset = + Option(fileOffset.get(systemStreamPartition)) + .getOrElse(changeLogOldestOffsets + .getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))) if (offset != null) { info("Registering change log consumer with offset %s for %s." format (offset, systemStreamPartition)) - consumer.register(systemStreamPartition, offset) } else { info("Skipping change log restoration for %s because stream appears to be empty (offset was null)." format systemStreamPartition) - taskStoresToRestore -= storeName } } @@ -123,7 +170,7 @@ class TaskStorageManager( val systemStream = changeLogSystemStreams(storeName) val systemStreamPartition = new SystemStreamPartition(systemStream, partition) val systemConsumer = storeConsumers(storeName) - val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition); + val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition) store.restore(systemConsumerIterator) } } @@ -143,8 +190,22 @@ class TaskStorageManager( def stop() { debug("Stopping stores.") - taskStores.values.foreach(_.stop) + + debug("Persisting logged key value stores") + changeLogSystemStreams.foreach { case (store, systemStream) => { + val streamToMetadata = systemAdmins(systemStream.getSystem) + .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream))) + val sspMetadata = streamToMetadata + .get(systemStream.getStream) + .getSystemStreamPartitionMetadata + .get(partition) + val newestOffset = sspMetadata.getNewestOffset + + val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName) + Util.writeDataToFile(offsetFile, newestOffset) + info("Successfully stored offset %s for store %s in OFFSET file " format (newestOffset, store)) + }} } http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 8a83566..2feb65b 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -20,8 +20,9 @@ package org.apache.samza.util import java.net.{HttpURLConnection, URL} -import java.io.{InputStream, BufferedReader, File, InputStreamReader} +import java.io._ import java.lang.management.ManagementFactory +import java.util.zip.CRC32 import org.apache.samza.{SamzaException, Partition} import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream} import java.util.Random @@ -237,4 +238,60 @@ object Util extends Logging { new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)), new Partition(Integer.parseInt(ssp.substring(lastIdx + 1)))) } + + /** + * Method to generate the CRC32 checksum code for any given data + * @param data The string for which checksum has to be generated + * @return long type value representing the checksum + * */ + def getChecksumValue(data: String) = { + val crc = new CRC32 + crc.update(data.getBytes) + crc.getValue + } + + /** + * Method that always writes checksum & data to a file + * Checksum is pre-fixed to the data and is a 32-bit long type data. + * @param file The file handle to write to + * @param data The data to be written to the file + * */ + def writeDataToFile(file: File, data: String) = { + val checksum = getChecksumValue(data) + var oos: ObjectOutputStream = null + var fos: FileOutputStream = null + try { + fos = new FileOutputStream(file) + oos = new ObjectOutputStream(fos) + oos.writeLong(checksum) + oos.writeUTF(data) + } finally { + oos.close() + fos.close() + } + } + + /** + * Method to read from a file that has a checksum prepended to the data + * @param file The file handle to read from + * */ + def readDataFromFile(file: File) = { + var fis: FileInputStream = null + var ois: ObjectInputStream = null + try { + fis = new FileInputStream(file) + ois = new ObjectInputStream(fis) + val checksumFromFile = ois.readLong() + val data = ois.readUTF() + if(checksumFromFile == getChecksumValue(data)) { + data + } else { + info("Checksum match failed. Data in file is corrupted. Skipping content.") + null + } + } finally { + ois.close() + fis.close() + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala new file mode 100644 index 0000000..6491d09 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage + + +import java.io.File +import org.junit.{After, Before, Test} +import org.junit.Assert._ +import org.scalatest.mock.MockitoSugar +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import scala.collection.JavaConversions + +import org.apache.samza.container.TaskName +import org.apache.samza.util.Util +import org.apache.samza.system._ +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.Partition + +class TestTaskStorageManager extends MockitoSugar { + + val store = "store1" + val loggedStore = "loggedStore1" + val taskName = new TaskName("testTask") + + @Before + def setupTestDirs() { + TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store , taskName) + .mkdirs() + TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + .mkdirs() + } + + @After + def tearDownTestDirs() { + Util.rm(TaskStorageManagerBuilder.defaultStoreBaseDir) + Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir) + } + + @Test + def testCleanBaseDirs() { + val checkFilePath1 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName), "check") + checkFilePath1.createNewFile() + val checkFilePath2 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "check") + checkFilePath2.createNewFile() + + val taskStorageManager = new TaskStorageManagerBuilder() + .addStore(store) + .addStore(loggedStore) + .build + + //Invoke test method + val cleanDirMethod = taskStorageManager + .getClass + .getDeclaredMethod("cleanBaseDirs", + new Array[java.lang.Class[_]](0):_*) + cleanDirMethod.setAccessible(true) + cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*) + + assertTrue("check file was found in store partition directory. Clean up failed!", !checkFilePath1.exists()) + assertTrue("check file was found in logged store partition directory. Clean up failed!", !checkFilePath2.exists()) + } + + @Test + def testCleanBaseDirsWithOffsetFileForLoggedStore() { + val checkFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET") + Util.writeDataToFile(checkFilePath, "100") + + val taskStorageManager = new TaskStorageManagerBuilder() + .addStore(loggedStore) + .build + + val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs", + new Array[java.lang.Class[_]](0):_*) + cleanDirMethod.setAccessible(true) + cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*) + + assertTrue("Offset file was not removed. Clean up failed!", !checkFilePath.exists()) + assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0)))) + } + + @Test + def testStopCreatesOffsetFileForLoggedStore() { + val partition = new Partition(0) + + val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET") + + val mockSystemAdmin = mock[SystemAdmin] + val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101"))))) + val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata) + when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap) + + //Build TaskStorageManager + val taskStorageManager = new TaskStorageManagerBuilder() + .addStore(loggedStore) + .setSystemAdmin("kafka", mockSystemAdmin) + .setPartition(partition) + .build + + //Invoke test method + val stopMethod = taskStorageManager.getClass.getDeclaredMethod("stop", new Array[java.lang.Class[_]](0):_*) + stopMethod.setAccessible(true) + stopMethod.invoke(taskStorageManager, new Array[Object](0):_*) + + //Check conditions + assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) + assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath)) + } +} + +object TaskStorageManagerBuilder { + val defaultStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "store") + val defaultLoggedStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore") +} + +class TaskStorageManagerBuilder extends MockitoSugar { + var taskStores: Map[String, StorageEngine] = Map() + var storeConsumers: Map[String, SystemConsumer] = Map() + var changeLogSystemStreams: Map[String, SystemStream] = Map() + val streamMetadataCache = mock[StreamMetadataCache] + var partition: Partition = new Partition(0) + var systemAdmins: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin]) + var taskName: TaskName = new TaskName("testTask") + var storeBaseDir: File = TaskStorageManagerBuilder.defaultStoreBaseDir + var loggedStoreBaseDir: File = TaskStorageManagerBuilder.defaultLoggedStoreBaseDir + var changeLogStreamPartitions: Int = 1 + + def addStore(storeName: String): TaskStorageManagerBuilder = { + taskStores = taskStores ++ Map(storeName -> mock[StorageEngine]) + storeConsumers = storeConsumers ++ Map(storeName -> mock[SystemConsumer]) + changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream")) + this + } + + def setPartition(p: Partition) = { + partition = p + this + } + + def setChangeLogSystemStreams(storeName: String, systemStream: SystemStream) = { + changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> systemStream) + this + } + + def setSystemAdmin(system: String, systemAdmin: SystemAdmin) = { + systemAdmins = systemAdmins ++ Map(system -> systemAdmin) + this + } + + def setTaskName(tn: TaskName) = { + taskName = tn + this + } + + def build: TaskStorageManager = { + new TaskStorageManager( + taskName = taskName, + taskStores = taskStores, + storeConsumers = storeConsumers, + changeLogSystemStreams = changeLogSystemStreams, + changeLogStreamPartitions = changeLogStreamPartitions, + streamMetadataCache = streamMetadataCache, + storeBaseDir = storeBaseDir, + loggedStoreBaseDir = loggedStoreBaseDir, + partition = partition, + systemAdmins = systemAdmins + ) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index b75f440..ead6f94 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -19,22 +19,48 @@ package org.apache.samza.util -import org.apache.samza.Partition -import org.apache.samza.config.Config -import org.apache.samza.config.Config -import org.apache.samza.config.MapConfig -import org.apache.samza.container.TaskName -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.Util._ +import java.io._ import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConversions._ -import scala.util.Random - class TestUtil { + + val data = "100" + val checksum = Util.getChecksumValue(data) + val file = new File(System.getProperty("java.io.tmpdir"), "test") + + @Test + def testWriteDataToFile() { + // Invoke test + Util.writeDataToFile(file, data) + + // Check that file exists + assertTrue("File was not created!", file.exists()) + val fis = new FileInputStream(file) + val ois = new ObjectInputStream(fis) + + // Check content of the file is as expected + assertEquals(checksum, ois.readLong()) + assertEquals(data, ois.readUTF()) + ois.close() + fis.close() + } + + @Test + def testReadDataFromFile() { + // Setup + val fos = new FileOutputStream(file) + val oos = new ObjectOutputStream(fos) + oos.writeLong(checksum) + oos.writeUTF(data) + oos.close() + fos.close() + + // Invoke test + val result = Util.readDataFromFile(file) + + // Check data returned + assertEquals(data, result) + + } } http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 1b44a51..dd20f17 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -68,7 +68,6 @@ object RocksDbKeyValueStore extends Logging { options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt) options.setCreateIfMissing(true) - options.setErrorIfExists(true) options }
