Hi Chris,
1. We're running locally using ProcessJobFactory
2. CentOS 7 x86_64
3. Logs and config:
startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
Also note that the properties file sets the default offset to oldest, but the log shows the offset being reset to largest: "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
I checked the directory and it actually exists:
du -h /vagrant/SamzaJobs/deploy/samza/state
16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
0 /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
0 /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
36K /vagrant/SamzaJobs/deploy/samza/state/engaged-store
36K /vagrant/SamzaJobs/deploy/samza/state
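`du` runs as a separate shell process, so one way to double-check what the container's JVM itself sees is to inspect the same directories from Scala. The sketch below is hypothetical (`StoreDirCheck`, `DirStatus`, and `inspect` are illustrative names, not part of Samza). It lists the entries of a store directory and reports, for each one, whether it exists, is a directory, and is writable. Because it works on `java.io.File` objects, names containing spaces such as `Partition 0` need no quoting.

```scala
import java.io.File

object StoreDirCheck {
  // One row per entry in the store directory, as seen by this JVM.
  final case class DirStatus(name: String, exists: Boolean, isDir: Boolean, writable: Boolean)

  // Hypothetical helper: inspect every entry of a store directory
  // (e.g. <base dir>/engaged-store). listFiles() returns null when the
  // path is missing or not a directory, which is mapped to an empty result.
  def inspect(storeDir: File): Seq[DirStatus] = {
    val children = Option(storeDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children
      .sortBy(_.getName)
      .map(f => DirStatus(f.getName, f.exists(), f.isDirectory, f.canWrite))
  }

  def main(args: Array[String]): Unit = {
    // Base directory from the "Got storage engine base directory" log line, plus the store name.
    val storeDir = new File("/vagrant/SamzaJobs/deploy/samza/state/engaged-store")
    inspect(storeDir).foreach(println)
  }
}
```

Running this with the same user and working directory as the job keeps the check as close as possible to what the container experiences.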
Lukas
-----Original Message-----
From: Chris Riccomini
Sent: Monday, February 16, 2015 5:53 PM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument
Hey Lukas,
It looks like the exception is actually thrown on get, not put:
at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
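The frames just above that one in the stack trace show `RocksDbKeyValueStore.db$lzycompute` calling `RocksDB.open`. `$lzycompute` is the method the Scala compiler generates for a `lazy val`, which suggests the RocksDB handle is only opened on first access. That would explain why `getStore` succeeds and the open error only surfaces on the first `get` (or `put`). The self-contained sketch below illustrates the pattern with a hypothetical `LazyStore` class and a stand-in `openDb` function; it does not use the real RocksDB or Samza classes.

```scala
// Hypothetical illustration: a store whose database handle is a lazy val.
class LazyStore(openDb: () => Map[String, String]) {
  // Not evaluated until first access; an exception thrown by openDb
  // propagates from whichever call touches `db` first.
  private lazy val db: Map[String, String] = openDb()

  def get(key: String): Option[String] = db.get(key)
}

object LazyOpenDemo {
  def main(args: Array[String]): Unit = {
    var opened = false
    val store = new LazyStore(() => {
      opened = true
      throw new RuntimeException("IO error: directory: Invalid argument")
    })
    // Constructing the store (analogous to context.getStore) opens nothing.
    println(s"opened after construction: $opened")
    // The first get forces the lazy val, so the open error appears here.
    try store.get("some-key")
    catch { case e: RuntimeException => println(s"get failed: ${e.getMessage}") }
    println(s"opened after get: $opened")
  }
}
```

Running `LazyOpenDemo` prints `opened after construction: false`, then `get failed: IO error: directory: Invalid argument`, then `opened after get: true`.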
1. Are you running your job under YARN, or as a local job (ThreadJobFactory/ProcessJobFactory)?
2. What OS are you running on?
3. Could you post a full copy of your logs somewhere (GitHub gist, Pastebin, or something)?
4. Also, what does this line say in your logs:
info("Got storage engine base directory: %s" format storeBaseDir)
It sounds like something is getting messed up with the directory where the
RocksDB store is trying to keep its data.
Cheers,
Chris
On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
Hello,
I was setting up the key-value storage engine in Samza and ran into an
exception when querying the data.
I added these properties to the config:
stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.engaged-store.changelog=kafka.engaged-store-changelog
# a custom data type with an appropriate Serde
stores.engaged-store.key.serde=UserAppPair
# wrote a Serde for Long using ByteBuffer
stores.engaged-store.msg.serde=Long
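For reference, here is a minimal sketch of what the two serdes mentioned in these comments might look like. It assumes `UserAppPair` holds two `Long` ids named `userId` and `appId`; the real field types are not shown in the thread, so treat the class as hypothetical. Both encodings use `ByteBuffer`'s default big-endian order. In a Samza job each serde would implement `org.apache.samza.serializers.Serde` (`toBytes`/`fromBytes`) and be built by a `SerdeFactory` registered under `serializers.registry.<name>.class`; that wiring is left out so the block runs without Samza on the classpath. One design point worth keeping: the store looks keys up by their serialized bytes, so the key encoding must be deterministic (equal keys must produce identical bytes).

```scala
import java.nio.ByteBuffer

// Hypothetical key type: the real UserAppPair fields are not shown in the thread.
final case class UserAppPair(userId: Long, appId: Long)

// Same toBytes/fromBytes shape as a Samza Serde[Long]: 8 bytes, big-endian.
object LongSerde {
  def toBytes(value: Long): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).putLong(value).array()

  def fromBytes(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).getLong
}

// Fixed-width, field-ordered encoding, so equal keys always serialize
// to identical byte arrays.
object UserAppPairSerde {
  def toBytes(key: UserAppPair): Array[Byte] =
    ByteBuffer.allocate(2 * java.lang.Long.BYTES)
      .putLong(key.userId)
      .putLong(key.appId)
      .array()

  def fromBytes(bytes: Array[Byte]): UserAppPair = {
    val buf = ByteBuffer.wrap(bytes)
    // Arguments are evaluated left to right: userId first, then appId.
    UserAppPair(buf.getLong, buf.getLong)
  }
}
```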
I have no trouble initializing the storage engine with:
val store = context.getStore("engaged-store")
  .asInstanceOf[KeyValueStore[UserAppPair, Long]]
but when I query it by key while processing messages, it throws an exception:
val key = new UserAppPair(userId, appId)
val value = store.get(key)
Here’s the log:
2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
org.rocksdb.RocksDBException: IO error: directory: Invalid argument
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:133)
    at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
    at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
    at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
    at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
    at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
    at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
    at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
    at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
    at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
    at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
    at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
    at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
    at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
    at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
    at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
The same exception is thrown if I try to put a value into RocksDB. Has anyone run into this problem before, or does anyone have pointers on how to solve it?
Lukas