Hey Lukas,

Could you try clearing out the state directory and then starting the job?

Cheers,
Chris

On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:

> This happens every time even if I spin up a new VM. Happens after a
> restart as well.
>
> Lukas
>
> -----Original Message-----
> From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 11:01 AM
> To: dev@samza.apache.org
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> Interesting. Does this happen only after restarting your job? Or does it
> happen the first time, as well? I'm wondering if this is the problem:
>
> options.setErrorIfExists(true)
>
> In RocksDbKeyValueStore.scala. I think this is set under the assumption
> that the job is run in YARN. If you run locally, it seems to me that the
> directory would continue to exist after a job is restarted. If you delete
> your state directory, and restart your job, does the problem temporarily go
> away until a subsequent restart happens?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
> Hi Chris,
>>
>> 1. We're running locally using ProcessJobFactory
>> 2. CentOS 7 x86_64
>> 3.
>> startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>> engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>> engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>
>> Also note that the properties file sets the default offset to oldest,
>> but the log file says that it's setting the offset to largest: "2015-02-17
>> 18:46:32 GetOffset [INFO] Got reset of type largest."
>>
>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>> I checked the directory and it actually exists:
>>
>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>
>> 16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>> 0 /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>> 0 /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>> 16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>> 36K /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>> 36K /vagrant/SamzaJobs/deploy/samza/state
>>
>> Lukas
>>
>> -----Original Message-----
>> From: Chris Riccomini
>> Sent: Monday, February 16, 2015 5:53 PM
>> To: dev@samza.apache.org
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>>
>> Hey Lukas,
>>
>> It looks like the exception is actually thrown on get, not put:
>>
>> at org.apache.samza.storage.kv.KeyValueStorageEngine.get(
>> KeyValueStorageEngine.scala:44)
>>
>> 1. Are you running your job under YARN, or as a local job
>> (ThreadJobFactory/ProcessJobFactory)?
>> 2. What OS are you running on?
>> 3. Could you post a full copy of your logs somewhere (github gist,
>> pasteboard, or something)?
>> 4. Also, what does this line say in your logs:
>>
>> info("Got storage engine base directory: %s" format storeBaseDir)
>>
>> It sounds like something is getting messed up with the directory where the
>> RocksDB store is trying to keep its data.
>>
>> Cheers,
>> Chris
>>
>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>> Hello,
>>
>>>
>>> I was setting up the key-value storage engine in Samza and ran into an
>>> exception when querying the data.
>>>
>>> I added these properties to the config:
>>>
>>>
>>> stores.engaged-store.factory=org.apache.samza.storage.kv.
>>> RocksDbKeyValueStorageEngineFactory
>>> stores.engaged-store.changelog=kafka.engaged-store-changelog
>>> # a custom data type with an appropriate Serde
>>> stores.engaged-store.key.serde=UserAppPair
>>> # wrote a Serde for Long using ByteBuffer
>>> stores.engaged-store.msg.serde=Long
>>>
>>> I have no trouble initializing the storage engine with:
>>>
>>> val store =
>>> context.getStore("engaged-store").asInstanceOf[
>>> KeyValueStore[UserAppPair,
>>> Long]];
>>>
>>> but when I query by the key when processing messages, it’s throwing an
>>> exception:
>>>
>>> val key = new UserAppPair(userId, appId);
>>> val value = store.get(key);
>>>
>>> Here’s the log:
>>>
>>> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for
>>> localhost:9092
>>> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an
>>> invalid or empty offset None for [Follows,0]. Attempting to use Kafka's
>>> auto.offset.reset setting. This can result in data loss if processing
>>> continues.
>>> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is
>>> defined for topic Follows
>>> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for
>>> localhost:9092
>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in
>>> rocksdb.
>>> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in
>>> process
>>> loop.
>>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>> at org.rocksdb.RocksDB.open(Native Method)
>>> at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>> at
>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(
>>> RocksDbKeyValueStore.scala:85)
>>> at
>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.db(
>>> RocksDbKeyValueStore.scala:85)
>>> at
>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.get(
>>> RocksDbKeyValueStore.scala:92)
>>> at
>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.get(
>>> RocksDbKeyValueStore.scala:80)
>>> at
>>> org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>> at
>>> org.apache.samza.storage.kv.SerializedKeyValueStore.get(
>>> SerializedKeyValueStore.scala:36)
>>> at
>>> org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>> at
>>> org.apache.samza.storage.kv.NullSafeKeyValueStore.get(
>>> NullSafeKeyValueStore.scala:36)
>>> at
>>> org.apache.samza.storage.kv.KeyValueStorageEngine.get(
>>> KeyValueStorageEngine.scala:44)
>>> at
>>> me.doubledutch.analytics.task.EngagedUsersTask.engaged(
>>> EngagedUsersTask.scala:66)
>>> at
>>> me.doubledutch.analytics.task.EngagedUsersTask.process(
>>> EngagedUsersTask.scala:100)
>>> at
>>> org.apache.samza.container.TaskInstance$$anonfun$process$
>>> 1.apply$mcV$sp(TaskInstance.scala:137)
>>> at
>>> org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(
>>> TaskInstanceExceptionHandler.scala:54)
>>> at
>>> org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>> at
>>> org.apache.samza.container.RunLoop$$anonfun$process$2.
>>> apply(RunLoop.scala:93)
>>> at
>>> org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>> at org.apache.samza.container.RunLoop.updateTimer(RunLoop.
>>> scala:36)
>>> at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>> at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>> at
>>> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>> at
>>> org.apache.samza.container.SamzaContainer$.safeMain(
>>> SamzaContainer.scala:108)
>>> at
>>> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>> at
>>> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer
>>> multiplexer.
>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for
>>> localhost:9092
>>> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due
>>> to
>>> socket error: null
>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt
>>> exception in broker proxy thread.
>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to
>>> interrupt.
>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer
>>> multiplexer.
>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance
>>> stream tasks.
>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance
>>> stores.
>>>
>>>
>>> Same exception is thrown if I try to put a value in RocksDB. Has anyone
>>> run into this problem before or has any pointers into solving it?
>>>
>>> Lukas
>>>
>>>
>>
>>
>
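
For reference, the "clear out the state, then start the job" step suggested at the top of this thread could be done with something like the Java sketch below. It is not part of Samza: the class name `StateDirCleaner` and the method `clearStateDir` are hypothetical, and the default path is simply the storage engine base directory reported in the log above. It should only be run while the job is stopped, and it assumes the local state can be rebuilt (for example from the changelog topic configured for the store).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not a Samza class): wipes a local state directory
// so that the next job start begins with no existing store on disk.
public class StateDirCleaner {

    /**
     * Recursively deletes stateDir and everything beneath it.
     * Returns the number of files and directories removed (0 if stateDir does not exist).
     */
    public static int clearStateDir(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return 0; // nothing to clear
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(stateDir)) {
            // Reverse order puts children before their parent directories,
            // so every directory is already empty when it is deleted.
            entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
        return entries.size();
    }

    public static void main(String[] args) throws IOException {
        // Default is the base directory from the log in this thread; pass another path to override.
        Path stateDir = Paths.get(args.length > 0 ? args[0] : "/vagrant/SamzaJobs/deploy/samza/state");
        int removed = clearStateDir(stateDir);
        System.out.println("Removed " + removed + " entries under " + stateDir);
    }
}
```

If the exception still appears on a fresh start after the directory has been removed, that would suggest the pre-existing directory is not the cause.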