Hey Lukas,

Strange. I'm having a more detailed look at your logs now.

Note: /vagrant is a synced folder, and I think it *does* persist between VM restarts. But if you've deleted /vagrant/SamzaJobs/deploy, then the state should be empty.
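To make sure we're clearing the same thing: by "deleting the state" I mean removing the storage engine base directory from your log (the same path your du output shows), with the job stopped:

rm -rf /vagrant/SamzaJobs/deploy/samza/state

Adjust the path if your config points somewhere else.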
Cheers,
Chris

On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> It starts out with a fresh FS. I deleted all the state, but the job still
> fails on the first get.
>
> Lukas
>
> -----Original Message-----
> From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 12:12 PM
> To: Chris Riccomini
> Cc: dev@samza.apache.org
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
>> This happens every time even if I spin up a new VM.
>
> Ah, I might have misunderstood. Are your VMs started with a fresh FS? You're
> not using EBS or anything like that, are you?
>
> I want to see if you're getting hit by that setErrorIfExists line. If you:
>
> 1. Stop your job.
> 2. Clear the state from the FS.
> 3. Start your job.
>
> Does it work?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org> wrote:
>
>> Hey Lukas,
>>
>> Could you try clearing out the state, and starting the job?
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>
>>> This happens every time even if I spin up a new VM. It happens after a
>>> restart as well.
>>>
>>> Lukas
>>>
>>> -----Original Message-----
>>> From: Chris Riccomini
>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>> To: dev@samza.apache.org
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>> Interesting. Does this happen only after restarting your job, or does it
>>> happen the first time as well? I'm wondering if this is the problem:
>>>
>>> options.setErrorIfExists(true)
>>>
>>> in RocksDbKeyValueStore.scala. I think this is set under the assumption
>>> that the job is run in YARN. If you run locally, it seems to me that the
>>> directory would continue to exist after a job is restarted. If you delete
>>> your state directory and restart your job, does the problem temporarily go
>>> away until a subsequent restart happens?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> 1. We're running locally using ProcessJobFactory
>>>> 2. CentOS 7 x86_64
>>>> 3.
>>>> startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>> engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>> engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>
>>>> Also note that the properties file sets the default offset to oldest,
>>>> but the log file says that it's setting the offset to largest:
>>>> "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>
>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>> I checked the directory and it actually exists:
>>>>
>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>
>>>> 16K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>> 16K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>> 36K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>> 36K  /vagrant/SamzaJobs/deploy/samza/state
>>>>
>>>> Lukas
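A side note on the offset question a few lines up: I haven't dug through the gists yet, but the default offset can be set at the system level and overridden per stream, so it's worth checking that the key you set is the one that actually applies to this stream. From memory the relevant keys look roughly like this (double-check them against the config documentation):

systems.kafka.samza.offset.default=oldest
systems.kafka.streams.Follows.samza.offset.default=oldest

The "Got reset of type largest" line is the Kafka auto.offset.reset fallback kicking in when no valid starting offset is found, so systems.kafka.consumer.auto.offset.reset may also be worth a look.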
>>>>
>>>> -----Original Message-----
>>>> From: Chris Riccomini
>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>> To: dev@samza.apache.org
>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>
>>>> Hey Lukas,
>>>>
>>>> It looks like the exception is actually thrown on get, not put:
>>>>
>>>> at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>
>>>> 1. Are you running your job under YARN, or as a local job
>>>> (ThreadJobFactory/ProcessJobFactory)?
>>>> 2. What OS are you running on?
>>>> 3. Could you post a full copy of your logs somewhere (github gist,
>>>> pasteboard, or something)?
>>>> 4. Also, what does this line say in your logs:
>>>>
>>>> info("Got storage engine base directory: %s" format storeBaseDir)
>>>>
>>>> It sounds like something is getting messed up with the directory where
>>>> the RocksDB store is trying to keep its data.
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>>> exception when querying the data.
>>>>>
>>>>> I added these properties to the config:
>>>>>
>>>>> stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>> stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>> # a custom data type with an appropriate Serde
>>>>> stores.engaged-store.key.serde=UserAppPair
>>>>> # wrote a Serde for Long using ByteBuffer
>>>>> stores.engaged-store.msg.serde=Long
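A quick aside for anyone following along: the serde names referenced by key.serde and msg.serde above also have to be registered in the serializers registry. That part isn't shown here, but it would look roughly like this (the factory class names below are made up for illustration; Lukas's actual classes will differ):

serializers.registry.UserAppPair.class=me.doubledutch.analytics.serde.UserAppPairSerdeFactory
serializers.registry.Long.class=me.doubledutch.analytics.serde.LongSerdeFactory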
>>>>>
>>>>> I have no trouble initializing the storage engine with:
>>>>>
>>>>> val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>
>>>>> but when I query by the key when processing messages, it's throwing an
>>>>> exception:
>>>>>
>>>>> val key = new UserAppPair(userId, appId);
>>>>> val value = store.get(key);
>>>>>
>>>>> Here's the log:
>>>>>
>>>>> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an
>>>>> invalid or empty offset None for [Follows,0]. Attempting to use Kafka's
>>>>> auto.offset.reset setting. This can result in data loss if processing
>>>>> continues.
>>>>> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is
>>>>> defined for topic Follows
>>>>> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>>     at org.rocksdb.RocksDB.open(Native Method)
>>>>>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>>     at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>     at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>>
>>>>> The same exception is thrown if I try to put a value into RocksDB. Has anyone
>>>>> run into this problem before, or does anyone have any pointers on solving it?
>>>>>
>>>>> Lukas
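For reference, on the setErrorIfExists line mentioned earlier in the thread: the pattern I have in mind is roughly the sketch below. This is illustrative only, not the actual Samza source; the path is just the Partition 0 directory from the du output above. With errorIfExists set, RocksDB.open() refuses to open a directory that already contains a database, which is why a locally restarted job reusing the same state directory could trip over it. Given that the failure apparently happens even with a fresh FS, this may not be the whole story, but it's the first thing to rule out.

import org.rocksdb.{Options, RocksDB}

object RocksDbOpenSketch extends App {
  RocksDB.loadLibrary()

  // Illustrative only: open a store directory the way a fresh container would.
  val dir = "/vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0"

  val options = new Options()
    .setCreateIfMissing(true)
    .setErrorIfExists(true) // open() throws RocksDBException if the directory already holds data

  // Succeeds on the first run against an empty directory; fails on a restart
  // against the same directory, which is the "works until a subsequent restart"
  // behaviour asked about earlier in the thread.
  val db = RocksDB.open(options, dir)
  db.close()
}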