STORM-548: Receive thread shutdown hook should connect to the local hostname,
not 'localhost'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fcd45d82
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fcd45d82
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fcd45d82

Branch: refs/heads/master
Commit: fcd45d82d7ae9b54645c944acc5945bd80fe1f4b
Parents: f9707b5 96e8a5b
Author: caofangkun <caofang...@gmail.com>
Authored: Mon Nov 24 10:49:51 2014 +0800
Committer: caofangkun <caofang...@gmail.com>
Committed: Mon Nov 24 10:49:51 2014 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   17 +-
 DEVELOPER.md                                    |    4 +
 LICENSE                                         |   14 +-
 README.markdown                                 |    2 +
 SECURITY.md                                     |  356 ++-
 STORM-UI-REST-API.md                            |   17 +-
 bin/storm                                       |   15 +-
 bin/storm.cmd                                   |   11 +-
 conf/defaults.yaml                              |   45 +-
 conf/jaas_digest.conf                           |    8 +-
 conf/jaas_kerberos.conf                         |   15 +
 .../storm-starter/multilang/resources/storm.js  |   20 +-
 .../storm-starter/multilang/resources/storm.py  |   87 +-
 .../storm-starter/multilang/resources/storm.rb  |   90 +-
 examples/storm-starter/pom.xml                  |    2 +-
 .../src/jvm/storm/starter/util/StormRunner.java |    3 +-
 external/storm-hbase/pom.xml                    |    4 +-
 external/storm-hdfs/pom.xml                     |    4 +-
 external/storm-kafka/README.md                  |    7 +-
 external/storm-kafka/pom.xml                    |    2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |   63 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   11 +-
 .../jvm/storm/kafka/UpdateOffsetException.java  |    5 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |    7 +-
 logback/cluster.xml                             |    4 +-
 logback/worker.xml                              |   41 +
 pom.xml                                         |   39 +-
 .../maven-shade-clojure-transformer/pom.xml     |    4 +-
 storm-core/pom.xml                              |  307 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |    4 +
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |    4 +-
 .../src/clj/backtype/storm/MockAutoCred.clj     |   58 +
 storm-core/src/clj/backtype/storm/bootstrap.clj |    5 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  167 +-
 .../storm/command/upload_credentials.clj        |   35 +
 storm-core/src/clj/backtype/storm/config.clj    |   40 +-
 .../backtype/storm/daemon/builtin_metrics.clj   |   21 +-
 .../src/clj/backtype/storm/daemon/common.clj    |   18 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |  174 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |  137 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |  271 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  245 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  185 +-
 .../src/clj/backtype/storm/daemon/task.clj      |    7 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  109 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   11 +-
 .../src/clj/backtype/storm/messaging/loader.clj |   11 +-
 .../clj/backtype/storm/messaging/loader.clj~    |   89 +
 storm-core/src/clj/backtype/storm/testing.clj   |   56 +-
 storm-core/src/clj/backtype/storm/testing4j.clj |    5 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |   28 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  173 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   69 +-
 storm-core/src/clj/backtype/storm/util.clj      |   95 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |   19 +-
 storm-core/src/clj/storm/trident/testing.clj    |    1 +
 .../src/dev/drpc-simple-acl-test-scenario.yaml  |   11 +
 storm-core/src/dev/resources/storm.js           |    6 +
 storm-core/src/dev/resources/storm.py           |   25 +-
 storm-core/src/dev/resources/storm.rb           |   55 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  537 +++-
 .../jvm/backtype/storm/ConfigValidation.java    |  156 +-
 .../src/jvm/backtype/storm/Constants.java       |    3 +-
 .../backtype/storm/ICredentialsListener.java    |   32 +
 .../src/jvm/backtype/storm/ILocalCluster.java   |    2 +
 .../src/jvm/backtype/storm/StormSubmitter.java  |  164 +-
 .../storm/drpc/DRPCInvocationsClient.java       |   91 +-
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |  100 +-
 .../jvm/backtype/storm/drpc/ReturnResults.java  |   35 +-
 .../storm/generated/AuthorizationException.java |  345 ++
 .../backtype/storm/generated/Credentials.java   |  390 +++
 .../storm/generated/DistributedRPC.java         |  110 +-
 .../generated/DistributedRPCInvocations.java    |  352 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 3006 +++++++++++++++++-
 .../backtype/storm/generated/SubmitOptions.java |   98 +-
 .../backtype/storm/generated/TopologyInfo.java  |  192 +-
 .../storm/generated/TopologySummary.java        |  192 +-
 .../backtype/storm/messaging/netty/Client.java  |   73 +-
 .../storm/messaging/netty/ControlMessage.java   |    4 +-
 .../storm/messaging/netty/MessageDecoder.java   |   32 +-
 .../storm/messaging/netty/MessageEncoder.java   |    4 +
 .../storm/messaging/netty/SaslMessageToken.java |   99 +
 .../storm/messaging/netty/SaslNettyClient.java  |  166 +
 .../messaging/netty/SaslNettyClientState.java   |   31 +
 .../storm/messaging/netty/SaslNettyServer.java  |  165 +
 .../messaging/netty/SaslNettyServerState.java   |   31 +
 .../messaging/netty/SaslStormClientHandler.java |  158 +
 .../netty/SaslStormServerAuthorizeHandler.java  |   83 +
 .../messaging/netty/SaslStormServerHandler.java |  155 +
 .../storm/messaging/netty/SaslUtils.java        |   74 +
 .../backtype/storm/messaging/netty/Server.java  |  104 +-
 .../netty/StormClientPipelineFactory.java       |   12 +-
 .../messaging/netty/StormServerHandler.java     |    2 +-
 .../netty/StormServerPipelineFactory.java       |   20 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |   13 +
 .../scheduler/multitenant/DefaultPool.java      |  219 ++
 .../storm/scheduler/multitenant/FreePool.java   |  125 +
 .../scheduler/multitenant/IsolatedPool.java     |  346 ++
 .../multitenant/MultitenantScheduler.java       |   98 +
 .../storm/scheduler/multitenant/Node.java       |  343 ++
 .../storm/scheduler/multitenant/NodePool.java   |  296 ++
 .../storm/security/INimbusCredentialPlugin.java |   47 +
 .../backtype/storm/security/auth/AuthUtils.java |  228 +-
 .../auth/DefaultHttpCredentialsPlugin.java      |   87 +
 .../security/auth/DefaultPrincipalToLocal.java  |   43 +
 .../storm/security/auth/IAutoCredentials.java   |   55 +
 .../security/auth/ICredentialsRenewer.java      |   41 +
 .../auth/IGroupMappingServiceProvider.java      |   42 +
 .../security/auth/IHttpCredentialsPlugin.java   |   50 +
 .../storm/security/auth/IPrincipalToLocal.java  |   41 +
 .../storm/security/auth/ITransportPlugin.java   |   14 +-
 .../security/auth/KerberosPrincipalToLocal.java |   45 +
 .../storm/security/auth/ReqContext.java         |   10 +-
 .../security/auth/SaslTransportPlugin.java      |   44 +-
 .../security/auth/ShellBasedGroupsMapping.java  |   94 +
 .../security/auth/SimpleTransportPlugin.java    |   61 +-
 .../security/auth/SingleUserPrincipal.java      |   56 +
 .../storm/security/auth/TBackoffConnect.java    |   77 +
 .../storm/security/auth/ThriftClient.java       |   85 +-
 .../security/auth/ThriftConnectionType.java     |   77 +
 .../storm/security/auth/ThriftServer.java       |   19 +-
 .../auth/authorizer/DRPCAuthorizerBase.java     |   46 +
 .../authorizer/DRPCSimpleACLAuthorizer.java     |  157 +
 .../auth/authorizer/DenyAuthorizer.java         |    4 +-
 .../auth/authorizer/NoopAuthorizer.java         |    6 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    |  131 +
 .../authorizer/SimpleWhitelistAuthorizer.java   |   70 +
 .../auth/digest/DigestSaslTransportPlugin.java  |    1 +
 .../storm/security/auth/hadoop/AutoHDFS.java    |  262 ++
 .../storm/security/auth/kerberos/AutoTGT.java   |  281 ++
 .../auth/kerberos/AutoTGTKrb5LoginModule.java   |  108 +
 .../kerberos/AutoTGTKrb5LoginModuleTest.java    |   44 +
 .../auth/kerberos/ClientCallbackHandler.java    |  104 +
 .../kerberos/KerberosSaslTransportPlugin.java   |  206 ++
 .../auth/kerberos/ServerCallbackHandler.java    |   86 +
 .../auth/kerberos/jaas_kerberos_cluster.conf    |   31 +
 .../auth/kerberos/jaas_kerberos_launcher.conf   |   12 +
 .../jvm/backtype/storm/spout/ShellSpout.java    |   65 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  211 +-
 .../testing/ForwardingMetricsConsumer.java      |   95 +
 .../testing/SingleUserSimpleTransport.java      |   37 +
 .../state/TestTransactionalState.java           |   47 +
 .../transactional/state/TransactionalState.java |   56 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |   63 +-
 .../backtype/storm/utils/DisruptorQueue.java    |   56 +-
 .../jvm/backtype/storm/utils/LocalState.java    |   44 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   11 +-
 .../jvm/backtype/storm/utils/ShellUtils.java    |  498 +++
 .../src/jvm/backtype/storm/utils/TestUtils.java |   34 +
 .../src/jvm/backtype/storm/utils/Utils.java     |  130 +-
 .../backtype/storm/utils/ZookeeperAuthInfo.java |    9 +-
 .../storm/utils/ZookeeperServerCnxnFactory.java |   84 +
 .../trident/drpc/ReturnResultsReducer.java      |   13 +-
 .../topology/state/TestTransactionalState.java  |   47 +
 .../topology/state/TransactionalState.java      |   58 +-
 storm-core/src/multilang/py/storm.py            |   25 +-
 storm-core/src/multilang/rb/storm.rb            |   55 +-
 .../src/native/worker-launcher/.autom4te.cfg    |   42 +
 .../worker-launcher/.deps/worker-launcher.Po    |    1 +
 .../src/native/worker-launcher/Makefile.am      |   32 +
 .../src/native/worker-launcher/configure.ac     |   50 +
 .../native/worker-launcher/impl/configuration.c |  340 ++
 .../native/worker-launcher/impl/configuration.h |   45 +
 .../src/native/worker-launcher/impl/main.c      |  210 ++
 .../worker-launcher/impl/worker-launcher.c      |  779 +++++
 .../worker-launcher/impl/worker-launcher.h      |  129 +
 .../worker-launcher/test/test-worker-launcher.c |  340 ++
 storm-core/src/py/__init__.py                   |   16 +
 storm-core/src/py/storm/DistributedRPC-remote   |   18 +
 storm-core/src/py/storm/DistributedRPC.py       |   37 +-
 .../py/storm/DistributedRPCInvocations-remote   |   18 +
 .../src/py/storm/DistributedRPCInvocations.py   |   96 +-
 storm-core/src/py/storm/Nimbus-remote           |   25 +
 storm-core/src/py/storm/Nimbus.py               |  652 +++-
 storm-core/src/py/storm/__init__.py             |   16 +
 storm-core/src/py/storm/constants.py            |   16 +
 storm-core/src/py/storm/ttypes.py               | 1243 +++++++-
 storm-core/src/storm.thrift                     |   58 +-
 storm-core/src/ui/public/component.html         |   21 +-
 storm-core/src/ui/public/css/style.css          |    9 +
 storm-core/src/ui/public/index.html             |   12 +-
 storm-core/src/ui/public/js/script.js           |    3 +-
 .../public/templates/anti-forgery-template.html |   19 +
 .../public/templates/index-page-template.html   |   12 +
 .../templates/topology-page-template.html       |   23 +-
 .../src/ui/public/templates/user-template.html  |   25 +
 storm-core/src/ui/public/topology.html          |   21 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   93 +-
 .../test/clj/backtype/storm/config_test.clj     |   11 +
 .../test/clj/backtype/storm/drpc_test.clj       |   14 +-
 .../clj/backtype/storm/local_state_test.clj     |   14 +-
 .../test/clj/backtype/storm/logviewer_test.clj  |  187 ++
 .../storm/messaging/netty_integration_test.clj  |    4 +-
 .../storm/messaging/netty_unit_test.clj         |   32 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |  377 ++-
 .../scheduler/multitenant_scheduler_test.clj    |  831 +++++
 .../storm/security/auth/AuthUtils_test.clj      |   16 +-
 .../auth/DefaultHttpCredentialsPlugin_test.clj  |   40 +
 .../storm/security/auth/ThriftClient_test.clj   |   28 +-
 .../storm/security/auth/ThriftServer_test.clj   |    8 +-
 .../backtype/storm/security/auth/auth_test.clj  |  374 ++-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |  226 ++
 .../security/auth/auto_login_module_test.clj    |   91 +
 .../storm/security/auth/drpc-auth-alice.jaas    |    5 +
 .../storm/security/auth/drpc-auth-bob.jaas      |    5 +
 .../storm/security/auth/drpc-auth-charlie.jaas  |    5 +
 .../storm/security/auth/drpc-auth-server.jaas   |    6 +
 .../storm/security/auth/drpc_auth_test.clj      |  315 ++
 .../storm/security/auth/nimbus_auth_test.clj    |  181 ++
 .../test/clj/backtype/storm/submitter_test.clj  |   75 +
 .../test/clj/backtype/storm/supervisor_test.clj |  180 +-
 .../test/clj/backtype/storm/testing4j_test.clj  |   32 +
 .../clj/backtype/storm/transactional_test.clj   |   27 +-
 .../utils/ZookeeperServerCnxnFactory_test.clj   |   35 +
 .../test/clj/backtype/storm/utils_test.clj      |   58 +-
 .../test/clj/storm/trident/state_test.clj       |   25 +-
 .../storm/utils/DisruptorQueueTest.java         |   25 +-
 storm-dist/binary/LICENSE                       |   15 +-
 storm-dist/binary/pom.xml                       |    2 +-
 storm-dist/binary/src/main/assembly/binary.xml  |    4 +
 storm-dist/source/pom.xml                       |    2 +-
 221 files changed, 22207 insertions(+), 1643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fcd45d82/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/messaging/loader.clj
index c318b2a,e0ffeea..13130af
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj
@@@ -65,12 -65,10 +65,11 @@@
     :kill-fn (fn [t] (System/exit 1))
     :priority Thread/NORM_PRIORITY]
    (let [max-buffer-size (int max-buffer-size)
 +        local-hostname (memoized-local-hostname)
-         socket (.bind ^IContext context storm-id port)
          thread-count (if receiver-thread-count receiver-thread-count 1)
-         vthreads (mk-receive-threads context storm-id port transfer-local-fn 
daemon kill-fn priority socket max-buffer-size thread-count)]
+         vthreads (mk-receive-threads storm-id port transfer-local-fn daemon 
kill-fn priority socket max-buffer-size thread-count)]
      (fn []
 -      (let [kill-socket (.connect ^IContext context storm-id "localhost" 
port)]
 +      (let [kill-socket (.connect ^IContext context storm-id local-hostname 
port)]
          (log-message "Shutting down receiving-thread: [" storm-id ", " port 
"]")
          (.send ^IConnection kill-socket
                    -1 (byte-array []))

http://git-wip-us.apache.org/repos/asf/storm/blob/fcd45d82/storm-core/src/clj/backtype/storm/messaging/loader.clj~
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/messaging/loader.clj~
index 0000000,0000000..35ceab5
new file mode 100644
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj~
@@@ -1,0 -1,0 +1,89 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
++(ns backtype.storm.messaging.loader
++  (:use [backtype.storm util log])
++  (:import [java.util ArrayList Iterator])
++  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
++  (:import [backtype.storm.utils DisruptorQueue MutableObject])
++  (:require [backtype.storm.messaging [local :as local]])
++  (:require [backtype.storm [disruptor :as disruptor]]))
++
++(defn mk-local-context []
++  (local/mk-context))
++
++(defn- mk-receive-thread [storm-id port transfer-local-fn  daemon kill-fn 
priority socket max-buffer-size thread-id]
++    (async-loop
++       (fn []
++         (log-message "Starting receive-thread: [stormId: " storm-id ", port: 
" port ", thread-id: " thread-id  " ]")
++         (fn []
++           (let [batched (ArrayList.)
++                 ^Iterator iter (.recv ^IConnection socket 0 thread-id)
++                 closed (atom false)]
++             (when iter
++               (while (and (not @closed) (.hasNext iter)) 
++                  (let [packet (.next iter)
++                        task (if packet (.task ^TaskMessage packet))
++                        message (if packet (.message ^TaskMessage packet))]
++                      (if (= task -1)
++                         (do (log-message "Receiving-thread:[" storm-id ", " 
port "] received shutdown notice")
++                           (.close socket)
++                           (reset! closed  true))
++                         (when packet (.add batched [task message]))))))
++             
++             (when (not @closed)
++               (do
++                 (if (> (.size batched) 0)
++                   (transfer-local-fn batched))
++                 0)))))
++         :factory? true
++         :daemon daemon
++         :kill-fn kill-fn
++         :priority priority
++         :thread-name (str "worker-receiver-thread-" thread-id)))
++
++(defn- mk-receive-threads [storm-id port transfer-local-fn  daemon kill-fn 
priority socket max-buffer-size thread-count]
++  (into [] (for [thread-id (range thread-count)] 
++             (mk-receive-thread storm-id port transfer-local-fn  daemon 
kill-fn priority socket max-buffer-size thread-id))))
++
++
++(defnk launch-receive-thread!
++  [context socket storm-id receiver-thread-count port transfer-local-fn 
max-buffer-size
++   :daemon true
++   :kill-fn (fn [t] (System/exit 1))
++   :priority Thread/NORM_PRIORITY]
++  (let [max-buffer-size (int max-buffer-size)
++<<<<<<< HEAD
++        local-hostname (memoized-local-hostname)
++        socket (.bind ^IContext context storm-id port)
++=======
++>>>>>>> origin/master
++        thread-count (if receiver-thread-count receiver-thread-count 1)
++        vthreads (mk-receive-threads storm-id port transfer-local-fn daemon 
kill-fn priority socket max-buffer-size thread-count)]
++    (fn []
++      (let [kill-socket (.connect ^IContext context storm-id local-hostname 
port)]
++        (log-message "Shutting down receiving-thread: [" storm-id ", " port 
"]")
++        (.send ^IConnection kill-socket
++                  -1 (byte-array []))
++        
++        (.close ^IConnection kill-socket)
++        
++        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] 
to die")
++        
++        (for [thread-id (range thread-count)] 
++             (.join (vthreads thread-id)))
++        
++        (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
++        ))))

Reply via email to