Hey Lukas,

Could you try clearing out the state directory and then starting the job?

Cheers,
Chris

On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

> This happens every time even if I spin up a new VM. Happens after a
> restart as well.
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 11:01 AM
> To: dev@samza.apache.org
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> Interesting. Does this happen only after restarting your job? Or does it
> happen the first time, as well? I'm wondering if this is the problem:
>
>    options.setErrorIfExists(true)
>
> In RocksDbKeyValueStore.scala. I think this is set under the assumption
> that the job is run in YARN. If you run locally, it seems to me that the
> directory would continue to exist after a job is restarted. If you delete
> your state directory, and restart your job, does the problem temporarily go
> away until a subsequent restart happens?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
>> Hi Chris,
>>
>> 1. We're running locally using ProcessJobFactory
>> 2. CentOS 7 x86_64
>> 3.
>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>
>>    Also note that the properties file sets the default offset to oldest,
>> but the log file says that it's setting the offset to largest: "2015-02-17
>> 18:46:32 GetOffset [INFO] Got reset of type largest."
>>
>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>    I checked the directory and it actually exists:
>>
>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>
>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Monday, February 16, 2015 5:53 PM
>> To: dev@samza.apache.org
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>>
>> Hey Lukas,
>>
>> It looks like the exception is actually thrown on get, not put:
>>
>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>
>> 1. Are you running your job under YARN, or as a local job
>> (ThreadJobFactory/ProcessJobFactory)?
>> 2. What OS are you running on?
>> 3. Could you post a full copy of your logs somewhere (github gist, pasteboard,
>> or something)?
>> 4.  Also, what does this line say in your logs:
>>
>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>
>> It sounds like something is getting messed up with the directory where the
>> RocksDB store is trying to keep its data.
>>
>> Cheers,
>> Chris
>>
>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>>> Hello,
>>>
>>> I was setting up the key-value storage engine in Samza and ran into an
>>> exception when querying the data.
>>>
>>> I added these properties to the config:
>>>
>>>
>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>     # a custom data type with an appropriate Serde
>>>     stores.engaged-store.key.serde=UserAppPair
>>>     # wrote a Serde for Long using ByteBuffer
>>>     stores.engaged-store.msg.serde=Long
>>>
>>> I have no trouble initializing the storage engine with:
>>>
>>>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>
>>> but when I query by the key when processing messages, it’s throwing an
>>> exception:
>>>
>>>     val key = new UserAppPair(userId, appId);
>>>     val value = store.get(key);
>>>
>>> Here’s the log:
>>>
>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>
>>>
>>> The same exception is thrown if I try to put a value in RocksDB. Has anyone
>>> run into this problem before, or does anyone have pointers on solving it?
>>>
>>> Lukas
>>>
>>>
>>
>>
>
