Hey Lukas,

Let me try and reproduce locally.
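
A minimal sketch of the clean-up step discussed in the quoted thread below, in case it helps while reproducing. It assumes the state directory is resolved the same way Samza does it (user.dir plus "state"); the CleanState object and its method names are hypothetical and not part of Samza:

```scala
import java.io.{File, IOException}

// Hypothetical helper, not part of Samza: clears local store state before a job starts.
object CleanState {
  // Same expression Samza uses for the store base directory (quoted in the thread).
  def storeBaseDir(): File = new File(System.getProperty("user.dir"), "state")

  // Deletes a file or directory tree, children before parents.
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      // listFiles returns null on an I/O error, so guard against it.
      Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    }
    if (f.exists() && !f.delete()) {
      throw new IOException("Could not delete " + f.getAbsolutePath)
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = storeBaseDir()
    deleteRecursively(dir)
    println("State directory present after clean-up: " + dir.exists())
  }
}
```

Running this from the job's working directory right before the container starts, and checking the printed result, should show whether leftover state is really what triggers the error.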

Cheers,
Chris

On Tue, Feb 17, 2015 at 2:15 PM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

> There is a symlink in the user's directory to /vagrant, but as I said
> before, the user has no problem reading or writing files.
>
> I added a line to the deploy script "rm -rf deploy/samza/state" just
> before run-job.sh and I monitored the directory - it was deleted before
> running the job. However, this did not solve the problem.
>
> I also tried deleting the folder as the first instruction in init() even
> before acquiring a handle to the store, but then I got an error that
> state/engagement-store/Partition 0 was not found.
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 1:47 PM
> To: dev@samza.apache.org
> Cc: Chris Riccomini
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> Hmm.
>
>> 1. I'm running it as another user, but in the user's home directory so it
>> has no problem writing or reading files.
>
> If you're running from the user's home directory, how are the data files
> ending up in /vagrant? Samza uses:
>
>  val storeBaseDir = new File(System.getProperty("user.dir"), "state")
>
> To set the directory for the data.
>
>> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument:
>> /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists
>> (error_if_exists is true)
>
> Awesome! So, this *is* the issue that I was referring to initially
> (options.setErrorIfExists(true)).
> Can you set your start script to fully delete the path before the job
> starts? This will get fully restored when the changelog restoration
> happens.
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 1:38 PM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
>> Actually there's a symlink in the running user's home directory to
>> /vagrant where the jobs are executed, but even then, it doesn't have any
>> problems writing or reading the files.
>>
>> Lukas
>>
>> -----Original Message----- From: Lukas Steiblys
>> Sent: Tuesday, February 17, 2015 1:37 PM
>>
>> To: dev@samza.apache.org
>> Cc: Chris Riccomini
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> 1. I'm running it as another user, but in the user's home directory so it
>> has no problem writing or reading files.
>> 2. See below.
>> 3. I'm running Windows on my machine so I don't think I'll be able to run it
>> outside the VM.
>>
>> I switched to root user, did "chmod -R a+rwx /vagrant", deleted "deploy"
>> folder, ran the job as root as well and it still failed. However, there was
>> a slight change in the error message in stderr:
>>
>> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument:
>> /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists
>> (error_if_exists is true)
>>    at org.rocksdb.RocksDB.open(Native Method)
>>    at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>    at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>
>> Even though the deploy folder was deleted before the job was run, it's
>> failing on the check?
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 1:02 PM
>> To: dev@samza.apache.org
>> Cc: Chris Riccomini
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>> I'm wondering if this is a filesystem permission issue? This exception:
>>
>>  org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>
>> Looks like it's coming from this line:
>>
>>
>> https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016
>>
>> Which seems to be trying to fsync data to disk. According to:
>>
>>  http://docs.vagrantup.com/v2/synced-folders/basic_usage.html
>>
>> It sounds like the sync folder is set to be owned by the default Vagrant
>> SSH user.
>>
>> 1. Is this the user that you're running the Samza job as?
>> 2. Could you check the file permissions for /vagrant and all of its
>> subdirectories, and make sure that they match up with what you expect (+rw
>> for the Samza job's user)?
>> 3. If you try running the job outside of the VM, does it work?
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>>> Yeah, I made sure the state is clean. This is the first time I'm trying to
>>> use RocksDB. I haven't tried LevelDB yet though.
>>>
>>> Lukas
>>>
>>> -----Original Message----- From: Chris Riccomini
>>> Sent: Tuesday, February 17, 2015 12:34 PM
>>> To: dev@samza.apache.org
>>> Cc: Chris Riccomini
>>>
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>> Strange. Having a more detailed look at your logs.
>>>
>>> Note: /vagrant is a synced folder, and I think it *does* persist between VM
>>> restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the state
>>> should be empty.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me>
>>> wrote:
>>>
>>>> It starts out with a fresh FS. I deleted all the state, but the job still
>>>> fails on the first get.
>>>>
>>>> Lukas
>>>>
>>>> -----Original Message----- From: Chris Riccomini
>>>> Sent: Tuesday, February 17, 2015 12:12 PM
>>>> To: Chris Riccomini
>>>> Cc: dev@samza.apache.org
>>>>
>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>
>>>> Hey Lukas,
>>>>
>>>>> This happens every time even if I spin up a new VM.
>>>>
>>>> Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
>>>> not using EBS or anything like that, are you?
>>>>
>>>> I want to see if you're getting hit by that setErrorIfExists line. If you:
>>>>
>>>> 1. Stop your job.
>>>> 2. Clear the state from the FS.
>>>> 3. Start your job.
>>>>
>>>> Does it work?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey Lukas,
>>>>>
>>>>> Could you try clearing out the state, and starting the job?
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
>>>>> wrote:
>>>>>
>>>>>> This happens every time even if I spin up a new VM. Happens after a
>>>>>> restart as well.
>>>>>>
>>>>>> Lukas
>>>>>>
>>>>>> -----Original Message----- From: Chris Riccomini
>>>>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>>>>> To: dev@samza.apache.org
>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>>
>>>>>> Hey Lukas,
>>>>>>
>>>>>> Interesting. Does this happen only after restarting your job? Or does it
>>>>>> happen the first time, as well? I'm wondering if this is the problem:
>>>>>>
>>>>>>    options.setErrorIfExists(true)
>>>>>>
>>>>>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
>>>>>> that the job is run in YARN. If you run locally, it seems to me that the
>>>>>> directory would continue to exist after a job is restarted. If you delete
>>>>>> your state directory, and restart your job, does the problem temporarily
>>>>>> go away until a subsequent restart happens?
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Chris,
>>>>>>>
>>>>>>> 1. We're running locally using ProcessJobFactory
>>>>>>> 2. CentOS 7 x86_64
>>>>>>> 3.
>>>>>>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>>>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>>>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>>>>
>>>>>>>    Also note that the properties file sets the default offset to oldest,
>>>>>>>    but the log file says that it's setting the offset to largest:
>>>>>>>    "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>>>>
>>>>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>>>>>>    storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>>>>>    I checked the directory and it actually exists:
>>>>>>>
>>>>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>>>>
>>>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>>>>>>
>>>>>>> Lukas
>>>>>>>
>>>>>>> -----Original Message----- From: Chris Riccomini
>>>>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>>>>> To: dev@samza.apache.org
>>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>>>
>>>>>>>
>>>>>>> Hey Lukas,
>>>>>>>
>>>>>>> It looks like the exception is actually thrown on get, not put:
>>>>>>>
>>>>>>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>>
>>>>>>> 1. Are you running your job under YARN, or as a local job
>>>>>>> (ThreadJobFactory/ProcessJobFactory)?
>>>>>>> 2. What OS are you running on?
>>>>>>> 3. Could you post a full copy of your logs somewhere (GitHub gist,
>>>>>>> pasteboard, or something)?
>>>>>>> 4. Also, what does this line say in your logs:
>>>>>>>
>>>>>>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>>>>>>
>>>>>>> It sounds like something is getting messed up with the directory where the
>>>>>>> RocksDB store is trying to keep its data.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>>>>>> exception when querying the data.
>>>>>>>>
>>>>>>>> I added these properties to the config:
>>>>>>>>
>>>>>>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>>>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>>>>>     # a custom data type with an appropriate Serde
>>>>>>>>     stores.engaged-store.key.serde=UserAppPair
>>>>>>>>     # wrote a Serde for Long using ByteBuffer
>>>>>>>>     stores.engaged-store.msg.serde=Long
>>>>>>>>
>>>>>>>> I have no trouble initializing the storage engine with:
>>>>>>>>
>>>>>>>>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>>>>
>>>>>>>> but when I query by the key when processing messages, it’s throwing an
>>>>>>>> exception:
>>>>>>>>
>>>>>>>>     val key = new UserAppPair(userId, appId);
>>>>>>>>     val value = store.get(key);
>>>>>>>>
>>>>>>>> Here’s the log:
>>>>>>>>
>>>>>>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>>>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>>>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>>>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>>>>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>>>>>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>>>>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>>>>>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>>>>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>>>>>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>>>>>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>>>>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>>>>>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>>>>>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>>>>>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>>>>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>>>>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>>>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>>>>>
>>>>>>>>
>>>>>>>> The same exception is thrown if I try to put a value in RocksDB. Has
>>>>>>>> anyone run into this problem before, or does anyone have pointers for
>>>>>>>> solving it?
>>>>>>>>
>>>>>>>> Lukas
>>>>>>>>
