STORM-548,rm additional file loader.clj~

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca29b5b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca29b5b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca29b5b2

Branch: refs/heads/master
Commit: ca29b5b23313eb46824842a321d55193b99db4eb
Parents: fcd45d8
Author: caofangkun <caofang...@gmail.com>
Authored: Tue Dec 16 11:03:28 2014 +0800
Committer: caofangkun <caofang...@gmail.com>
Committed: Tue Dec 16 11:03:28 2014 +0800

----------------------------------------------------------------------
 .../clj/backtype/storm/messaging/loader.clj~    | 89 --------------------
 1 file changed, 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ca29b5b2/storm-core/src/clj/backtype/storm/messaging/loader.clj~
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj~ b/storm-core/src/clj/backtype/storm/messaging/loader.clj~
deleted file mode 100644
index 35ceab5..0000000
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj~
+++ /dev/null
@@ -1,89 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.messaging.loader
-  (:use [backtype.storm util log])
-  (:import [java.util ArrayList Iterator])
-  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
-  (:import [backtype.storm.utils DisruptorQueue MutableObject])
-  (:require [backtype.storm.messaging [local :as local]])
-  (:require [backtype.storm [disruptor :as disruptor]]))
-
-(defn mk-local-context []
-  (local/mk-context))
-
-(defn- mk-receive-thread [storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id]
-    (async-loop
-       (fn []
-         (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id  " ]")
-         (fn []
-           (let [batched (ArrayList.)
-                 ^Iterator iter (.recv ^IConnection socket 0 thread-id)
-                 closed (atom false)]
-             (when iter
-               (while (and (not @closed) (.hasNext iter)) 
-                  (let [packet (.next iter)
-                        task (if packet (.task ^TaskMessage packet))
-                        message (if packet (.message ^TaskMessage packet))]
-                      (if (= task -1)
-                         (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
-                           (.close socket)
-                           (reset! closed  true))
-                         (when packet (.add batched [task message]))))))
-             
-             (when (not @closed)
-               (do
-                 (if (> (.size batched) 0)
-                   (transfer-local-fn batched))
-                 0)))))
-         :factory? true
-         :daemon daemon
-         :kill-fn kill-fn
-         :priority priority
-         :thread-name (str "worker-receiver-thread-" thread-id)))
-
-(defn- mk-receive-threads [storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-count]
-  (into [] (for [thread-id (range thread-count)] 
-             (mk-receive-thread storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id))))
-
-
-(defnk launch-receive-thread!
-  [context socket storm-id receiver-thread-count port transfer-local-fn max-buffer-size
-   :daemon true
-   :kill-fn (fn [t] (System/exit 1))
-   :priority Thread/NORM_PRIORITY]
-  (let [max-buffer-size (int max-buffer-size)
-<<<<<<< HEAD
-        local-hostname (memoized-local-hostname)
-        socket (.bind ^IContext context storm-id port)
-=======
->>>>>>> origin/master
-        thread-count (if receiver-thread-count receiver-thread-count 1)
-        vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)]
-    (fn []
-      (let [kill-socket (.connect ^IContext context storm-id local-hostname port)]
-        (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
-        (.send ^IConnection kill-socket
-                  -1 (byte-array []))
-        
-        (.close ^IConnection kill-socket)
-        
-        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
-        
-        (for [thread-id (range thread-count)] 
-             (.join (vthreads thread-id)))
-        
-        (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
-        ))))

Reply via email to