I could try to build a test job if you can’t reproduce it locally.

Lukas

From: Chris Riccomini 
Sent: Tuesday, February 17, 2015 2:19 PM
To: Lukas Steiblys 
Cc: dev@samza.apache.org ; Chris Riccomini 
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas, 

Let me try and reproduce locally.

Cheers,
Chris

On Tue, Feb 17, 2015 at 2:15 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:

  There is a symlink in the user's directory to /vagrant, but as I said before,
  the user has no problem reading or writing files.

  I added a line to the deploy script, "rm -rf deploy/samza/state", just before
  run-job.sh and I monitored the directory: it was deleted before running the
  job. However, this did not solve the problem.

  I also tried deleting the folder as the first instruction in init(), even
  before acquiring a handle to the store, but then I got an error that
  state/engagement-store/Partition 0 was not found.

  Lukas

  -----Original Message----- From: Chris Riccomini
  Sent: Tuesday, February 17, 2015 1:47 PM
  To: dev@samza.apache.org
  Cc: Chris Riccomini
  Subject: Re: RocksDBException: IO error: directory: Invalid argument

  Hey Lukas,


  Hmm.


    1. I'm running it as another user, but in the user's home directory so it
    has no problem writing or reading files.

  If you're running from the user's home directory, how are the data files
  ending up in /vagrant? Samza uses:

  val storeBaseDir = new File(System.getProperty("user.dir"), "state")

  To set the directory for the data.
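
  To see what that expression resolves to on a given machine, a small standalone
  check like the following can help. It is only a sketch: `StoreDirExample` and
  its `storeBaseDir` helper are hypothetical names that mirror the line quoted
  above, not Samza code. As far as I understand, on Linux the JVM takes
  `user.dir` from the process's working directory, which is usually reported
  with symlinks already resolved, so a home-directory link to /vagrant may show
  up as /vagrant here.

```scala
import java.io.File

object StoreDirExample {
  // Mirrors the quoted expression; `workingDir` stands in for System.getProperty("user.dir").
  def storeBaseDir(workingDir: String): File = new File(workingDir, "state")

  def main(args: Array[String]): Unit = {
    val dir = storeBaseDir(System.getProperty("user.dir"))
    println("store base dir: " + dir.getPath)
    // getCanonicalPath resolves any remaining symlinks in the path.
    println("canonical path: " + dir.getCanonicalPath)
  }
}
```

  Running it from the same directory the job is launched from should print the
  path the store would use.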


    Exception in thread "main" org.rocksdb.RocksDBException: Invalid
    argument: /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0:
    exists (error_if_exists is true)

  Awesome! So, this *is* the issue that I was referring to initially
  (options.setErrorIfExists(true)).
  Can you set your start script to fully delete the path before the job
  starts? This will get fully restored when the changelog restoration happens.
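
  A start script could clear the store directory with something like the sketch
  below before launching the job. It is illustrative only: `StateCleaner` and
  `deleteRecursively` are hypothetical names, not part of Samza, and the path
  passed in would be your own state directory (for example deploy/samza/state).

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object StateCleaner {
  // Recursively deletes `root` if it exists; returns true if something was deleted.
  def deleteRecursively(root: Path): Boolean = {
    if (!Files.exists(root)) {
      false
    } else {
      Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        // Files (and symlinks, which are not followed) are removed as they are visited.
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        // A directory is removed only after everything inside it is gone.
        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          if (exc != null) throw exc
          Files.delete(dir)
          FileVisitResult.CONTINUE
        }
      })
      true
    }
  }
}
```

  The return value makes it easy to log whether any stale state was actually
  found before the job started.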

  Cheers,
  Chris

  On Tue, Feb 17, 2015 at 1:38 PM, Lukas Steiblys <lu...@doubledutch.me>
  wrote:


    Actually there's a symlink in the running user's home directory to
    /vagrant where the jobs are executed, but even then, it doesn't have any
    problems writing or reading the files.

    Lukas

    -----Original Message----- From: Lukas Steiblys
    Sent: Tuesday, February 17, 2015 1:37 PM

    To: dev@samza.apache.org
    Cc: Chris Riccomini
    Subject: Re: RocksDBException: IO error: directory: Invalid argument

    1. I'm running it as another user, but in the user's home directory so it
    has no problem writing or reading files.
    2. See below.
    3. I'm running Windows on my machine so I don't think I'll be able to run
    it outside the VM.

    I switched to root user, did "chmod -R a+rwx /vagrant", deleted "deploy"
    folder, ran the job as root as well and it still failed. However, there was
    a slight change in the error message in stderr:

    Exception in thread "main" org.rocksdb.RocksDBException: Invalid argument:
    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0: exists
    (error_if_exists is true)
       at org.rocksdb.RocksDB.open(Native Method)
       at org.rocksdb.RocksDB.open(RocksDB.java:133)
       at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)

    Even though the deploy folder was deleted before the job was run, it's
    failing on the check?

    Lukas

    -----Original Message----- From: Chris Riccomini
    Sent: Tuesday, February 17, 2015 1:02 PM
    To: dev@samza.apache.org
    Cc: Chris Riccomini
    Subject: Re: RocksDBException: IO error: directory: Invalid argument

    Hey Lukas,

    I'm wondering if this is a filesystem permission issue? This exception:

    org.rocksdb.RocksDBException: IO error: directory: Invalid argument

    Looks like it's coming from this line:


    https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016

    Which seems to be trying to fsync data to disk. According to:

     http://docs.vagrantup.com/v2/synced-folders/basic_usage.html

    It sounds like the sync folder is set to be owned by the default Vagrant
    SSH user.
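
    One way to check whether a given directory accepts an fsync at all is a
    small probe like the one below. This is a sketch, not something taken from
    RocksDB or Samza: `DirFsyncProbe` and `fsyncDirectory` are made-up names,
    and it assumes a Unix-like OS where the JVM can open a directory read-only
    (it is not expected to work on Windows). Whether a failure here matches the
    exception above is itself an assumption that would need confirming.

```scala
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import scala.util.Try

object DirFsyncProbe {
  // Opens the directory read-only and asks the OS to flush it to disk.
  // Any I/O error (missing path, unsupported operation) is captured as a Failure.
  def fsyncDirectory(dir: Path): Try[Unit] = Try {
    val channel = FileChannel.open(dir, StandardOpenOption.READ)
    try channel.force(true)
    finally channel.close()
  }

  def main(args: Array[String]): Unit = {
    // Probe the directory given on the command line, or the current directory.
    val dir = Paths.get(if (args.nonEmpty) args(0) else ".")
    println(dir.toAbsolutePath.toString + " -> " + fsyncDirectory(dir))
  }
}
```

    Comparing the result for a directory under /vagrant with one on the VM's
    own disk (such as /tmp) would show whether the synced folder behaves
    differently.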

    1. Is this the user that you're running the Samza job as?
    2. Could you check the file permissions for /vagrant and all of its
    subdirectories, and make sure that they match up with what you expect (+rw
    for the Samza job's user)?
    3. If you try running the job outside of the VM, does it work?
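
    For item 2, the permission check could be scripted along these lines. It is
    a rough sketch with hypothetical names (`PermissionCheck`, `inaccessible`);
    it reports what the JVM's own user can read and write, so it should be run
    as the same user as the Samza job. It follows symlinks, so it assumes the
    tree contains no symlink loops.

```scala
import java.io.File

object PermissionCheck {
  // Returns every path under `root` (inclusive) that the current user
  // cannot both read and write.
  def inaccessible(root: File): List[File] = {
    val self = if (root.canRead && root.canWrite) Nil else List(root)
    // listFiles() returns null for plain files and unreadable directories.
    val children = Option(root.listFiles()).map(_.toList).getOrElse(Nil)
    self ++ children.flatMap(inaccessible)
  }

  def main(args: Array[String]): Unit = {
    val root = new File(if (args.nonEmpty) args(0) else ".")
    inaccessible(root).foreach(f => println("not read/write: " + f.getPath))
  }
}
```

    An empty result for /vagrant would suggest that plain read/write permissions
    are not the problem.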

    Cheers,
    Chris

    On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lu...@doubledutch.me>
    wrote:

      Yeah, I made sure the state is clean. This is the first time I'm trying to
      use RocksDB. I haven't tried LevelDB yet though.

      Lukas

      -----Original Message----- From: Chris Riccomini
      Sent: Tuesday, February 17, 2015 12:34 PM
      To: dev@samza.apache.org
      Cc: Chris Riccomini

      Subject: Re: RocksDBException: IO error: directory: Invalid argument

      Hey Lukas,

      Strange. Having a more detailed look at your logs.

      Note: /vagrant is a synced folder, and I think it *does* persist between
      VM restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the
      state should be empty.

      Cheers,
      Chris

      On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lu...@doubledutch.me>
      wrote:

        It starts out with a fresh FS. I deleted all the state, but the job still
        fails on the first get.

        Lukas

        -----Original Message----- From: Chris Riccomini
        Sent: Tuesday, February 17, 2015 12:12 PM
        To: Chris Riccomini
        Cc: dev@samza.apache.org

        Subject: Re: RocksDBException: IO error: directory: Invalid argument

        Hey Lukas,

          This happens every time even if I spin up a new VM.

        Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
        not using EBS or anything like that, are you?

        I want to see if you're getting hit by that setErrorIfExists line. If you:

        1. Stop your job.
        2. Clear the state from the FS.
        3. Start your job.

        Does it work?

        Cheers,
        Chris

        On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
        wrote:

          Hey Lukas,

          Could you try clearing out the state, and starting the job?

          Cheers,
          Chris

          On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
          wrote:

            This happens every time even if I spin up a new VM. Happens after a
            restart as well.

            Lukas

            -----Original Message----- From: Chris Riccomini
            Sent: Tuesday, February 17, 2015 11:01 AM
            To: dev@samza.apache.org
            Subject: Re: RocksDBException: IO error: directory: Invalid argument

            Hey Lukas,

            Interesting. Does this happen only after restarting your job? Or does it
            happen the first time, as well? I'm wondering if this is the problem:

               options.setErrorIfExists(true)

            In RocksDbKeyValueStore.scala. I think this is set under the assumption
            that the job is run in YARN. If you run locally, it seems to me that the
            directory would continue to exist after a job is restarted. If you delete
            your state directory, and restart your job, does the problem temporarily
            go away until a subsequent restart happens?

            Cheers,
            Chris

            On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
            wrote:

              Hi Chris,

              1. We're running locally using ProcessJobFactory
              2. CentOS 7 x86_64
              3.
                 startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
                 engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
                 engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc

                 Also note that the properties file sets the default offset to oldest,
                 but the log file says that it's setting the offset to largest:
                 "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."

              4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
                 storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
                 I checked the directory and it actually exists:

              du -h /vagrant/SamzaJobs/deploy/samza/state

              16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
              0      /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
              0      /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
              16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
              36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
              36K    /vagrant/SamzaJobs/deploy/samza/state

              Lukas

              -----Original Message----- From: Chris Riccomini
              Sent: Monday, February 16, 2015 5:53 PM
              To: dev@samza.apache.org
              Subject: Re: RocksDBException: IO error: directory: Invalid argument


              Hey Lukas,

              It looks like the exception is actually thrown on get, not put:

                       at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)

              1. Are you running your job under YARN, or as a local job
              (ThreadJobFactory/ProcessJobFactory)?
              2. What OS are you running on?
              3. Could you post a full copy of your logs somewhere (github gist,
              pasteboard, or something)?
              4. Also, what does this line say in your logs:

                 info("Got storage engine base directory: %s" format storeBaseDir)

              It sounds like something is getting messed up with the directory where
              the RocksDB store is trying to keep its data.

              Cheers,
              Chris

              On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
              wrote:

                Hello,

                I was setting up the key-value storage engine in Samza and ran into an
                exception when querying the data.

                I added these properties to the config:


                    stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
                    stores.engaged-store.changelog=kafka.engaged-store-changelog
                    # a custom data type with an appropriate Serde
                    stores.engaged-store.key.serde=UserAppPair
                    # wrote a Serde for Long using ByteBuffer
                    stores.engaged-store.msg.serde=Long

                I have no trouble initializing the storage engine with:

                    val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];

                but when I query by the key when processing messages, it’s throwing an
                exception:

                    val key = new UserAppPair(userId, appId);
                    val value = store.get(key);

                Here’s the log:

                    2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
                    2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
                    2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
                    2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
                    2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
                    2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
                    2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
                    2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
                    org.rocksdb.RocksDBException: IO error: directory: Invalid argument
                        at org.rocksdb.RocksDB.open(Native Method)
                        at org.rocksdb.RocksDB.open(RocksDB.java:133)
                        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
                        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
                        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
                        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
                        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
                        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
                        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
                        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
                        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
                        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
                        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
                        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
                        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
                        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
                        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
                        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
                        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
                        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
                        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
                        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
                        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
                        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
                        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
                    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
                    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
                    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
                    2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
                    2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
                    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
                    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
                    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
                    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.


                Same exception is thrown if I try to put a value in RocksDB. Has anyone
                run into this problem before or has any pointers into solving it?

                Lukas




















