It starts out with a fresh FS. I deleted all the state, but the job still fails on the first get.

Lukas

-----Original Message----- From: Chris Riccomini
Sent: Tuesday, February 17, 2015 12:12 PM
To: Chris Riccomini
Cc: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

> This happens every time even if I spin up a new VM.

Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
not using EBS or anything like that, are you?

I want to see if you're getting hit by that setErrorIfExists line. If you:

1. Stop your job.
2. Clear the state from the FS.
3. Start your job.

Does it work?
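
If it helps, step 2 could be scripted. This is only a minimal sketch, assuming the whole store lives under one base directory (the one printed in the "Got storage engine base directory" log line). `ClearState` and `deleteRecursively` are made-up names for illustration and are not part of Samza.

```scala
import java.io.File

object ClearState {
  // Deletes `f` and, if it is a directory, everything beneath it.
  // Returns true if `f` no longer exists afterwards.
  def deleteRecursively(f: File): Boolean = {
    // listFiles() returns null for non-directories, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(child => deleteRecursively(child)))
    f.delete()
    !f.exists()
  }

  def main(args: Array[String]): Unit = {
    // The directory to clear must be passed explicitly so nothing is deleted by accident.
    require(args.length == 1, "usage: ClearState <state-base-directory>")
    val stateDir = new File(args(0))
    println(s"cleared $stateDir: ${deleteRecursively(stateDir)}")
  }
}
```

Run it only while the job is stopped, then start the job again and see whether the first get still fails.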

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
wrote:

Hey Lukas,

Could you try clearing out the state, and starting the job?

Cheers,
Chris

On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

This happens every time even if I spin up a new VM. Happens after a
restart as well.

Lukas

-----Original Message----- From: Chris Riccomini
Sent: Tuesday, February 17, 2015 11:01 AM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Interesting. Does this happen only after restarting your job? Or does it
happen the first time, as well? I'm wondering if this is the problem:

   options.setErrorIfExists(true)

In RocksDbKeyValueStore.scala. I think this is set under the assumption
that the job is run in YARN. If you run locally, it seems to me that the
directory would continue to exist after a job is restarted. If you delete
your state directory, and restart your job, does the problem temporarily
go away until a subsequent restart happens?

Cheers,
Chris

On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

 Hi Chris,

1. We're running locally using ProcessJobFactory
2. CentOS 7 x86_64
3.
   startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
   engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
   engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc

   Also note that the properties file sets the default offset to oldest,
   but the log file says that it's setting the offset to largest:
   "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."

4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
   I checked the directory and it actually exists:

du -h /vagrant/SamzaJobs/deploy/samza/state

16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
36K    /vagrant/SamzaJobs/deploy/samza/state

Lukas

-----Original Message----- From: Chris Riccomini
Sent: Monday, February 16, 2015 5:53 PM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument


Hey Lukas,

It looks like the exception is actually thrown on get, not put:

         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)

1. Are you running your job under YARN, or as a local job
(ThreadJobFactory/ProcessJobFactory)?
2. What OS are you running on?
3. Could you post a full copy of your logs somewhere (GitHub gist,
pasteboard, or something)?
4.  Also, what does this line say in your logs:

   info("Got storage engine base directory: %s" format storeBaseDir)

It sounds like something is getting messed up with the directory where
the RocksDB store is trying to keep its data.

Cheers,
Chris

On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

 Hello,


I was setting up the key-value storage engine in Samza and ran into an
exception when querying the data.

I added these properties to the config:


    stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.engaged-store.changelog=kafka.engaged-store-changelog
    # a custom data type with an appropriate Serde
    stores.engaged-store.key.serde=UserAppPair
    # wrote a Serde for Long using ByteBuffer
    stores.engaged-store.msg.serde=Long
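
For context, a Long Serde built on ByteBuffer could look roughly like the sketch below. This is not the actual class from the job. The local `Serde` trait is a stand-in that mirrors the `toBytes`/`fromBytes` shape of Samza's `org.apache.samza.serializers.Serde` so the snippet compiles on its own, and `LongSerde` is a hypothetical name.

```scala
import java.nio.ByteBuffer

// Stand-in for org.apache.samza.serializers.Serde (assumption: same two methods).
trait Serde[T] {
  def toBytes(obj: T): Array[Byte]
  def fromBytes(bytes: Array[Byte]): T
}

// Hypothetical Serde that encodes a Long as 8 big-endian bytes.
class LongSerde extends Serde[java.lang.Long] {
  def toBytes(obj: java.lang.Long): Array[Byte] =
    if (obj == null) null
    else ByteBuffer.allocate(8).putLong(obj.longValue()).array()

  def fromBytes(bytes: Array[Byte]): java.lang.Long =
    if (bytes == null) null
    else java.lang.Long.valueOf(ByteBuffer.wrap(bytes).getLong())
}
```

A value written with `toBytes` should come back unchanged from `fromBytes`; null is passed through in both directions.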

I have no trouble initializing the storage engine with:

    val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];

but when I query by the key when processing messages, it’s throwing an
exception:

    val key = new UserAppPair(userId, appId);
    val value = store.get(key);

Here’s the log:

    2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
    2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
    2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
    2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
    2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
    2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
    org.rocksdb.RocksDBException: IO error: directory: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
    2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
    2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.


The same exception is thrown if I try to put a value in RocksDB. Has anyone
run into this problem before, or does anyone have pointers for solving it?

Lukas






