Christophe Carré created STORM-493:
--------------------------------------

             Summary: Workers don't inherit storm.conf.file/storm.options 
properties of the supervisor
                 Key: STORM-493
                 URL: https://issues.apache.org/jira/browse/STORM-493
             Project: Apache Storm (Incubating)
          Issue Type: Bug
    Affects Versions: 0.9.2-incubating, 0.9.3-incubating
            Reporter: Christophe Carré
            Priority: Minor


If we override some configuration parameters on the command line (using storm 
-c "param=value") when we launch the supervisor, workers don't inherit them.

{noformat}
> cat conf/storm.yaml
storm.zookeeper.servers:
     - "127.0.0.1"
nimbus.host: "127.0.0.1"
storm.zookeeper.root: "/stormtest"
storm.local.dir: "storm-local-main"

> python bin/storm -c "storm.local.dir=\"storm-local-custom\"" supervisor

> less logs/worker-6701.log
[...]
2014-09-10 09:35:00 o.a.s.z.s.ZooKeeperServer [INFO] Server 
environment:user.dir=/optc/2014-09-05-5aae7686
2014-09-10 09:35:01 b.s.d.worker [INFO] Launching worker for 
mytopo-1-1410334488 on 96f32da2-2043-4371-988b-ec9ca107ce69:6701 with id 
b9178c80-922b-4b8d-9984-7cfaf06f3c86 and conf {"dev.zookeeper.path" 
"/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, 
"topology.builtin.metrics.bucket.size.secs" 60, 
"topology.fall.back.on.java.serialization" true, 
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, 
"topology.skip.missing.kryo.registrations" false, 
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
"topology.trident.batch.emit.interval.millis" 500, 
"storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 
10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
"/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 
1024, "storm.local.dir" "storm-local-main", "storm.messaging.netty.buffer_size" 
5242880, "supervisor.worker.start.timeout.secs" 120, 
"topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, 
"nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
"storm.meta.serialization.delegate" 
"backtype.storm.serialization.DefaultSerializationDelegate", 
"topology.worker.shared.thread.pool.size" 4, "nimbus.host" "127.0.0.1", 
"storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, 
"transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" 
"/stormtest", "storm.zookeeper.retry.intervalceiling.millis" 30000, 
"supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, 
"storm.zookeeper.servers" ["127.0.0.1"], "transactional.zookeeper.root" 
"/transactional", "topology.acker.executors" nil, 
"topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
"drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
"supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
"topology.tasks" nil, "storm.messaging.netty.max_retries" 300, 
"topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", 
"nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, 
"storm.zookeeper.retry.interval" 1000, 
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
[6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, 
"nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, 
"topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
"topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 
6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, 
"topology.tuple.serializer" 
"backtype.storm.serialization.types.ListDelegateSerializer", 
"topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", 
"topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", 
"nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, 
"topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", 
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
"storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, 
"storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", 
"topology.state.synchronization.timeout.secs" 60, 
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
"storm.messaging.transport" "backtype.storm.messaging.netty.Context", 
"logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, 
"nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", 
"topology.max.task.parallelism" nil, 
"storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
2014-09-10 09:35:01 b.s.util [DEBUG] Touching file at 
storm-local-main/workers/b9178c80-922b-4b8d-9984-7cfaf06f3c86/pids/30958
2014-09-10 09:35:01 b.s.d.worker [ERROR] Error on initialization of server 
mk-worker
java.io.IOException: No such file or directory
        at java.io.UnixFileSystem.createFileExclusively(Native Method) 
~[na:1.7.0_60]
        at java.io.File.createNewFile(File.java:1006) ~[na:1.7.0_60]
        at backtype.storm.util$touch.invoke(util.clj:519) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at 
backtype.storm.daemon.worker$fn__6535$exec_fn__1474__auto____6536.invoke(worker.clj:362)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
        at 
backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
2014-09-10 09:35:01 b.s.util [ERROR] Halting process: ("Error on 
initialization")
java.lang.RuntimeException: ("Error on initialization")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:319) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at 
backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
[...]
{noformat}

The worker tries to use the local directory defined in the configuration file 
(storm-local-main) instead of the local directory defined on the command line 
(storm-local-custom).

----

There is a similar problem with the configuration file name. If we specify it 
on the command line, workers don't inherit it and instead search for the 
default "storm.yaml" file in the classpath.

{noformat}
> cat conf/storm.yaml
storm.zookeeper.servers:
     - "127.0.0.1"
nimbus.host: "127.0.0.1"
storm.zookeeper.root: "/stormtest"
storm.local.dir: "storm-local-default"

> cat conf/storm-custom.yaml
storm.zookeeper.servers:
     - "127.0.0.1"
nimbus.host: "127.0.0.1"
storm.zookeeper.root: "/stormtest"
storm.local.dir: "storm-local-custom"

> python bin/storm --config storm-custom.yaml supervisor

> less logs/worker-6700.log
[...]
2014-09-10 10:16:01 o.a.s.z.s.ZooKeeperServer [INFO] Server 
environment:user.dir=/optc/2014-09-05-5aae7686
2014-09-10 10:16:01 b.s.d.worker [INFO] Launching worker for 
mytopo-1-1410336834 on 38c99fbd-e5e5-4c53-b704-7ec460f5e227:6700 with id 
2133bad6-432d-4c63-a156-1184064d296b and conf {"dev.zookeeper.path" 
"/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, 
"topology.builtin.metrics.bucket.size.secs" 60, 
"topology.fall.back.on.java.serialization" true, 
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, 
"topology.skip.missing.kryo.registrations" false, 
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
"topology.trident.batch.emit.interval.millis" 500, 
"storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 
10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
"/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 
1024, "storm.local.dir" "storm-local-default", 
"storm.messaging.netty.buffer_size" 5242880, 
"supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" 
true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 
3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" 
"backtype.storm.serialization.DefaultSerializationDelegate", 
"topology.worker.shared.thread.pool.size" 4, "nimbus.host" "127.0.0.1", 
"storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, 
"transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" 
"/stormtest", "storm.zookeeper.retry.intervalceiling.millis" 30000, 
"supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, 
"storm.zookeeper.servers" ["127.0.0.1"], "transactional.zookeeper.root" 
"/transactional", "topology.acker.executors" nil, 
"topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
"drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
"supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
"topology.tasks" nil, "storm.messaging.netty.max_retries" 300, 
"topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", 
"nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, 
"storm.zookeeper.retry.interval" 1000, 
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
[6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, 
"nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, 
"topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
"topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 
6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, 
"topology.tuple.serializer" 
"backtype.storm.serialization.types.ListDelegateSerializer", 
"topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", 
"topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", 
"nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, 
"topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", 
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
"storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, 
"storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", 
"topology.state.synchronization.timeout.secs" 60, 
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
"storm.messaging.transport" "backtype.storm.messaging.netty.Context", 
"logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, 
"nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", 
"topology.max.task.parallelism" nil, 
"storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
2014-09-10 10:16:01 b.s.util [DEBUG] Touching file at 
storm-local-default/workers/2133bad6-432d-4c63-a156-1184064d296b/pids/31689
2014-09-10 10:16:01 b.s.d.worker [ERROR] Error on initialization of server 
mk-worker
java.io.IOException: No such file or directory
        at java.io.UnixFileSystem.createFileExclusively(Native Method) 
~[na:1.7.0_60]
        at java.io.File.createNewFile(File.java:1006) ~[na:1.7.0_60]
        at backtype.storm.util$touch.invoke(util.clj:519) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at 
backtype.storm.daemon.worker$fn__6535$exec_fn__1474__auto____6536.invoke(worker.clj:362)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
        at 
backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
2014-09-10 10:16:01 b.s.util [ERROR] Halting process: ("Error on 
initialization")
java.lang.RuntimeException: ("Error on initialization")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:319) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at 
backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
        at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
        at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker.main(Unknown Source) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
{noformat}

The worker tries to use the local directory "storm-local-default" defined in 
the configuration file "storm.yaml" instead of using the local directory 
"storm-local-custom" defined in the file "storm-custom.yaml" specified on the 
command line.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to