Hey Lukas,

Hmm.

> 1. I'm running it as another user, but in the user's home directory so it
> has no problem writing or reading files.

If you're running from the user's home directory, how are the data files
ending up in /vagrant? Samza uses:

  val storeBaseDir = new File(System.getProperty("user.dir"), "state")

to set the directory for the data.
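
If it helps to check where that resolves on your box, here is a rough sketch. It assumes the JVM reports user.dir as the physical working directory of the process (symlinks resolved), which I haven't verified for your setup, and state_base_dir is just a made-up helper name:

```shell
#!/bin/sh
# Hypothetical helper: print the directory a job started from the current
# working directory would use for its state, assuming user.dir is the
# physical (symlink-resolved) working directory.
state_base_dir() {
  # pwd -P prints the working directory with all symlinks resolved.
  printf '%s/state\n' "$(pwd -P)"
}
```

Running state_base_dir from the directory where you start the job, and comparing it with the "Got storage engine base directory" line in your log, should show whether a symlink is involved.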

> Exception in thread "main" org.rocksdb.RocksDBException: Invalid
> argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0:
> exists (error_if_exists is true)

Awesome! So, this *is* the issue that I was referring to initially
(options.setErrorIfExists(true)).
Can you set your start script to fully delete the path before the job
starts? The state will be fully restored when the changelog restoration
happens.
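
Something along these lines might do as the start-script step. It is only a sketch: clean_state is a made-up helper name, and the path in the usage comment assumes the job is launched from the deploy directory so that the state lands in ./state, as in your error message:

```shell
#!/bin/sh
# Hypothetical helper: remove a store directory completely so that RocksDB
# does not find an existing path when it opens the store.
clean_state() {
  state_dir="$1"
  # Refuse an empty argument so we never run rm -rf on the wrong path.
  if [ -z "$state_dir" ]; then
    echo "clean_state: no directory given" >&2
    return 1
  fi
  # "--" ends option parsing; the quotes keep names like "Partition 0" intact.
  rm -rf -- "$state_dir"
}

# Example use in a start script (the path is an assumption):
#   clean_state "$(pwd -P)/state"
#   ...then launch the job the way you do today.
```

Deleting the whole state directory, rather than individual partition directories, avoids leaving a half-empty store behind for the open call to trip over.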

Cheers,
Chris

On Tue, Feb 17, 2015 at 1:38 PM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

> Actually there's a symlink in the running user's home directory to
> /vagrant where the jobs are executed, but even then, it doesn't have any
> problems writing or reading the files.
>
> Lukas
>
> -----Original Message----- From: Lukas Steiblys
> Sent: Tuesday, February 17, 2015 1:37 PM
>
> To: dev@samza.apache.org
> Cc: Chris Riccomini
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> 1. I'm running it as another user, but in the user's home directory so it
> has no problem writing or reading files.
> 2. See below.
> 3. I'm running Windows on my machine so I don't think I'll be able to run
> it outside the VM.
>
> I switched to root user, did "chmod -R a+rwx /vagrant", deleted "deploy"
> folder, ran the job as root as well and it still failed. However, there was
> a slight change in the error message in stderr:
>
> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument:
> /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists
> (error_if_exists is true)
>    at org.rocksdb.RocksDB.open(Native Method)
>    at org.rocksdb.RocksDB.open(RocksDB.java:133)
>    at
> org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(
> RocksDbKeyValueStore.scala:85)
>
> Even though the deploy folder was deleted before the job was run, it's
> failing on the check?
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 1:02 PM
> To: dev@samza.apache.org
> Cc: Chris Riccomini
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> I'm wondering if this is a filesystem permission issue? This exception:
>
>  org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>
> Looks like it's coming from this line:
>
>
> https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016
>
> Which seems to be trying to fsync data to disk. According to:
>
>  http://docs.vagrantup.com/v2/synced-folders/basic_usage.html
>
> It sounds like the sync folder is set to be owned by the default Vagrant
> SSH user.
>
> 1. Is this the user that you're running the Samza job as?
> 2. Could you check the file permissions for /vagrant and all of its
> subdirectories, and make sure that they match up with what you expect (+rw
> for the Samza job's user)?
> 3. If you try running the job outside of the VM, does it work?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
>> Yeah, I made sure the state is clean. This is the first time I'm trying to
>> use RocksDB. I haven't tried LevelDB yet though.
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 12:34 PM
>> To: dev@samza.apache.org
>> Cc: Chris Riccomini
>>
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>> Strange. Having a more detailed look at your logs.
>>
>> Note: /vagrant is a synced folder, and I think it *does* persist between
>> VM restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the
>> state should be empty.
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>>> It starts out with a fresh FS. I deleted all the state, but the job still
>>> fails on the first get.
>>>
>>> Lukas
>>>
>>> -----Original Message----- From: Chris Riccomini
>>> Sent: Tuesday, February 17, 2015 12:12 PM
>>> To: Chris Riccomini
>>> Cc: dev@samza.apache.org
>>>
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>>> This happens every time even if I spin up a new VM.
>>>
>>> Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
>>> not using EBS or anything like that, are you?
>>>
>>> I want to see if you're getting hit by that setErrorIfExists line. If
>>> you:
>>>
>>> 1. Stop your job.
>>> 2. Clear the state from the FS.
>>> 3. Start your job.
>>>
>>> Does it work?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
>>> wrote:
>>>
>>>> Hey Lukas,
>>>>
>>>> Could you try clearing out the state, and starting the job?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
>>>> wrote:
>>>>
>>>>> This happens every time even if I spin up a new VM. Happens after a
>>>>> restart as well.
>>>>>
>>>>> Lukas
>>>>>
>>>>> -----Original Message----- From: Chris Riccomini
>>>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>>>> To: dev@samza.apache.org
>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>
>>>>> Hey Lukas,
>>>>>
>>>>> Interesting. Does this happen only after restarting your job? Or does
>>>>> it
>>>>> happen the first time, as well? I'm wondering if this is the problem:
>>>>>
>>>>>    options.setErrorIfExists(true)
>>>>>
>>>>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
>>>>> that the job is run in YARN. If you run locally, it seems to me that
>>>>> the
>>>>> directory would continue to exist after a job is restarted. If you
>>>>> delete
>>>>> your state directory, and restart your job, does the problem
>>>>> temporarily
>>>>> go
>>>>> away until a subsequent restart happens?
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
>>>>> wrote:
>>>>>
>>>>>> Hi Chris,
>>>>>>
>>>>>> 1. We're running locally using ProcessJobFactory
>>>>>> 2. CentOS 7 x86_64
>>>>>> 3.
>>>>>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>>>
>>>>>>    Also note that the properties file sets the default offset to
>>>>>> oldest,
>>>>>> but the log file says that it's setting the offset to largest:
>>>>>> "2015-02-17
>>>>>> 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>>>
>>>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>>>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>>>>    I checked the directory and it actually exists:
>>>>>>
>>>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>>>
>>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>>>>>
>>>>>> Lukas
>>>>>>
>>>>>> -----Original Message----- From: Chris Riccomini
>>>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>>>> To: dev@samza.apache.org
>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>>
>>>>>>
>>>>>> Hey Lukas,
>>>>>>
>>>>>> It looks like the exception is actually thrown on get, not put:
>>>>>>
>>>>>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(
>>>>>> KeyValueStorageEngine.scala:44)
>>>>>>
>>>>>> 1. Are you running your job under YARN, or as a local job
>>>>>> (ThreadJobFactory/ProcessJobFactory)?
>>>>>> 2. What OS are you running on?
>>>>>> 3. Could you post a full copy of your logs somewhere (GitHub gist,
>>>>>> pastebin, or something)?
>>>>>> 4.  Also, what does this line say in your logs:
>>>>>>
>>>>>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>>>>>
>>>>>> It sounds like something is getting messed up with the directory where
>>>>>> the
>>>>>> RocksDB store is trying to keep its data.
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>>>>> exception when querying the data.
>>>>>>>
>>>>>>> I added these properties to the config:
>>>>>>>
>>>>>>>
>>>>>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>>>>     # a custom data type with an appropriate Serde
>>>>>>>     stores.engaged-store.key.serde=UserAppPair
>>>>>>>     # wrote a Serde for Long using ByteBuffer
>>>>>>>     stores.engaged-store.msg.serde=Long
>>>>>>>
>>>>>>> I have no trouble initializing the storage engine with:
>>>>>>>
>>>>>>>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>>>
>>>>>>> but when I query by the key when processing messages, it’s throwing
>>>>>>> an
>>>>>>> exception:
>>>>>>>
>>>>>>>     val key = new UserAppPair(userId, appId);
>>>>>>>     val value = store.get(key);
>>>>>>>
>>>>>>> Here’s the log:
>>>>>>>
>>>>>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for
>>>>>>> localhost:9092
>>>>>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we
>>>>>>> received
>>>>>>> an
>>>>>>> invalid or empty offset None for [Follows,0]. Attempting to use
>>>>>>> Kafka's
>>>>>>> auto.offset.reset setting. This can result in data loss if processing
>>>>>>> continues.
>>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if
>>>>>>> auto.offset.reset
>>>>>>> is
>>>>>>> defined for topic Follows
>>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for
>>>>>>> localhost:9092
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for
>>>>>>> key
>>>>>>> in
>>>>>>> rocksdb.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in
>>>>>>> process
>>>>>>> loop.
>>>>>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid
>>>>>>> argument
>>>>>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>>>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(
>>>>>>> RocksDbKeyValueStore.scala:85)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.db(
>>>>>>> RocksDbKeyValueStore.scala:85)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.get(
>>>>>>> RocksDbKeyValueStore.scala:92)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.RocksDbKeyValueStore.get(
>>>>>>> RocksDbKeyValueStore.scala:80)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.SerializedKeyValueStore.get(
>>>>>>> SerializedKeyValueStore.scala:36)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.NullSafeKeyValueStore.get(
>>>>>>> NullSafeKeyValueStore.scala:36)
>>>>>>>         at
>>>>>>> org.apache.samza.storage.kv.KeyValueStorageEngine.get(
>>>>>>> KeyValueStorageEngine.scala:44)
>>>>>>>         at
>>>>>>> me.doubledutch.analytics.task.EngagedUsersTask.engaged(
>>>>>>> EngagedUsersTask.scala:66)
>>>>>>>         at
>>>>>>> me.doubledutch.analytics.task.EngagedUsersTask.process(
>>>>>>> EngagedUsersTask.scala:100)
>>>>>>>         at
>>>>>>> org.apache.samza.container.TaskInstance$$anonfun$process$
>>>>>>> 1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>>>         at
>>>>>>> org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(
>>>>>>> TaskInstanceExceptionHandler.scala:54)
>>>>>>>         at
>>>>>>> org.apache.samza.container.TaskInstance.process(
>>>>>>> TaskInstance.scala:136)
>>>>>>>         at
>>>>>>> org.apache.samza.container.RunLoop$$anonfun$process$2.
>>>>>>> apply(RunLoop.scala:93)
>>>>>>>         at
>>>>>>> org.apache.samza.util.TimerUtils$class.updateTimer(
>>>>>>> TimerUtils.scala:37)
>>>>>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.
>>>>>>> scala:36)
>>>>>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:
>>>>>>> 79)
>>>>>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>>>         at
>>>>>>> org.apache.samza.container.SamzaContainer.run(
>>>>>>> SamzaContainer.scala:556)
>>>>>>>         at
>>>>>>> org.apache.samza.container.SamzaContainer$.safeMain(
>>>>>>> SamzaContainer.scala:108)
>>>>>>>         at
>>>>>>> org.apache.samza.container.SamzaContainer$.main(
>>>>>>> SamzaContainer.scala:87)
>>>>>>>         at
>>>>>>> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer
>>>>>>> multiplexer.
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy
>>>>>>> for
>>>>>>> localhost:9092
>>>>>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect
>>>>>>> due
>>>>>>> to
>>>>>>> socket error: null
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt
>>>>>>> exception in broker proxy thread.
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to
>>>>>>> interrupt.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer
>>>>>>> multiplexer.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task
>>>>>>> instance
>>>>>>> stream tasks.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task
>>>>>>> instance
>>>>>>> stores.
>>>>>>>
>>>>>>>
>>>>>>> The same exception is thrown if I try to put a value in RocksDB. Has
>>>>>>> anyone run into this problem before, or does anyone have pointers on
>>>>>>> solving it?
>>>>>>>
>>>>>>> Lukas
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
