STORM-548,rm additional file loader.clj~
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca29b5b2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca29b5b2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca29b5b2 Branch: refs/heads/master Commit: ca29b5b23313eb46824842a321d55193b99db4eb Parents: fcd45d8 Author: caofangkun <caofang...@gmail.com> Authored: Tue Dec 16 11:03:28 2014 +0800 Committer: caofangkun <caofang...@gmail.com> Committed: Tue Dec 16 11:03:28 2014 +0800 ---------------------------------------------------------------------- .../clj/backtype/storm/messaging/loader.clj~ | 89 -------------------- 1 file changed, 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ca29b5b2/storm-core/src/clj/backtype/storm/messaging/loader.clj~ ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj~ b/storm-core/src/clj/backtype/storm/messaging/loader.clj~ deleted file mode 100644 index 35ceab5..0000000 --- a/storm-core/src/clj/backtype/storm/messaging/loader.clj~ +++ /dev/null @@ -1,89 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns backtype.storm.messaging.loader - (:use [backtype.storm util log]) - (:import [java.util ArrayList Iterator]) - (:import [backtype.storm.messaging IContext IConnection TaskMessage]) - (:import [backtype.storm.utils DisruptorQueue MutableObject]) - (:require [backtype.storm.messaging [local :as local]]) - (:require [backtype.storm [disruptor :as disruptor]])) - -(defn mk-local-context [] - (local/mk-context)) - -(defn- mk-receive-thread [storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id] - (async-loop - (fn [] - (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]") - (fn [] - (let [batched (ArrayList.) - ^Iterator iter (.recv ^IConnection socket 0 thread-id) - closed (atom false)] - (when iter - (while (and (not @closed) (.hasNext iter)) - (let [packet (.next iter) - task (if packet (.task ^TaskMessage packet)) - message (if packet (.message ^TaskMessage packet))] - (if (= task -1) - (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice") - (.close socket) - (reset! closed true)) - (when packet (.add batched [task message])))))) - - (when (not @closed) - (do - (if (> (.size batched) 0) - (transfer-local-fn batched)) - 0))))) - :factory? true - :daemon daemon - :kill-fn kill-fn - :priority priority - :thread-name (str "worker-receiver-thread-" thread-id))) - -(defn- mk-receive-threads [storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count] - (into [] (for [thread-id (range thread-count)] - (mk-receive-thread storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-id)))) - - -(defnk launch-receive-thread! - [context socket storm-id receiver-thread-count port transfer-local-fn max-buffer-size - :daemon true - :kill-fn (fn [t] (System/exit 1)) - :priority Thread/NORM_PRIORITY] - (let [max-buffer-size (int max-buffer-size) -<<<<<<< HEAD - local-hostname (memoized-local-hostname) - socket (.bind ^IContext context storm-id port) -======= ->>>>>>> origin/master - thread-count (if receiver-thread-count receiver-thread-count 1) - vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)] - (fn [] - (let [kill-socket (.connect ^IContext context storm-id local-hostname port)] - (log-message "Shutting down receiving-thread: [" storm-id ", " port "]") - (.send ^IConnection kill-socket - -1 (byte-array [])) - - (.close ^IConnection kill-socket) - - (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die") - - (for [thread-id (range thread-count)] - (.join (vthreads thread-id))) - - (log-message "Shutdown receiving-thread: [" storm-id ", " port "]") - ))))