Hey Lukas,

Let me try and reproduce locally.
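Here's a rough standalone sketch of the setErrorIfExists failure discussed below (untested; the object name and path are placeholders, not taken from the job):

    import org.rocksdb.{Options, RocksDB}

    object ErrorIfExistsRepro {
      def main(args: Array[String]): Unit = {
        RocksDB.loadLibrary()
        val opts = new Options()
          .setCreateIfMissing(true) // create the store on the first open
          .setErrorIfExists(true)   // the flag set in RocksDbKeyValueStore.scala
        val path = "/tmp/error_if_exists_test" // placeholder path

        val db = RocksDB.open(opts, path) // first open: creates the directory
        db.close()

        // The second open of the same path should fail with the same
        // "exists (error_if_exists is true)" message seen in stderr below.
        RocksDB.open(opts, path)
      }
    }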
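It might also be worth checking where the store base directory actually resolves once the symlink into /vagrant is followed. A throwaway sketch (run from the same working directory the job is started from; the object name is a placeholder):

    import java.io.File

    object WhereIsState {
      def main(args: Array[String]): Unit = {
        // The same expression Samza uses to pick the store base directory.
        val storeBaseDir = new File(System.getProperty("user.dir"), "state")
        println(storeBaseDir.getAbsolutePath)  // the path as computed
        println(storeBaseDir.getCanonicalPath) // the path with symlinks resolved
      }
    }

If the canonical path lands inside the Vagrant synced folder, then the directory fsync mentioned further down the thread (env_posix.cc) is running against the shared folder, which might explain the original "IO error: directory: Invalid argument".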
Cheers,
Chris

On Tue, Feb 17, 2015 at 2:15 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

> There is a symlink in the user's directory to /vagrant, but as I said before, the user has no problem reading or writing files.
>
> I added a line to the deploy script, "rm -rf deploy/samza/state", just before run-job.sh, and I monitored the directory - it was deleted before running the job. However, this did not solve the problem.
>
> I also tried deleting the folder as the first instruction in init(), even before acquiring a handle to the store, but then I got an error that state/engagement-store/Partition 0 was not found.
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 1:47 PM
> To: dev@samza.apache.org
> Cc: Chris Riccomini
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> Hmm.
>
>> 1. I'm running it as another user, but in the user's home directory so it has no problem writing or reading files.
>
> If you're running from the user's home directory, how are the data files ending up in /vagrant? Samza uses:
>
> val storeBaseDir = new File(System.getProperty("user.dir"), "state")
>
> to set the directory for the data.
>
>> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists (error_if_exists is true)
>
> Awesome! So, this *is* the issue that I was referring to initially (options.setErrorIfExists(true)). Can you set your start script to fully delete the path before the job starts? This will get fully restored when the changelog restoration happens.
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 1:38 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>
>> Actually there's a symlink in the running user's home directory to /vagrant where the jobs are executed, but even then, it doesn't have any problems writing or reading the files.
>>
>> Lukas
>>
>> -----Original Message----- From: Lukas Steiblys
>> Sent: Tuesday, February 17, 2015 1:37 PM
>> To: dev@samza.apache.org
>> Cc: Chris Riccomini
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> 1. I'm running it as another user, but in the user's home directory so it has no problem writing or reading files.
>> 2. See below.
>> 3. I'm running Windows on my machine so I don't think I'll be able to run it outside the VM.
>>
>> I switched to the root user, did "chmod -R a+rwx /vagrant", deleted the "deploy" folder, and ran the job as root as well, and it still failed. However, there was a slight change in the error message in stderr:
>>
>> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists (error_if_exists is true)
>>     at org.rocksdb.RocksDB.open(Native Method)
>>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>
>> Even though the deploy folder was deleted before the job was run, it's failing on the check?
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 1:02 PM
>> To: dev@samza.apache.org
>> Cc: Chris Riccomini
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>> I'm wondering if this is a filesystem permission issue?
>> This exception:
>>
>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>
>> looks like it's coming from this line:
>>
>> https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016
>>
>> which seems to be trying to fsync data to disk. According to:
>>
>> http://docs.vagrantup.com/v2/synced-folders/basic_usage.html
>>
>> it sounds like the synced folder is set to be owned by the default Vagrant SSH user.
>>
>> 1. Is this the user that you're running the Samza job as?
>> 2. Could you check the file permissions for /vagrant and all of its subdirectories, and make sure that they match up with what you expect (+rw for the Samza job's user)?
>> 3. If you try running the job outside of the VM, does it work?
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>
>>> Yeah, I made sure the state is clean. This is the first time I'm trying to use RocksDB. I haven't tried LevelDB yet, though.
>>>
>>> Lukas
>>>
>>> -----Original Message----- From: Chris Riccomini
>>> Sent: Tuesday, February 17, 2015 12:34 PM
>>> To: dev@samza.apache.org
>>> Cc: Chris Riccomini
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>> Strange. I'm having a more detailed look at your logs.
>>>
>>> Note: /vagrant is a synced folder, and I think it *does* persist between VM restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the state should be empty.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>
>>>> It starts out with a fresh FS. I deleted all the state, but the job still fails on the first get.
>>>>
>>>> Lukas
>>>>
>>>> -----Original Message----- From: Chris Riccomini
>>>> Sent: Tuesday, February 17, 2015 12:12 PM
>>>> To: Chris Riccomini
>>>> Cc: dev@samza.apache.org
>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>
>>>> Hey Lukas,
>>>>
>>>>> This happens every time even if I spin up a new VM.
>>>>
>>>> Ah, I might have misunderstood. Are your VMs started with a fresh FS? You're not using EBS or anything like that, are you?
>>>>
>>>> I want to see if you're getting hit by that setErrorIfExists line. If you:
>>>>
>>>> 1. Stop your job.
>>>> 2. Clear the state from the FS.
>>>> 3. Start your job.
>>>>
>>>> Does it work?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org> wrote:
>>>>
>>>>> Hey Lukas,
>>>>>
>>>>> Could you try clearing out the state, and starting the job?
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>>>
>>>>>> This happens every time even if I spin up a new VM. Happens after a restart as well.
>>>>>>
>>>>>> Lukas
>>>>>>
>>>>>> -----Original Message----- From: Chris Riccomini
>>>>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>>>>> To: dev@samza.apache.org
>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>>
>>>>>> Hey Lukas,
>>>>>>
>>>>>> Interesting. Does this happen only after restarting your job? Or does it happen the first time, as well? I'm wondering if this is the problem:
>>>>>>
>>>>>> options.setErrorIfExists(true)
>>>>>>
>>>>>> in RocksDbKeyValueStore.scala.
>>>>>> I think this is set under the assumption that the job is run in YARN. If you run locally, it seems to me that the directory would continue to exist after a job is restarted. If you delete your state directory, and restart your job, does the problem temporarily go away until a subsequent restart happens?
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>>>>
>>>>>>> Hi Chris,
>>>>>>>
>>>>>>> 1. We're running locally using ProcessJobFactory
>>>>>>> 2. CentOS 7 x86_64
>>>>>>> 3. startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>>>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>>>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>>>>
>>>>>>> Also note that the properties file sets the default offset to oldest, but the log file says that it's setting the offset to largest: "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>>>>
>>>>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>>>>> I checked the directory and it actually exists:
>>>>>>>
>>>>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>>>>
>>>>>>> 16K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>>>>> 16K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>>>>> 36K  /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>>>>> 36K  /vagrant/SamzaJobs/deploy/samza/state
>>>>>>>
>>>>>>> Lukas
>>>>>>>
>>>>>>> -----Original Message----- From: Chris Riccomini
>>>>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>>>>> To: dev@samza.apache.org
>>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>>>
>>>>>>> Hey Lukas,
>>>>>>>
>>>>>>> It looks like the exception is actually thrown on get, not put:
>>>>>>>
>>>>>>> at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>>
>>>>>>> 1. Are you running your job under YARN, or as a local job (ThreadJobFactory/ProcessJobFactory)?
>>>>>>> 2. What OS are you running on?
>>>>>>> 3. Could you post a full copy of your logs somewhere (github gist, pasteboard, or something)?
>>>>>>> 4. Also, what does this line say in your logs:
>>>>>>>
>>>>>>> info("Got storage engine base directory: %s" format storeBaseDir)
>>>>>>>
>>>>>>> It sounds like something is getting messed up with the directory where the RocksDB store is trying to keep its data.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I was setting up the key-value storage engine in Samza and ran into an exception when querying the data.
>>>>>>>>
>>>>>>>> I added these properties to the config:
>>>>>>>>
>>>>>>>> stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>>>>> stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>>>>> # a custom data type with an appropriate Serde
>>>>>>>> stores.engaged-store.key.serde=UserAppPair
>>>>>>>> # wrote a Serde for Long using ByteBuffer
>>>>>>>> stores.engaged-store.msg.serde=Long
>>>>>>>>
>>>>>>>> I have no trouble initializing the storage engine with:
>>>>>>>>
>>>>>>>> val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>>>>
>>>>>>>> but when I query by key while processing messages, it's throwing an exception:
>>>>>>>>
>>>>>>>> val key = new UserAppPair(userId, appId);
>>>>>>>> val value = store.get(key);
>>>>>>>>
>>>>>>>> Here's the log:
>>>>>>>>
>>>>>>>> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>>> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>>>>>> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>>>>>> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>>>>> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>>>>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>>>>>     at org.rocksdb.RocksDB.open(Native Method)
>>>>>>>>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>>>>>     at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>>>>     at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>>>>>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>>>>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>>>>>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>>>>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>>>>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>>>>>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>>>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>>>>> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>>>>>
>>>>>>>> The same exception is thrown if I try to put a value into RocksDB. Has anyone run into this problem before, or have any pointers on solving it?
>>>>>>>>
>>>>>>>> Lukas
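For reference, a Serde for Long over a ByteBuffer of the kind Lukas describes might look roughly like this. It's only an illustrative sketch - the class names and wiring are assumptions, not taken from his code - and it would be registered with a serializers.registry.Long.class entry pointing at the factory:

    import java.nio.ByteBuffer
    import org.apache.samza.config.Config
    import org.apache.samza.serializers.{Serde, SerdeFactory}

    // Sketch of a Long <-> bytes Serde backed by a ByteBuffer.
    class LongSerde extends Serde[Long] {
      def toBytes(obj: Long): Array[Byte] =
        ByteBuffer.allocate(8).putLong(obj).array()

      def fromBytes(bytes: Array[Byte]): Long =
        ByteBuffer.wrap(bytes).getLong()
    }

    // Factory referenced from the job's properties file.
    class LongSerdeFactory extends SerdeFactory[Long] {
      def getSerde(name: String, config: Config): Serde[Long] = new LongSerde
    }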