Hello, I was setting up the key-value storage engine in Samza and ran into an exception when querying the data.
I added these properties to the config:

    stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.engaged-store.changelog=kafka.engaged-store-changelog
    # a custom data type with an appropriate Serde
    stores.engaged-store.key.serde=UserAppPair
    # wrote a Serde for Long using ByteBuffer
    stores.engaged-store.msg.serde=Long

I have no trouble initializing the storage engine with:

    val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];

but when I query by key while processing messages, it throws an exception:

    val key = new UserAppPair(userId, appId);
    val value = store.get(key);

Here's the log:

    2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
    2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
    2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
    2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
    2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
    2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
    org.rocksdb.RocksDBException: IO error: directory: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
    2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
    2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.

The same exception is thrown if I try to put a value in RocksDB.

Has anyone run into this problem before, or does anyone have pointers for solving it?

Lukas