Hey Lukas,

Strange. I'm taking a more detailed look at your logs.

Note: /vagrant is a synced folder, and I think it *does* persist between VM
restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the state
should be empty.
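
If it helps, here is a minimal sketch for double-checking that nothing is left
under the state directory before you start the job. It is not part of Samza:
StateCheck and listRecursively are names I made up, and the default path is
just the one from your "storage engine base directory" log line.

```scala
import java.io.File

// Hypothetical helper (not a Samza API): reports anything left under a
// state directory so you can confirm it is empty before starting the job.
object StateCheck {
  // Returns every file and directory below `dir`, or Nil if `dir` is
  // missing, empty, or not a directory.
  def listRecursively(dir: File): List[File] = {
    // File.listFiles returns null when `dir` does not exist or is not a directory.
    val children = Option(dir.listFiles).map(_.toList).getOrElse(Nil)
    children.flatMap(child => child :: listRecursively(child))
  }

  def main(args: Array[String]): Unit = {
    // Assumed default path, copied from the log line quoted in this thread.
    val stateDir = new File(args.headOption.getOrElse("/vagrant/SamzaJobs/deploy/samza/state"))
    val leftovers = listRecursively(stateDir)
    if (leftovers.isEmpty) println(s"No state left under $stateDir")
    else leftovers.foreach(f => println(s"Leftover state: $f"))
  }
}
```

If that prints any leftover entries after you've cleared the state, the
synced folder is probably still holding on to old store directories.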

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

> It starts out with a fresh FS. I deleted all the state, but the job still
> fails on the first get.
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 12:12 PM
> To: Chris Riccomini
> Cc: dev@samza.apache.org
>
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
>> This happens every time even if I spin up a new VM.
>
> Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
> not using EBS or anything like that, are you?
>
> I want to see if you're getting hit by that setErrorIfExists line. If you:
>
> 1. Stop your job.
> 2. Clear the state from the FS.
> 3. Start your job.
>
> Does it work?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
>  Hey Lukas,
>>
>> Could you try clearing out the state, and starting the job?
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>>  This happens every time even if I spin up a new VM. Happens after a
>>> restart as well.
>>>
>>> Lukas
>>>
>>> -----Original Message----- From: Chris Riccomini
>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>> To: dev@samza.apache.org
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>> Interesting. Does this happen only after restarting your job? Or does it
>>> happen the first time, as well? I'm wondering if this is the problem:
>>>
>>>    options.setErrorIfExists(true)
>>>
>>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
>>> that the job is run in YARN. If you run locally, it seems to me that the
>>> directory would continue to exist after a job is restarted. If you delete
>>> your state directory, and restart your job, does the problem temporarily
>>> go away until a subsequent restart happens?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
>>> wrote:
>>>
>>>  Hi Chris,
>>>
>>>>
>>>> 1. We're running locally using ProcessJobFactory
>>>> 2. CentOS 7 x86_64
>>>> 3.
>>>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>
>>>>    Also note that the properties file sets the default offset to oldest,
>>>> but the log file says that it's setting the offset to largest:
>>>> "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>
>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>>    I checked the directory and it actually exists:
>>>>
>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>
>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>>>
>>>> Lukas
>>>>
>>>> -----Original Message----- From: Chris Riccomini
>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>> To: dev@samza.apache.org
>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>
>>>>
>>>> Hey Lukas,
>>>>
>>>> It looks like the exception is actually thrown on get, not put:
>>>>
>>>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>
>>>> 1. Are you running your job under YARN, or as a local job
>>>> (ThreadJobFactory/ProcessJobFactory)?
>>>> 2. What OS are you running on?
>>>> 3. Could you post a full copy of your logs somewhere (GitHub gist,
>>>> pasteboard, or something)?
>>>> 4. Also, what does this line say in your logs:
>>>>
>>>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>>>
>>>> It sounds like something is getting messed up with the directory where the
>>>> RocksDB store is trying to keep its data.
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>>>> wrote:
>>>>
>>>>  Hello,
>>>>
>>>>
>>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>>> exception when querying the data.
>>>>>
>>>>> I added these properties to the config:
>>>>>
>>>>>
>>>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>>     # a custom data type with an appropriate Serde
>>>>>     stores.engaged-store.key.serde=UserAppPair
>>>>>     # wrote a Serde for Long using ByteBuffer
>>>>>     stores.engaged-store.msg.serde=Long
>>>>>
>>>>> I have no trouble initializing the storage engine with:
>>>>>
>>>>>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>
>>>>> but when I query by the key when processing messages, it’s throwing an
>>>>> exception:
>>>>>
>>>>>     val key = new UserAppPair(userId, appId);
>>>>>     val value = store.get(key);
>>>>>
>>>>> Here’s the log:
>>>>>
>>>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>>
>>>>>
>>>>> The same exception is thrown if I try to put a value in RocksDB. Has anyone
>>>>> run into this problem before, or does anyone have pointers on solving it?
>>>>>
>>>>> Lukas
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
