Hey Lukas,

Interesting. Does this happen only after restarting your job? Or does it
happen the first time, as well? I'm wondering if this is the problem:

    options.setErrorIfExists(true)

In RocksDbKeyValueStore.scala. I think this is set under the assumption
that the job is run in YARN. If you run locally, it seems to me that the
directory would continue to exist after a job is restarted. If you delete
your state directory, and restart your job, does the problem temporarily go
away until a subsequent restart happens?

Cheers,
Chris

On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

> Hi Chris,
>
> 1. We're running locally using ProcessJobFactory
> 2. CentOS 7 x86_64
> 3.
>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>    engaged-users.properties: https://gist.github.com/
> imbusy/d0019db29d7b68c60bfc
>
>    Also note that the properties file sets the default offset to oldest,
> but the log file says that it's setting the offset to largest: "2015-02-17
> 18:46:32 GetOffset [INFO] Got reset of type largest."
>
> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>    I checked the directory and it actually exists:
>
> du -h /vagrant/SamzaJobs/deploy/samza/state
>
> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
> 36K    /vagrant/SamzaJobs/deploy/samza/state
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Monday, February 16, 2015 5:53 PM
> To: dev@samza.apache.org
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
>
> Hey Lukas,
>
> It looks like the exception is actually thrown on get, not put:
>
>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(
> KeyValueStorageEngine.scala:44)
>
> 1. Are you running your job under YARN, or as a local job
> (ThreadJobFactory/ProcessJobFactory)?
> 2. What OS are you running on?
> 3. Could post a fully copy of your logs somewhere (github gist, pasteboard,
> or something)?
> 4.  Also, what does this line say in your logs:
>
>    info("Got storage engine base directory: %s" format storeBaseDir)
>
> It sounds like something is getting messed up with the directory where the
> RocksDB store is trying to keep its data.
>
> Cheers,
> Chris
>
> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
>  Hello,
>>
>> I was setting up the key-value storage engine in Samza and ran into an
>> exception when querying the data.
>>
>> I added these properties to the config:
>>
>>
>> stores.engaged-store.factory=org.apache.samza.storage.kv.
>> RocksDbKeyValueStorageEngineFactory
>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>     # a custom data type with an appropriate Serde
>>     stores.engaged-store.key.serde=UserAppPair
>>     # wrote a Serde for Long using ByteBuffer
>>     stores.engaged-store.msg.serde=Long
>>
>> I have no trouble initializing the storage engine with:
>>
>>     val store =
>> context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair,
>> Long]];
>>
>> but when I query by the key when processing messages, it’s throwing an
>> exception:
>>
>>     val key = new UserAppPair(userId, appId);
>>     val value = store.get(key);
>>
>> Here’s the log:
>>
>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for
>> localhost:9092
>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an
>> invalid or empty offset None for [Follows,0]. Attempting to use Kafka's
>> auto.offset.reset setting. This can result in data loss if processing
>> continues.
>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is
>> defined for topic Follows
>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for
>> localhost:9092
>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in
>> rocksdb.
>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process
>> loop.
>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>         at org.rocksdb.RocksDB.open(Native Method)
>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>         at
>> org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(
>> RocksDbKeyValueStore.scala:85)
>>         at
>> org.apache.samza.storage.kv.RocksDbKeyValueStore.db(
>> RocksDbKeyValueStore.scala:85)
>>         at
>> org.apache.samza.storage.kv.RocksDbKeyValueStore.get(
>> RocksDbKeyValueStore.scala:92)
>>         at
>> org.apache.samza.storage.kv.RocksDbKeyValueStore.get(
>> RocksDbKeyValueStore.scala:80)
>>         at
>> org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>         at
>> org.apache.samza.storage.kv.SerializedKeyValueStore.get(
>> SerializedKeyValueStore.scala:36)
>>         at
>> org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>         at
>> org.apache.samza.storage.kv.NullSafeKeyValueStore.get(
>> NullSafeKeyValueStore.scala:36)
>>         at
>> org.apache.samza.storage.kv.KeyValueStorageEngine.get(
>> KeyValueStorageEngine.scala:44)
>>         at
>> me.doubledutch.analytics.task.EngagedUsersTask.engaged(
>> EngagedUsersTask.scala:66)
>>         at
>> me.doubledutch.analytics.task.EngagedUsersTask.process(
>> EngagedUsersTask.scala:100)
>>         at
>> org.apache.samza.container.TaskInstance$$anonfun$process$
>> 1.apply$mcV$sp(TaskInstance.scala:137)
>>         at
>> org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(
>> TaskInstanceExceptionHandler.scala:54)
>>         at
>> org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>         at
>> org.apache.samza.container.RunLoop$$anonfun$process$2.
>> apply(RunLoop.scala:93)
>>         at
>> org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.
>> scala:36)
>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>         at
>> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>         at
>> org.apache.samza.container.SamzaContainer$.safeMain(
>> SamzaContainer.scala:108)
>>         at
>> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>         at
>> org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer
>> multiplexer.
>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for
>> localhost:9092
>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to
>> socket error: null
>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt
>> exception in broker proxy thread.
>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer
>> multiplexer.
>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance
>> stream tasks.
>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance
>> stores.
>>
>>
>> Same exception is thrown if I try to put a value in RocksDB. Has anyone
>> run into this problem before or has any pointers into solving it?
>>
>> Lukas
>>
>
>

Reply via email to