Repository: storm Updated Branches: refs/heads/master 9ddd29ff2 -> 12ceb0975
STORM-1248: port backtype.storm.messaging.loader to java Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/36aa7b07 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/36aa7b07 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/36aa7b07 Branch: refs/heads/master Commit: 36aa7b07344fe6b0caf46b3592d1754891ff9597 Parents: 3339dae Author: Abhishek Agarwal <[email protected]> Authored: Fri Feb 12 00:33:56 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Fri Feb 12 00:33:56 2016 +0530 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/worker.clj | 13 ++++---- .../clj/org/apache/storm/messaging/loader.clj | 34 -------------------- .../clj/org/apache/storm/messaging/local.clj | 23 ------------- storm-core/src/clj/org/apache/storm/testing.clj | 8 +++-- 4 files changed, 11 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 48934f6..0a2a6d6 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -21,14 +21,13 @@ (:require [org.apache.storm.daemon [executor :as executor]]) (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]]) (:require [clojure.set :as set]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) (:import [java.util.concurrent Executors] [org.apache.storm.hooks IWorkerHook BaseWorkerHook]) (:import [java.util ArrayList HashMap]) (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue]) (:import [org.apache.storm.grouping LoadMapping]) (:import [org.apache.storm.messaging TransportFactory]) - (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) + (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.serialization KryoTupleSerializer]) (:import [org.apache.storm.generated StormTopology]) @@ -461,11 +460,11 @@ ))))) (defn register-callbacks [worker] - (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker)) - (msg-loader/register-callback (:transfer-local-fn worker) - (:receiver worker) - (:storm-conf worker) - (worker-context worker))) + (let [transfer-local-fn (:transfer-local-fn worker) ^IConnection socket (:receiver worker)] + (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker)) + (.registerRecv socket (DeserializingConnectionCallback. (:storm-conf worker) + (worker-context worker) + transfer-local-fn)))) (defn- close-resources [worker] (let [dr (:default-shared-resources worker)] http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/messaging/loader.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/messaging/loader.clj b/storm-core/src/clj/org/apache/storm/messaging/loader.clj deleted file mode 100644 index b190ab0..0000000 --- a/storm-core/src/clj/org/apache/storm/messaging/loader.clj +++ /dev/null @@ -1,34 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.messaging.loader - (:import [org.apache.storm.messaging IConnection DeserializingConnectionCallback]) - (:require [org.apache.storm.messaging [local :as local]])) - -(defn mk-local-context [] - (local/mk-context)) - -(defn- mk-connection-callback - "make an IConnectionCallback" - [transfer-local-fn storm-conf worker-context] - (DeserializingConnectionCallback. storm-conf - worker-context - (fn [batch] - (transfer-local-fn batch)))) - -(defn register-callback - "register the local-transfer-fn with the server" - [transfer-local-fn ^IConnection socket storm-conf worker-context] - (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context))) http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/messaging/local.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/messaging/local.clj b/storm-core/src/clj/org/apache/storm/messaging/local.clj deleted file mode 100644 index 32fbb34..0000000 --- a/storm-core/src/clj/org/apache/storm/messaging/local.clj +++ /dev/null @@ -1,23 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.messaging.local - (:import [org.apache.storm.messaging IContext]) - (:import [org.apache.storm.messaging.local Context])) - -(defn mk-context [] - (let [context (Context.)] - (.prepare ^IContext context nil) - context)) http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index cc78659..12828d6 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -44,9 +44,9 @@ (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) (:import [org.apache.storm.tuple Tuple]) (:import [org.apache.storm.generated StormTopology]) - (:import [org.apache.storm.task TopologyContext]) + (:import [org.apache.storm.task TopologyContext] + (org.apache.storm.messaging IContext)) (:require [org.apache.storm [zookeeper :as zk]]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) (:require [org.apache.storm.daemon.acker :as acker]) (:use [org.apache.storm cluster util thrift config log local-state])) @@ -117,7 +117,9 @@ (defn mk-shared-context [conf] (if-not (conf STORM-LOCAL-MODE-ZMQ) - (msg-loader/mk-local-context))) + (let [context (org.apache.storm.messaging.local.Context.)] + (.prepare ^IContext context nil) + context))) (defn start-nimbus-daemon [conf nimbus] (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)
