Hey Lukas,

It looks like the exception is actually thrown on get, not put:

    at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)

1. Are you running your job under YARN, or as a local job (ThreadJobFactory/ProcessJobFactory)?
2. What OS are you running on?
3. Could you post a full copy of your logs somewhere (GitHub gist, pasteboard, or something)?
4. Also, what does this line say in your logs:

    info("Got storage engine base directory: %s" format storeBaseDir)

It sounds like something is getting messed up with the directory where the RocksDB store is trying to keep its data.

Cheers,
Chris

On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

> Hello,
>
> I was setting up the key-value storage engine in Samza and ran into an
> exception when querying the data.
>
> I added these properties to the config:
>
> stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> stores.engaged-store.changelog=kafka.engaged-store-changelog
> # a custom data type with an appropriate Serde
> stores.engaged-store.key.serde=UserAppPair
> # wrote a Serde for Long using ByteBuffer
> stores.engaged-store.msg.serde=Long
>
> I have no trouble initializing the storage engine with:
>
> val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>
> but when I query by the key when processing messages, it’s throwing an
> exception:
>
> val key = new UserAppPair(userId, appId);
> val value = store.get(key);
>
> Here’s the log:
>
> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>     at org.rocksdb.RocksDB.open(Native Method)
>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>     at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>     at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>     at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>     at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>
> Same exception is thrown if I try to put a value in RocksDB. Has anyone
> run into this problem before or has any pointers into solving it?
>
> Lukas