Merge branch 'master' into security-upmerge
Conflicts:
storm-core/src/clj/backtype/storm/testing.clj
storm-core/src/jvm/backtype/storm/utils/LocalState.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6b18863
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6b18863
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6b18863
Branch: refs/heads/security
Commit: e6b18863e1cc5a5af8c7d1da703d31d9e17b70bf
Parents: 04a7937 e15dbe2
Author: Robert (Bobby) Evans <[email protected]>
Authored: Wed Nov 12 10:43:41 2014 -0600
Committer: Robert (Bobby) Evans <[email protected]>
Committed: Wed Nov 12 10:43:41 2014 -0600
----------------------------------------------------------------------
CHANGELOG.md | 7 +++
DEVELOPER.md | 4 ++
LICENSE | 14 ++++-
README.markdown | 1 +
bin/storm.cmd | 11 +++-
external/storm-kafka/README.md | 7 +--
.../src/jvm/storm/kafka/KafkaUtils.java | 63 ++++++++++----------
.../src/jvm/storm/kafka/PartitionManager.java | 11 +++-
.../jvm/storm/kafka/UpdateOffsetException.java | 5 ++
.../src/test/storm/kafka/KafkaUtilsTest.java | 6 +-
logback/cluster.xml | 4 +-
storm-core/pom.xml | 4 ++
.../clj/backtype/storm/daemon/supervisor.clj | 4 ++
storm-core/src/clj/backtype/storm/testing.clj | 4 +-
storm-core/src/clj/backtype/storm/testing4j.clj | 3 +
storm-core/src/dev/resources/storm.py | 8 ++-
.../backtype/storm/messaging/netty/Client.java | 2 +
.../backtype/storm/messaging/netty/Server.java | 14 ++---
.../jvm/backtype/storm/utils/LocalState.java | 29 ++++++---
storm-core/src/multilang/py/storm.py | 8 ++-
.../clj/backtype/storm/local_state_test.clj | 14 ++++-
.../storm/messaging/netty_unit_test.clj | 28 ++++-----
.../test/clj/backtype/storm/supervisor_test.clj | 4 ++
storm-dist/binary/LICENSE | 15 ++++-
24 files changed, 188 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/README.markdown
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index c580bc8,b370eb7..6f7995a
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -605,8 -501,9 +605,10 @@@
(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
+ run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
storm-home (System/getProperty "storm.home")
+ storm-options (System/getProperty "storm.options")
+ storm-conf-file (System/getProperty "storm.conf.file")
storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home file-path-separator "logs"))
stormroot (supervisor-stormdist-root conf storm-id)
jlp (jlp stormroot conf)
@@@ -637,8 -530,10 +639,10 @@@
[(str "-Djava.library.path=" jlp)
(str "-Dlogfile.name=" logfilename)
(str "-Dstorm.home=" storm-home)
+ (str "-Dstorm.conf.file=" storm-conf-file)
+ (str "-Dstorm.options=" storm-options)
(str "-Dstorm.log.dir=" storm-log-dir)
- (str "-Dlogback.configurationFile=" storm-home file-path-separator "logback" file-path-separator "cluster.xml")
+ (str "-Dlogback.configurationFile=" storm-home file-path-separator "logback" file-path-separator "worker.xml")
(str "-Dstorm.id=" storm-id)
(str "-Dworker.id=" worker-id)
(str "-Dworker.port=" port)
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/src/clj/backtype/storm/testing4j.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/src/jvm/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/LocalState.java
index f412ff3,14a45da..dc64e0f
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@@ -35,10 -37,9 +38,10 @@@ public class LocalState
private VersionedStore _vs;
public LocalState(String backingDir) throws IOException {
+ LOG.debug("New Local State for {}", backingDir);
_vs = new VersionedStore(backingDir);
}
-
+
public synchronized Map<Object, Object> snapshot() throws IOException {
int attempts = 0;
while(true) {
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 8aaa7e5,ed5797d..2061ddf
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -26,10 -26,9 +26,10 @@@
(deftest test-basic
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT
"backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@@ -44,15 -43,14 +44,15 @@@
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
- (.term context)))
+ (.term context)))
(deftest test-large-msg
- (let [req_msg (apply str (repeat 2048000 'c'))
+ (let [req_msg (apply str (repeat 2048000 'c'))
storm-conf {STORM-MESSAGING-TRANSPORT
"backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@@ -72,10 -70,9 +72,10 @@@
(deftest test-server-delayed
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT
"backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@@ -102,10 -99,9 +102,10 @@@
(deftest test-batch
(let [storm-conf {STORM-MESSAGING-TRANSPORT
"backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
http://git-wip-us.apache.org/repos/asf/storm/blob/e6b18863/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/supervisor_test.clj
index ebbf060,0bb47f3..a3594a3
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@@ -259,10 -253,12 +259,12 @@@
opts
topo-opts
["-Djava.library.path="
- (str "-Dlogfile.name=worker-" mock-port ".log")
+ (str "-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log")
"-Dstorm.home="
+ "-Dstorm.conf.file="
+ "-Dstorm.options="
(str "-Dstorm.log.dir=" file-path-separator "logs")
- (str "-Dlogback.configurationFile=" file-path-separator "logback" file-path-separator "cluster.xml")
+ (str "-Dlogback.configurationFile=" file-path-separator "logback" file-path-separator "worker.xml")
(str "-Dstorm.id=" mock-storm-id)
(str "-Dworker.id=" mock-worker-id)
(str "-Dworker.port=" mock-port)
@@@ -351,101 -338,7 +353,103 @@@
mock-worker-id)
(verify-first-call-args-for-indices launch-process
[2]
- (merge topo-env {"LD_LIBRARY_PATH" nil}))))))))
+ full-env)))))))
+
+(defn rm-r [f]
+ (if (.isDirectory f)
+ (for [sub (.listFiles f)] (rm-r sub))
+ (.delete f)
+ ))
+
+(deftest test-worker-launch-command-run-as-user
+ (testing "*.worker.childopts configuration"
+ (let [mock-port "42"
+ mock-storm-id "fake-storm-id"
+ mock-worker-id "fake-worker-id"
+ mock-cp "mock-classpath'quote-on-purpose"
+ storm-local (str "/tmp/" (UUID/randomUUID))
+ worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
+ exp-launch ["/bin/worker-launcher"
+ "me"
+ "worker"
+ (str storm-local "/workers/" mock-worker-id)
+ worker-script]
+ exp-script-fn (fn [opts topo-opts]
+ (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java' '-server'"
+ " " (shell-cmd opts)
+ " " (shell-cmd topo-opts)
+ " '-Djava.library.path='"
+ " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
+ " '-Dstorm.home='"
++ " '-Dstorm.conf.file='"
++ " '-Dstorm.options='"
+ " '-Dstorm.log.dir=/logs'"
+ " '-Dlogback.configurationFile=/logback/worker.xml'"
+ " '-Dstorm.id=" mock-storm-id "'"
+ " '-Dworker.id=" mock-worker-id "'"
+ " '-Dworker.port=" mock-port "'"
+ " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
+ " 'backtype.storm.daemon.worker'"
+ " '" mock-storm-id "'"
+ " '" mock-port "'"
+ " '" mock-worker-id "';"))]
+ (.mkdirs (io/file storm-local "workers" mock-worker-id))
+ (try
+ (testing "testing *.worker.childopts as strings with extra spaces"
+ (let [string-opts "-Dfoo=bar -Xmx1024m"
+ topo-string-opts "-Dkau=aux -Xmx2048m"
+ exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
+ ["-Dkau=aux" "-Xmx2048m"])
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+ STORM-LOCAL-DIR storm-local
+ SUPERVISOR-RUN-WORKER-AS-USER true
+ WORKER-CHILDOPTS string-opts}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-string-opts
+ TOPOLOGY-SUBMITTER-USER "me"}
+ add-to-classpath mock-cp
+ supervisor-stormdist-root nil
+ launch-process nil
+ set-worker-user! nil
+ supervisor/java-cmd "java"
+ supervisor/jlp nil
+ supervisor/write-log-metadata! nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-launch))
+ (is (= (slurp worker-script) exp-script))))
+ (testing "testing *.worker.childopts as list of strings, with spaces in values"
+ (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
+ topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
+ exp-script (exp-script-fn list-opts topo-list-opts)
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+ STORM-LOCAL-DIR storm-local
+ SUPERVISOR-RUN-WORKER-AS-USER true
+ WORKER-CHILDOPTS list-opts}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-list-opts
+ TOPOLOGY-SUBMITTER-USER "me"}
+ add-to-classpath mock-cp
+ supervisor-stormdist-root nil
+ launch-process nil
+ set-worker-user! nil
+ supervisor/java-cmd "java"
+ supervisor/jlp nil
+ supervisor/write-log-metadata! nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-launch))
+ (is (= (slurp worker-script) exp-script))))
+(finally (rm-r (io/file storm-local)))
+))))
(deftest test-workers-go-bananas
;; test that multiple workers are started for a port, and test that