Hello,

I was setting up the key-value storage engine in Samza and ran into an 
exception when querying the data. 

I added these properties to the config:

    
    stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.engaged-store.changelog=kafka.engaged-store-changelog
    # a custom data type with an appropriate Serde
    stores.engaged-store.key.serde=UserAppPair
    # wrote a Serde for Long using ByteBuffer
    stores.engaged-store.msg.serde=Long
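
For context, a Long serde built on ByteBuffer can be sketched roughly as follows. The object name LongBytes is hypothetical; in a Samza job the same two methods would sit in a class implementing Samza's Serde interface, which is left out here so the snippet runs on its own.

```scala
import java.nio.ByteBuffer

// Hypothetical helper (not part of Samza): a Long <-> byte array round trip.
object LongBytes {
  // Encode a Long as 8 bytes (ByteBuffer defaults to big-endian order).
  def toBytes(value: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(value).array()

  // Decode the first 8 bytes of the array back into a Long.
  def fromBytes(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).getLong()
}
```

Both directions use ByteBuffer's default byte order, so values written by toBytes are read back unchanged by fromBytes.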

I have no trouble initializing the storage engine with:

    val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];

but when I query by key while processing messages, it throws an 
exception:

    val key = new UserAppPair(userId, appId);
    val value = store.get(key);

Here’s the log:

    2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
    2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
    2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
    2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
    2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
    2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
    org.rocksdb.RocksDBException: IO error: directory: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
    2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
    2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.


The same exception is thrown if I try to put a value in RocksDB. Has anyone run 
into this problem before, or does anyone have pointers on solving it?

Lukas
