Same thing happens if I delete the whole deploy folder.

Lukas

-----Original Message----- From: Chris Riccomini
Sent: Tuesday, February 17, 2015 12:07 PM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Could you try clearing out the state, and starting the job?

Cheers,
Chris

On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

This happens every time even if I spin up a new VM. Happens after a
restart as well.

Lukas

-----Original Message----- From: Chris Riccomini
Sent: Tuesday, February 17, 2015 11:01 AM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

Interesting. Does this happen only after restarting your job? Or does it
happen the first time, as well? I'm wondering if this is the problem:

   options.setErrorIfExists(true)

In RocksDbKeyValueStore.scala. I think this is set under the assumption
that the job is run in YARN. If you run locally, it seems to me that the
directory would continue to exist after a job is restarted. If you delete
your state directory, and restart your job, does the problem temporarily go
away until a subsequent restart happens?
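
For anyone wanting to script the "delete your state directory" step, here is a minimal sketch using only the JDK file API. The object name `ClearState` and the convention of passing the directory as the first argument are assumptions made for illustration; nothing here is part of Samza.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, Paths, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

// Hypothetical helper, not part of Samza: removes a local state directory.
object ClearState {
  /** Recursively deletes `root` and everything under it; does nothing if it is absent. */
  def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          if (exc != null) throw exc
          Files.delete(dir) // the directory is empty by the time we get here
          FileVisitResult.CONTINUE
        }
      })
    }
  }

  def main(args: Array[String]): Unit = {
    // The directory must be given explicitly so nothing is deleted by accident.
    require(args.nonEmpty, "usage: ClearState <state-directory>")
    val stateDir = Paths.get(args(0))
    deleteRecursively(stateDir)
    println(s"Cleared $stateDir")
  }
}
```

Run it with the job stopped, passing the directory reported by the "Got storage engine base directory" log line, then start the job again.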

Cheers,
Chris

On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

 Hi Chris,

1. We're running locally using ProcessJobFactory
2. CentOS 7 x86_64
3.
   startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
   engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
   engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc

   Also note that the properties file sets the default offset to oldest,
but the log file says that it's setting the offset to largest:
"2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."

4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
   I checked the directory and it actually exists:

du -h /vagrant/SamzaJobs/deploy/samza/state

16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
36K    /vagrant/SamzaJobs/deploy/samza/state

Lukas

-----Original Message----- From: Chris Riccomini
Sent: Monday, February 16, 2015 5:53 PM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument


Hey Lukas,

It looks like the exception is actually thrown on get, not put:

         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)

1. Are you running your job under YARN, or as a local job
(ThreadJobFactory/ProcessJobFactory)?
2. What OS are you running on?
3. Could you post a full copy of your logs somewhere (github gist,
pasteboard, or something)?
4. Also, what does this line say in your logs:

   info("Got storage engine base directory: %s" format storeBaseDir)

It sounds like something is getting messed up with the directory where the
RocksDB store is trying to keep its data.
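
One way to look at the directory from the JVM's point of view is sketched below. It uses only the JDK file API and reports whether the path exists, whether it is a readable and writable directory, and which filesystem backs it. The object name `StoreDirCheck` is made up for illustration, and this is only a diagnostic aid under those assumptions; it does not reproduce what RocksDB itself does when it opens the store.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical diagnostic, not part of Samza or RocksDB.
object StoreDirCheck {
  /** Returns human-readable facts about `dir` as seen by the JVM. */
  def describe(dir: Path): Seq[String] = {
    if (!Files.exists(dir)) {
      Seq(s"$dir: does not exist")
    } else {
      val store = Files.getFileStore(dir)
      val fsType = store.`type`() // e.g. the filesystem type name reported by the OS
      Seq(
        s"$dir: directory=${Files.isDirectory(dir)}",
        s"$dir: readable=${Files.isReadable(dir)} writable=${Files.isWritable(dir)}",
        s"$dir: filesystem=$fsType (${store.name()})"
      )
    }
  }

  def main(args: Array[String]): Unit =
    args.foreach(arg => describe(Paths.get(arg)).foreach(println))
}
```

Passing both the store base directory and one of the per-partition directories under it makes it easy to compare them.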

Cheers,
Chris

On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
wrote:

 Hello,


I was setting up the key-value storage engine in Samza and ran into an
exception when querying the data.

I added these properties to the config:


    stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.engaged-store.changelog=kafka.engaged-store-changelog
    # a custom data type with an appropriate Serde
    stores.engaged-store.key.serde=UserAppPair
    # wrote a Serde for Long using ByteBuffer
    stores.engaged-store.msg.serde=Long
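
For readers following along, the Long encoding mentioned in the comment above might look roughly like the sketch below. The actual Serde from this job was not posted, so the object name `LongBytes` and the fixed 8-byte big-endian layout are assumptions. In a real job this logic would presumably sit in a class implementing Samza's Serde interface and be registered under the name `Long`; that wiring is left out so the example stays self-contained.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for the "Serde for Long using ByteBuffer" mentioned above.
object LongBytes {
  private val Size = 8 // a Long is always 8 bytes

  /** Encodes a Long as 8 bytes in big-endian order (ByteBuffer's default). */
  def toBytes(value: Long): Array[Byte] =
    ByteBuffer.allocate(Size).putLong(value).array()

  /** Decodes 8 big-endian bytes back into a Long. */
  def fromBytes(bytes: Array[Byte]): Long = {
    require(bytes.length == Size, s"expected $Size bytes, got ${bytes.length}")
    ByteBuffer.wrap(bytes).getLong
  }
}
```

A fixed-width encoding like this round-trips every Long value, including negatives, which is the property the store needs from a message serde.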

I have no trouble initializing the storage engine with:

    val store = context.getStore("engaged-store")
      .asInstanceOf[KeyValueStore[UserAppPair, Long]];

but when I query by the key when processing messages, it’s throwing an
exception:

    val key = new UserAppPair(userId, appId);
    val value = store.get(key);

Here’s the log:

    2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
    2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
    2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
    2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
    2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
    2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
    org.rocksdb.RocksDBException: IO error: directory: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
    2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
    2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.


The same exception is thrown if I try to put a value in RocksDB. Has anyone
run into this problem before, or does anyone have pointers for solving it?

Lukas





