I could try and build a test job if you can't reproduce it locally.

Lukas
From: Chris Riccomini
Sent: Tuesday, February 17, 2015 2:19 PM
To: Lukas Steiblys
Cc: dev@samza.apache.org; Chris Riccomini
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Let me try and reproduce locally.

Cheers,
Chris

On Tue, Feb 17, 2015 at 2:15 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

There is a symlink in the user's directory to /vagrant, but as I said before, the user has no problem reading or writing files.

I added a line to the deploy script "rm -rf deploy/samza/state" just before run-job.sh and I monitored the directory - it was deleted before running the job. However, this did not solve the problem. I also tried deleting the folder as the first instruction in init(), even before acquiring a handle to the store, but then I got an error that state/engagement-store/Partition 0 was not found.

Lukas

-----Original Message-----
From: Chris Riccomini
Sent: Tuesday, February 17, 2015 1:47 PM
To: dev@samza.apache.org
Cc: Chris Riccomini
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Hmm.

1. I'm running it as another user, but in the user's home directory so it has no problem writing or reading files.

If you're running from the user's home directory, how are the data files ending up in /vagrant? Samza uses:

val storeBaseDir = new File(System.getProperty("user.dir"), "state")

to set the directory for the data.

Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists (error_if_exists is true)

Awesome! So, this *is* the issue that I was referring to initially (options.setErrorIfExists(true)). Can you set your start script to fully delete the path before the job starts? This will get fully restored when the changelog restoration happens.

Cheers,
Chris
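As a rough illustration of the cleanup Chris suggests here, the following is a minimal Scala sketch (not code from the thread; the object name is hypothetical) that deletes the local state directory before the job starts, assuming the store base directory defaults to "state" under the job's working directory as shown above:

import java.io.File

object CleanLocalState {
  // Recursively delete a directory tree; a no-op if it does not exist.
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete()
  }

  def main(args: Array[String]): Unit = {
    // Same resolution Samza uses for the store base directory, so this must
    // run from the same working directory as run-job.sh.
    val stateDir = new File(System.getProperty("user.dir"), "state")
    deleteRecursively(stateDir)
  }
}

Anything removed this way (or by an equivalent rm -rf) is only the local copy; as Chris notes, the store contents are restored from the changelog when the job starts.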
On Tue, Feb 17, 2015 at 1:38 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

Actually, there's a symlink in the running user's home directory to /vagrant where the jobs are executed, but even then, it doesn't have any problems writing or reading the files.

Lukas

-----Original Message-----
From: Lukas Steiblys
Sent: Tuesday, February 17, 2015 1:37 PM
To: dev@samza.apache.org
Cc: Chris Riccomini
Subject: Re: RocksDBException: IO error: directory: Invalid argument

1. I'm running it as another user, but in the user's home directory, so it has no problem writing or reading files.
2. See below.
3. I'm running Windows on my machine, so I don't think I'll be able to run it outside the VM.

I switched to the root user, did "chmod -R a+rwx /vagrant", deleted the "deploy" folder, and ran the job as root as well, and it still failed. However, there was a slight change in the error message in stderr:

Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists (error_if_exists is true)
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)

Even though the deploy folder was deleted before the job was run, it's failing on the check?

Lukas

-----Original Message-----
From: Chris Riccomini
Sent: Tuesday, February 17, 2015 1:02 PM
To: dev@samza.apache.org
Cc: Chris Riccomini
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

I'm wondering if this is a filesystem permission issue? This exception:

org.rocksdb.RocksDBException: IO error: directory: Invalid argument

looks like it's coming from this line:

https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016

which seems to be trying to fsync data to disk. According to:

http://docs.vagrantup.com/v2/synced-folders/basic_usage.html

it sounds like the synced folder is set to be owned by the default Vagrant SSH user.

1. Is this the user that you're running the Samza job as?
2. Could you check the file permissions for /vagrant and all of its subdirectories, and make sure that they match up with what you expect (+rw for the Samza job's user)?
3. If you try running the job outside of the VM, does it work?

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

Yeah, I made sure the state is clean. This is the first time I'm trying to use RocksDB. I haven't tried LevelDB yet though.

Lukas

-----Original Message-----
From: Chris Riccomini
Sent: Tuesday, February 17, 2015 12:34 PM
To: dev@samza.apache.org
Cc: Chris Riccomini
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Strange. I'm having a more detailed look at your logs.

Note: /vagrant is a synced folder, and I think it *does* persist between VM restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the state should be empty.

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

It starts out with a fresh FS. I deleted all the state, but the job still fails on the first get.

Lukas

-----Original Message-----
From: Chris Riccomini
Sent: Tuesday, February 17, 2015 12:12 PM
To: Chris Riccomini
Cc: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

This happens every time even if I spin up a new VM.

Ah, I might have misunderstood. Are your VMs started with a fresh FS? You're not using EBS or anything like that, are you?

I want to see if you're getting hit by that setErrorIfExists line. If you:

1. Stop your job.
2. Clear the state from the FS.
3. Start your job.

Does it work?

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org> wrote:

Hey Lukas,

Could you try clearing out the state, and starting the job?

Cheers,
Chris

On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:

This happens every time even if I spin up a new VM. It happens after a restart as well.

Lukas

-----Original Message-----
From: Chris Riccomini
Sent: Tuesday, February 17, 2015 11:01 AM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Interesting. Does this happen only after restarting your job? Or does it happen the first time, as well?

I'm wondering if this is the problem:

options.setErrorIfExists(true)

in RocksDbKeyValueStore.scala. I think this is set under the assumption that the job is run in YARN. If you run locally, it seems to me that the directory would continue to exist after a job is restarted.

If you delete your state directory, and restart your job, does the problem temporarily go away until a subsequent restart happens?

Cheers,
Chris
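To make the effect of that flag concrete, here is a minimal, self-contained Scala sketch (an illustration only, not Samza code; the path and object name are hypothetical) showing that a second open of an already-existing RocksDB directory fails when errorIfExists is set:

import org.rocksdb.{Options, RocksDB, RocksDBException}

object ErrorIfExistsRepro {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val path = "/tmp/error-if-exists-repro"
    val options = new Options()
      .setCreateIfMissing(true)
      .setErrorIfExists(true)

    // First open: the directory does not exist yet, so it is created.
    val db = RocksDB.open(options, path)
    db.close()

    // Second open: the directory now exists, so RocksDB refuses to open it,
    // matching the "exists (error_if_exists is true)" message seen above.
    try {
      RocksDB.open(options, path)
    } catch {
      case e: RocksDBException => println("Reopen failed: " + e.getMessage)
    }
  }
}

With a local job restarted in the same working directory (as with ProcessJobFactory here), the store directory from the previous run is still present, so the open fails in the same way.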
On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:

Hi Chris,

1. We're running locally using ProcessJobFactory.
2. CentOS 7 x86_64.
3. startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
   engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
   engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc

   Also note that the properties file sets the default offset to oldest, but the log file says that it's setting the offset to largest: "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."

4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"

I checked the directory and it actually exists:

du -h /vagrant/SamzaJobs/deploy/samza/state
16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
0      /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
0      /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
36K    /vagrant/SamzaJobs/deploy/samza/state

Lukas

-----Original Message-----
From: Chris Riccomini
Sent: Monday, February 16, 2015 5:53 PM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

It looks like the exception is actually thrown on get, not put:

at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)

1. Are you running your job under YARN, or as a local job (ThreadJobFactory/ProcessJobFactory)?
2. What OS are you running on?
3. Could you post a full copy of your logs somewhere (github gist, pasteboard, or something)?
4. Also, what does this line say in your logs: info("Got storage engine base directory: %s" format storeBaseDir)

It sounds like something is getting messed up with the directory where the RocksDB store is trying to keep its data.

Cheers,
Chris
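One way to separate a RocksDB problem from a filesystem problem is a small standalone check like the sketch below (hypothetical, not part of Samza or this thread). It resolves the same base directory and tries to fsync the directory itself, which is roughly what the env_posix.cc line linked earlier in the thread does; on some synced or shared folders that call fails with "Invalid argument" even though ordinary file reads and writes work. Opening a directory for read like this is platform dependent (it works on Linux with OpenJDK):

import java.nio.channels.FileChannel
import java.nio.file.{Files, Paths, StandardOpenOption}

object StateDirFsyncCheck {
  def main(args: Array[String]): Unit = {
    // Same resolution Samza uses for the store base directory.
    val storeBaseDir = Paths.get(System.getProperty("user.dir"), "state")
    Files.createDirectories(storeBaseDir)
    println("Checking " + storeBaseDir.toAbsolutePath + " as user " + System.getProperty("user.name"))

    // Open the directory and force its metadata to disk. On a local ext4/xfs
    // mount this should succeed; on some synced folders it throws an
    // IOException even though plain file I/O works.
    val channel = FileChannel.open(storeBaseDir, StandardOpenOption.READ)
    try channel.force(true)
    finally channel.close()

    println("Directory fsync OK")
  }
}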
On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

Hello,

I was setting up the key-value storage engine in Samza and ran into an exception when querying the data. I added these properties to the config:

stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.engaged-store.changelog=kafka.engaged-store-changelog
# a custom data type with an appropriate Serde
stores.engaged-store.key.serde=UserAppPair
# wrote a Serde for Long using ByteBuffer (sketch at the end of this message)
stores.engaged-store.msg.serde=Long

I have no trouble initializing the storage engine with:

val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];

but when I query by the key while processing messages, it's throwing an exception:

val key = new UserAppPair(userId, appId);
val value = store.get(key);

Here's the log:

2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
org.rocksdb.RocksDBException: IO error: directory: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.

The same exception is thrown if I try to put a value into RocksDB. Has anyone run into this problem before, or have any pointers for solving it?

Lukas
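For completeness, here is a rough sketch of the kind of Long serde mentioned in the config comment above, written against Samza's Serde and SerdeFactory interfaces; the class names are hypothetical and this is an illustration, not the code actually used in the job:

import java.nio.ByteBuffer

import org.apache.samza.config.Config
import org.apache.samza.serializers.{Serde, SerdeFactory}

// Hypothetical serde that encodes a java.lang.Long as 8 big-endian bytes.
class LongSerde extends Serde[java.lang.Long] {
  def toBytes(obj: java.lang.Long): Array[Byte] =
    if (obj == null) null else ByteBuffer.allocate(8).putLong(obj.longValue()).array()

  def fromBytes(bytes: Array[Byte]): java.lang.Long =
    if (bytes == null) null else java.lang.Long.valueOf(ByteBuffer.wrap(bytes).getLong())
}

// Hypothetical factory, registered in the job config as:
//   serializers.registry.Long.class=<your package>.LongSerdeFactory
class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
  def getSerde(name: String, config: Config): Serde[java.lang.Long] = new LongSerde
}

The UserAppPair key serde would follow the same pattern, with its factory registered under the UserAppPair name in serializers.registry.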