Hey Lukas,

Interesting. Does this happen only after restarting your job? Or does it
happen the first time, as well? I'm wondering if this is the problem:

    options.setErrorIfExists(true)

in RocksDbKeyValueStore.scala. I think this is set under the assumption that
the job is run in YARN. If you run locally, it seems to me that the directory
would continue to exist after a job is restarted. If you delete your state
directory, and restart your job, does the problem temporarily go away until a
subsequent restart happens?

Cheers,
Chris

On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:

> Hi Chris,
>
> 1. We're running locally using ProcessJobFactory
> 2. CentOS 7 x86_64
> 3.
> startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
> engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
> engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>
> Also note that the properties file sets the default offset to oldest,
> but the log file says that it's setting the offset to largest: "2015-02-17
> 18:46:32 GetOffset [INFO] Got reset of type largest."
>
> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
> I checked the directory and it actually exists:
>
> du -h /vagrant/SamzaJobs/deploy/samza/state
>
> 16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
> 0   /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
> 0   /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
> 16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
> 36K /vagrant/SamzaJobs/deploy/samza/state/engaged-store
> 36K /vagrant/SamzaJobs/deploy/samza/state
>
> Lukas
>
> -----Original Message-----
> From: Chris Riccomini
> Sent: Monday, February 16, 2015 5:53 PM
> To: dev@samza.apache.org
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> It looks like the exception is actually thrown on get, not put:
>
>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>
> 1. Are you running your job under YARN, or as a local job
> (ThreadJobFactory/ProcessJobFactory)?
> 2. What OS are you running on?
> 3. Could you post a full copy of your logs somewhere (github gist, pasteboard,
> or something)?
> 4. Also, what does this line say in your logs:
>
>     info("Got storage engine base directory: %s" format storeBaseDir)
>
> It sounds like something is getting messed up with the directory where the
> RocksDB store is trying to keep its data.
>
> Cheers,
> Chris
>
> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>
> Hello,
>>
>> I was setting up the key-value storage engine in Samza and ran into an
>> exception when querying the data.
>>
>> I added these properties to the config:
>>
>> stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>> stores.engaged-store.changelog=kafka.engaged-store-changelog
>> # a custom data type with an appropriate Serde
>> stores.engaged-store.key.serde=UserAppPair
>> # wrote a Serde for Long using ByteBuffer
>> stores.engaged-store.msg.serde=Long
>>
>> I have no trouble initializing the storage engine with:
>>
>> val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>
>> but when I query by the key when processing messages, it’s throwing an
>> exception:
>>
>> val key = new UserAppPair(userId, appId);
>> val value = store.get(key);
>>
>> Here’s the log:
>>
>> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>     at org.rocksdb.RocksDB.open(Native Method)
>>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>     at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>     at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>     at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>     at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>
>> Same exception is thrown if I try to put a value in RocksDB. Has anyone
>> run into this problem before or has any pointers into solving it?
>>
>> Lukas
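
A minimal sketch of the check suggested at the top of this thread (delete the state directory, then restart the job), assuming a local ProcessJobFactory run where the store directory survives restarts. StateDirReset and deleteRecursively are hypothetical names, not part of Samza or RocksDB, and the directory must be passed explicitly, for example the base directory reported in the log (/vagrant/SamzaJobs/deploy/samza/state). It removes the local store files, so it is only meant for a test setup.

```scala
import java.io.File

// Hypothetical helper for local testing; not part of Samza or RocksDB.
object StateDirReset {
  // Delete a file or directory tree. Returns true if nothing is left at that path.
  def deleteRecursively(f: File): Boolean = {
    // listFiles returns null for anything that is not a directory, so wrap it in Option.
    Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
    !f.exists()
  }

  def main(args: Array[String]): Unit = {
    args.headOption match {
      case None =>
        println("usage: StateDirReset <state directory>")
      case Some(path) =>
        val dir = new File(path)
        if (!dir.exists()) println(s"$dir does not exist; nothing to delete")
        else if (deleteRecursively(dir)) println(s"Removed $dir; restart the job and check whether the error returns")
        else println(s"Could not fully remove $dir")
    }
  }
}
```

If the error disappears on the first run after the directory is removed and comes back on the next restart, that would be consistent with the setErrorIfExists(true) explanation; if it still fails on a clean directory, the cause is probably elsewhere.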