Hey Lukas,

I agree. The job was failing when RocksDB tried to fsync the directory. Given that the directory is a synced folder, maybe there's something weird going on with the FS that RocksDB doesn't like.
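
If anyone wants to check this outside of Samza, here is a rough Python sketch that opens a directory and calls fsync on it. This is my own illustration, not code from RocksDB or Samza, and the helper name `can_fsync_directory` is made up. My assumption is that a synced folder which rejects directory fsync would report EINVAL here, matching the "Invalid argument" in the exception, but that is a guess until someone runs it inside the VM.

```python
import errno
import os
import tempfile


def can_fsync_directory(path):
    """Try to fsync a directory; return (True, None) or (False, errno name).

    Hypothetical helper for illustration only, not part of Samza or RocksDB.
    """
    try:
        fd = os.open(path, os.O_RDONLY)
    except OSError as e:
        # The directory could not even be opened (missing, no permission, ...).
        return False, errno.errorcode.get(e.errno, str(e.errno))
    try:
        os.fsync(fd)
        return True, None
    except OSError as e:
        # The filesystem refused the fsync; report the symbolic errno name.
        return False, errno.errorcode.get(e.errno, str(e.errno))
    finally:
        os.close(fd)


if __name__ == "__main__":
    # Compare a normal guest directory with the synced folder, if it exists.
    for candidate in (tempfile.gettempdir(), "/vagrant"):
        if os.path.isdir(candidate):
            ok, err = can_fsync_directory(candidate)
            status = "ok" if ok else "failed with " + err
            print(candidate + ": fsync " + status)
```

If the temp directory passes and /vagrant fails, that would point at the synced folder rather than at permissions or the symlink.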

Cheers,
Chris

On Wed, Feb 18, 2015 at 11:41 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:

> I tried running everything from the /vagrant directory directly and it
> fails as well so this might actually be a synced folder issue.
>
> Lukas
>
> From: Lukas Steiblys
> Sent: Wednesday, February 18, 2015 1:46 AM
> To: dev@samza.apache.org
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> The symlink is to the synced folder /vagrant from the running user's home
> directory. That's essentially where all the project files are and where the
> job is run from.
>
> There are a couple of hardcoded paths in the setup so it might not be easy
> to run the job from /vagrant directly, but I can try.
>
> All other Samza jobs I've built so far work fine with this setup.
>
> Lukas
>
> On Tuesday, February 17, 2015, Chris Riccomini <criccom...@apache.org> wrote:
>
> Hey Lukas,
>
> > I made a copy of the synced folder instead of having a symbolic link and
> > that also solved the problem
>
> It sounds like you're having some sort of permission issue or symbolic link
> issue. Where is the sym link pointing from/to? I just want to rule out the
> case that RocksDB JNI or Samza aren't working with state stores that have a
> symlinked directory.
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 3:52 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>
> > I made a copy of the synced folder instead of having a symbolic link and
> > that also solved the problem, but it's not an ideal solution.
> >
> > Lukas
> >
> > -----Original Message-----
> > From: Lukas Steiblys
> > Sent: Tuesday, February 17, 2015 3:25 PM
> > To: dev@samza.apache.org
> > Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >
> > I deployed it to one of our VMs in Rackspace and it worked fine.
> >
> > Lukas
> >
> > -----Original Message-----
> > From: Ruslan Khafizov
> > Sent: Tuesday, February 17, 2015 3:11 PM
> > To: dev@samza.apache.org
> > Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >
> > On Wed, Feb 18, 2015 at 5:37 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> >
> >> 1. I'm running it as another user, but in the user's home directory so it
> >> has no problem writing or reading files.
> >> 2. See below.
> >> 3. I'm running Windows on my machine so I don't think I'll be able to run
> >> it outside the VM.
> >>
> > Can you try to run it inside VM filesystem? Without using vagrant sync folder.
> > Just to rule out guest/host sync issues.
> >
> >> I switched to root user, did "chmod -R a+rwx /vagrant", deleted "deploy"
> >> folder, ran the job as root as well and it still failed. However, there was
> >> a slight change in the error message in stderr:
> >>
> >> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument:
> >> /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists
> >> (error_if_exists is true)
> >>     at org.rocksdb.RocksDB.open(Native Method)
> >>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
> >>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
> >>
> >> Even though the deploy folder was deleted before the job was run, it's
> >> failing on the check?
> >>
> >> Lukas
> >>
> >> -----Original Message-----
> >> From: Chris Riccomini
> >> Sent: Tuesday, February 17, 2015 1:02 PM
> >> To: dev@samza.apache.org
> >> Cc: Chris Riccomini
> >> Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >>
> >> Hey Lucas,
> >>
> >> I'm wondering if this is a filesystem permission issue?
> >> This exception:
> >>
> >> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
> >>
> >> Looks like it's coming from this line:
> >>
> >> https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016
> >>
> >> Which seems to be trying to fsync data to disk. According to:
> >>
> >> http://docs.vagrantup.com/v2/synced-folders/basic_usage.html
> >>
> >> It sounds like the sync folder is set to be owned by the default Vagrant
> >> SSH user.
> >>
> >> 1. Is this the user that you're running the Samza job as?
> >> 2. Could you check the file permissions for /vagrant and all of its
> >> subdirectories, and make sure that they match up with what you expect (+rw
> >> for the Samza job's user)?
> >> 3. If you try running the job outside of the VM, does it work?
> >>
> >> Cheers,
> >> Chris
> >>
> >> On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> >>
> >>> Yeah, I made sure the state is clean. This is the first time I'm trying to
> >>> use RocksDB. I haven't tried LevelDB yet though.
> >>>
> >>> Lukas
> >>>
> >>> -----Original Message-----
> >>> From: Chris Riccomini
> >>> Sent: Tuesday, February 17, 2015 12:34 PM
> >>> To: dev@samza.apache.org
> >>> Cc: Chris Riccomini
> >>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >>>
> >>> Hey Lukas,
> >>>
> >>> Strange. Having a more detailed look at your logs.
> >>>
> >>> Note: /vagrant is a synced folder, and I think it *does* persist between VM
> >>> restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the state
> >>> should be empty.
> >>>
> >>> Cheers,
> >>> Chris
> >>>
> >>> On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> >>>
> >>>> It starts out with a fresh FS. I deleted all the state, but the job still
> >>>> fails on the first get.
> >>>>
> >>>> Lukas
> >>>>
> >>>> -----Original Message-----
> >>>> From: Chris Riccomini
> >>>> Sent: Tuesday, February 17, 2015 12:12 PM
> >>>> To: Chris Riccomini
> >>>> Cc: dev@samza.apache.org
> >>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >>>>
> >>>> Hey Lukas,
> >>>>
> >>>>> This happens every time even if I spin up a new VM.
> >>>>
> >>>> Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
> >>>> not using EBS or anything like that, are you?
> >>>>
> >>>> I want to see if you're getting hit by that setErrorIfExists line. If you:
> >>>>
> >>>> 1. Stop your job.
> >>>> 2. Clear the state from the FS.
> >>>> 3. Start your job.
> >>>>
> >>>> Does it work?
> >>>>
> >>>> Cheers,
> >>>> Chris
> >>>>
> >>>> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org> wrote:
> >>>>
> >>>>> Hey Lukas,
> >>>>>
> >>>>> Could you try clearing out the state, and starting the job?
> >>>>>
> >>>>> Cheers,
> >>>>> Chris
> >>>>>
> >>>>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> >>>>>
> >>>>>> This happens every time even if I spin up a new VM. Happens after a
> >>>>>> restart as well.
> >>>>>>
> >>>>>> Lukas
> >>>>>>
> >>>>>> -----Original Message-----
> >>>>>> From: Chris Riccomini
> >>>>>> Sent: Tuesday, February 17, 2015 11:01 AM
> >>>>>> To: dev@samza.apache.org
> >>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >>>>>>
> >>>>>> Hey Lukas,
> >>>>>>
> >>>>>> Interesting. Does this happen only after restarting your job? Or does it
> >>>>>> happen the first time, as well? I'm wondering if this is the problem:
> >>>>>>
> >>>>>> options.setErrorIfExists(true)
> >>>>>>
> >>>>>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
> >>>>>> that the job is run in YARN.
> >>>>>> If you run locally, it seems to me that the
> >>>>>> directory would continue to exist after a job is restarted. If you delete
> >>>>>> your state directory, and restart your job, does the problem temporarily go
> >>>>>> away until a subsequent restart happens?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Chris
> >>>>>>
> >>>>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> >>>>>>
> >>>>>>> Hi Chris,
> >>>>>>>
> >>>>>>> 1. We're running locally using ProcessJobFactory
> >>>>>>> 2. CentOS 7 x86_64
> >>>>>>> 3.
> >>>>>>> startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
> >>>>>>> engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
> >>>>>>> engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
> >>>>>>>
> >>>>>>> Also note that the properties file sets the default offset to oldest,
> >>>>>>> but the log file says that it's setting the offset to largest:
> >>>>>>> "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
> >>>>>>>
> >>>>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
> >>>>>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
> >>>>>>> I checked the directory and it actually exists:
> >>>>>>>
> >>>>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
> >>>>>>>
> >>>>>>> 16K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
> >>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
> >>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
> >>>>>>> 16K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
> >>>>>>> 36K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store
> >>>>>>> 36K  /vagrant/SamzaJobs/deploy/samza/state
> >>>>>>>
> >>>>>>> Lukas
> >>>>>>>
> >>>>>>> -----Original Message-----
> >>>>>>> From: Chris Riccomini
> >>>>>>> Sent: Monday, February 16, 2015 5:53 PM
> >>>>>>> To: dev@samza.apache.org
> >>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
> >>>>>>>
> >>>>>>> Hey Lukas,
> >>>>>>>
> >>>>>>> It looks like the exception is actually thrown on get, not put:
> >>>>>>>
> >>>>>>>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
> >>>>>>>
> >>>>>>> 1. Are you running your job under YARN, or as a local job
> >>>>>>> (ThreadJobFactory/ProcessJobFactory)?
> >>>>>>> 2. What OS are you running on?
> >>>>>>> 3. Could post a fully copy of your logs somewhere (github gist,
> >>>>>>> pasteboard, or something)?
> >>>>>>> 4. Also, what does this line say in your logs:
> >>>>>>>
> >>>>>>> info("Got storage engine base directory: %s" format storeBaseDir)
> >>>>>>>
> >>>>>>> It sounds like something is getting messed up with the directory where the
> >>>>>>> RocksDB store is trying to keep its data.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Chris
> >>>>>>>
> >>>>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> >>>>>>>
> >>>>>>>> Hello,
> >>>>>>>>
> >>>>>>>> I was setting up the key-value storage engine in Samza and ran into an
> >>>>>>>> exception when querying the data.
> >>>>>>>>
> >>>>>>>> I added these properties to the config:
> >>>>>>>>
> >>>>>>>> stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> >>>>>>>> stores.engaged-store.changelog=kafka.engaged-store-changelog
> >>>>>>>> # a custom data type with an appropriate Serde
> >>>>>>>> stores.engaged-store.key.serde=UserAppPair
> >>>>>>>> # wrote a Serde for Long using ByteBuffer
> >>>>>>>> stores.engaged-store.msg.serde=Long
> >>>>>>>>
> >>>>>>>> I have no trouble initializing the storage engine with:
> >>>>>>>>
> >>>>>>>> val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
> >>>>>>>>
> >>>>>>>> but when I query by the key when processing messages, it’s throwing an
> >>>>>>>> exception:
> >>>>>>>>
> >>>>>>>> val key = new UserAppPair(userId, appId);
> >>>>>>>> val value = store.get(key);
> >>>>>>>>
> >>>>>>>> Here’s the log:
> >>>>>>>>
> >>>>>>>> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
> >>>>>>>> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
> >>>>>>>> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
> >>>>>>>> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
> >>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
> >>>>>>>> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
> >>>>>>>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
> >>>>>>>>     at org.rocksdb.RocksDB.open(Native Method)
> >>>>>>>>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
> >>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
> >>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
> >>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
> >>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
> >>>>>>>>     at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
> >>>>>>>>     at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
> >>>>>>>>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
> >>>>>>>>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
> >>>>>>>>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
> >>>>>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
> >>>>>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
> >>>>>>>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
> >>>>>>>>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> >>>>>>>>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
> >>>>>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
> >>>>>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >>>>>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >>>>>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
> >>>>>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
> >>>>>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
> >>>>>>>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> >>>>>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >>>>>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
> >>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
> >>>>>>>> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
> >>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
> >>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
> >>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
> >>>>>>>>
> >>>>>>>> Same exception is thrown if I try to put a value in RocksDB. Has anyone
> >>>>>>>> run into this problem before or has any pointers into solving it?
> >>>>>>>>
> >>>>>>>> Lukas