On Wed, Feb 18, 2015 at 5:37 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
> 1. I'm running it as another user, but in the user's home directory so it
> has no problem writing or reading files.
> 2. See below.
> 3. I'm running Windows on my machine so I don't think I'll be able to run it
> outside the VM.
Can you try running it from the VM's own filesystem, i.e. from a directory
outside the Vagrant synced folder (/vagrant)? That would rule out guest/host
sync issues.
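
A rough way to confirm where the state directory lives, run from inside the
guest, is sketched below. It assumes a Linux guest with df and awk available,
and that the synced folder is mounted with a shared-folder filesystem type such
as vboxsf. The function names (fs_type, classify_fs) and the list of "shared"
types are made up for illustration and are not exhaustive.

```shell
#!/bin/sh
# Sketch only: report whether a directory sits on a host/guest shared
# filesystem. Function names and the list of "shared" types are assumptions.

# Print the filesystem type that backs the given path (uses df -PT).
fs_type() {
    df -PT "$1" 2>/dev/null | awk 'NR == 2 { print $2 }'
}

# Map a filesystem type name to "shared" or "local".
classify_fs() {
    case "$1" in
        vboxsf|vmhgfs|fuse.vmhgfs-fuse|prl_fs|nfs|nfs4|cifs|9p) echo shared ;;
        *) echo local ;;
    esac
}

# When called with a path, print its filesystem type and classification.
if [ "$#" -gt 0 ]; then
    type_name=$(fs_type "$1")
    if [ -z "$type_name" ]; then
        echo "$1: could not determine filesystem type" >&2
        exit 1
    fi
    echo "$1: $type_name ($(classify_fs "$type_name"))"
fi
```

Saved under a name of your choosing (say check_fs.sh), you could run it once
against /vagrant/SamzaJobs/deploy/samza/state and once against a directory in
the user's home. If the first reports a shared type and the second a local one,
copying the deploy directory to the local disk and starting the job from there
would be the quickest test.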
>
> I switched to root user, did "chmod -R a+rwx /vagrant", deleted "deploy"
> folder, ran the job as root as well and it still failed. However, there was
> a slight change in the error message in stderr:
>
> Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists (error_if_exists is true)
>    at org.rocksdb.RocksDB.open(Native Method)
>    at org.rocksdb.RocksDB.open(RocksDB.java:133)
>    at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>
> Even though the deploy folder was deleted before the job was run, it's
> failing on the check?
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 1:02 PM
>
> To: dev@samza.apache.org
> Cc: Chris Riccomini
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> I'm wondering if this is a filesystem permission issue? This exception:
>
>  org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>
> Looks like it's coming from this line:
>
>
> https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016
>
> Which seems to be trying to fsync data to disk. According to:
>
>  http://docs.vagrantup.com/v2/synced-folders/basic_usage.html
>
> It sounds like the sync folder is set to be owned by the default Vagrant
> SSH user.
>
> 1. Is this the user that you're running the Samza job as?
> 2. Could you check the file permissions for /vagrant and all of its
> subdirectories, and make sure that they match up with what you expect (+rw
> for the Samza job's user)?
> 3. If you try running the job outside of the VM, does it work?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
>> Yeah, I made sure the state is clean. This is the first time I'm trying to
>> use RocksDB. I haven't tried LevelDB yet though.
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 12:34 PM
>> To: dev@samza.apache.org
>> Cc: Chris Riccomini
>>
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>> Strange. I'm having a more detailed look at your logs.
>>
>> Note: /vagrant is a synced folder, and I think it *does* persist between VM
>> restarts. But if you've deleted /vagrant/SamzaJobs/deploy, then the state
>> should be empty.
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>>> It starts out with a fresh FS. I deleted all the state, but the job still
>>> fails on the first get.
>>>
>>> Lukas
>>>
>>> -----Original Message----- From: Chris Riccomini
>>> Sent: Tuesday, February 17, 2015 12:12 PM
>>> To: Chris Riccomini
>>> Cc: dev@samza.apache.org
>>>
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>>> This happens every time even if I spin up a new VM.
>>>
>>> Ah, I might have misunderstood. Are your VMs started with a fresh FS? You're
>>> not using EBS or anything like that, are you?
>>>
>>> I want to see if you're getting hit by that setErrorIfExists line. If you:
>>>
>>> 1. Stop your job.
>>> 2. Clear the state from the FS.
>>> 3. Start your job.
>>>
>>> Does it work?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
>>> wrote:
>>>
>>>> Hey Lukas,
>>>>
>>>> Could you try clearing out the state, and starting the job?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
>>>> wrote:
>>>>
>>>>> This happens every time, even if I spin up a new VM. It happens after a
>>>>> restart as well.
>>>>>
>>>>> Lukas
>>>>>
>>>>> -----Original Message----- From: Chris Riccomini
>>>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>>>> To: dev@samza.apache.org
>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>
>>>>> Hey Lukas,
>>>>>
>>>>> Interesting. Does this happen only after restarting your job? Or does it
>>>>> happen the first time, as well? I'm wondering if this is the problem:
>>>>>
>>>>>    options.setErrorIfExists(true)
>>>>>
>>>>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
>>>>> that the job is run in YARN. If you run locally, it seems to me that the
>>>>> directory would continue to exist after a job is restarted. If you delete
>>>>> your state directory, and restart your job, does the problem temporarily
>>>>> go away until a subsequent restart happens?
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
>>>>> wrote:
>>>>>
>>>>>> Hi Chris,
>>>>>>
>>>>>> 1. We're running locally using ProcessJobFactory
>>>>>> 2. CentOS 7 x86_64
>>>>>> 3.
>>>>>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>>>
>>>>>>    Also note that the properties file sets the default offset to oldest,
>>>>>>    but the log file says that it's setting the offset to largest:
>>>>>>    "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>>>
>>>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>>>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>>>>    I checked the directory and it actually exists:
>>>>>>
>>>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>>>
>>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>>>>>
>>>>>> Lukas
>>>>>>
>>>>>> -----Original Message----- From: Chris Riccomini
>>>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>>>> To: dev@samza.apache.org
>>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>>
>>>>>>
>>>>>> Hey Lukas,
>>>>>>
>>>>>> It looks like the exception is actually thrown on get, not put:
>>>>>>
>>>>>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>
>>>>>> 1. Are you running your job under YARN, or as a local job
>>>>>> (ThreadJobFactory/ProcessJobFactory)?
>>>>>> 2. What OS are you running on?
>>>>>> 3. Could you post a full copy of your logs somewhere (GitHub gist,
>>>>>> pasteboard, or something)?
>>>>>> 4.  Also, what does this line say in your logs:
>>>>>>
>>>>>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>>>>>
>>>>>> It sounds like something is getting messed up with the directory where
>>>>>> the RocksDB store is trying to keep its data.
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>>>>> exception when querying the data.
>>>>>>>
>>>>>>> I added these properties to the config:
>>>>>>>
>>>>>>>
>>>>>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>>>>     # a custom data type with an appropriate Serde
>>>>>>>     stores.engaged-store.key.serde=UserAppPair
>>>>>>>     # wrote a Serde for Long using ByteBuffer
>>>>>>>     stores.engaged-store.msg.serde=Long
>>>>>>>
>>>>>>> I have no trouble initializing the storage engine with:
>>>>>>>
>>>>>>>     val store =
>>>>>>>       context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>>>
>>>>>>> but when I query by the key when processing messages, it’s throwing an
>>>>>>> exception:
>>>>>>>
>>>>>>>     val key = new UserAppPair(userId, appId);
>>>>>>>     val value = store.get(key);
>>>>>>>
>>>>>>> Here’s the log:
>>>>>>>
>>>>>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>>>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>>>>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>>>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>>>>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>>>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>>>>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>>>>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>>>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>>>>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>>>>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>>>>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>>>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>>>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>>>>
>>>>>>>
>>>>>>> The same exception is thrown if I try to put a value in RocksDB. Has
>>>>>>> anyone run into this problem before, or have any pointers for solving it?
>>>>>>>
>>>>>>> Lukas
>>>>>>>
