http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java deleted file mode 100644 index 1960371..0000000 --- a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.cluster; - -import clojure.lang.APersistentMap; -import clojure.lang.IFn; -import java.util.List; -import org.apache.zookeeper.data.ACL; - -/** - * ClusterState provides the API for the pluggable state store used by the - * Storm daemons. Data is stored in path/value format, and the store supports - * listing sub-paths at a given path. - * All data should be available across all nodes with eventual consistency. - * - * IMPORTANT NOTE: - * Heartbeats have different api calls used to interact with them. The root - * path (/) may or may not be the same as the root path for the other api calls. - * - * For example, performing these two calls: - * set_data("/path", data, acls); - * void set_worker_hb("/path", heartbeat, acls); - * may or may not cause a collision in "/path". - * Never use the same paths with the *_hb* methods as you do with the others. - */ -public interface ClusterState { - - /** - * Registers a callback function that gets called when CuratorEvents happen. - * @param callback is a clojure IFn that accepts the type - translated to - * clojure keyword as in zookeeper.clj - and the path: (callback type path) - * @return is an id that can be passed to unregister(...) to unregister the - * callback. - */ - String register(IFn callback); - - /** - * Unregisters a callback function that was registered with register(...). - * @param id is the String id that was returned from register(...). - */ - void unregister(String id); - - /** - * Path will be appended with a monotonically increasing integer, a new node - * will be created there, and data will be put at that node. - * @param path The path that the monotonically increasing integer suffix will - * be added to. - * @param data The data that will be written at the suffixed path's node. - * @param acls The acls to apply to the path. May be null. - * @return The path with the integer suffix appended. - */ - String create_sequential(String path, byte[] data, List<ACL> acls); - - /** - * Creates nodes for path and all its parents. Path elements are separated by - * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix. - * @param path The path to create, along with all its parents. - * @param acls The acls to apply to the path. May be null. - * @return path - */ - String mkdirs(String path, List<ACL> acls); - - /** - * Deletes the node at a given path, and any child nodes that may exist. - * @param path The path to delete - */ - void delete_node(String path); - - /** - * Creates an ephemeral node at path. Ephemeral nodes are destroyed - * by the store when the client disconnects. - * @param path The path where a node will be created. - * @param data The data to be written at the node. - * @param acls The acls to apply to the path. May be null. - */ - void set_ephemeral_node(String path, byte[] data, List<ACL> acls); - - /** - * Gets the 'version' of the node at a path. Optionally sets a watch - * on that node. The version should increase whenever a write happens. - * @param path The path to get the version of. - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return The integer version of this node. - */ - Integer get_version(String path, boolean watch); - - /** - * Check if a node exists and optionally set a watch on the path. - * @param path The path to check for the existence of a node. - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return Whether or not a node exists at path. - */ - boolean node_exists(String path, boolean watch); - - /** - * Get a list of paths of all the child nodes which exist immediately - * under path. - * @param path The path to look under - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return list of string paths under path. - */ - List<String> get_children(String path, boolean watch); - - /** - * Close the connection to the data store. - */ - void close(); - - /** - * Set the value of the node at path to data. - * @param path The path whose node we want to set. - * @param data The data to put in the node. - * @param acls The acls to apply to the path. May be null. - */ - void set_data(String path, byte[] data, List<ACL> acls); - - /** - * Get the data from the node at path - * @param path The path to look under - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return The data at the node. - */ - byte[] get_data(String path, boolean watch); - - /** - * Get the data at the node along with its version. Data is returned - * in an APersistentMap with clojure keyword keys :data and :version. - * @param path The path to look under - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return An APersistentMap in the form {:data data :version version} - */ - APersistentMap get_data_with_version(String path, boolean watch); - - /** - * Write a worker heartbeat at the path. - * @param path The path whose node we want to set. - * @param data The data to put in the node. - * @param acls The acls to apply to the path. May be null. - */ - void set_worker_hb(String path, byte[] data, List<ACL> acls); - - /** - * Get the heartbeat from the node at path - * @param path The path to look under - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return The heartbeat at the node. - */ - byte[] get_worker_hb(String path, boolean watch); - - /** - * Get a list of paths of all the child nodes which exist immediately - * under path. This is similar to get_children, but must be used for - * any nodes - * @param path The path to look under - * @param watch Whether or not to set a watch on the path. Watched paths - * emit events which are consumed by functions registered with the - * register method. Very useful for catching updates to nodes. - * @return list of string paths under path. - */ - List<String> get_worker_hb_children(String path, boolean watch); - - /** - * Deletes the heartbeat at a given path, and any child nodes that may exist. - * @param path The path to delete. - */ - void delete_worker_hb(String path); - - /** - * Add a ClusterStateListener to the connection. - * @param listener A ClusterStateListener to handle changing cluster state - * events. - */ - void add_listener(ClusterStateListener listener); - - /** - * Force consistency on a path. Any writes committed on the path before - * this call will be completely propagated when it returns. - * @param path The path to synchronize. - */ - void sync_path(String path); - - /** - * Allows us to delete the znodes within /storm/blobstore/key_name - * whose znodes start with the corresponding nimbusHostPortInfo - * @param path /storm/blobstore/key_name - * @param nimbusHostPortInfo Contains the host port information of - * a nimbus node. - */ - void delete_node_blobstore(String path, String nimbusHostPortInfo); -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java deleted file mode 100644 index 5ccde23..0000000 --- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm.cluster; - -/** - * This class is intended to provide runtime-context to ClusterStateFactory - * implementors, giving information such as what daemon is creating it. - */ -public class ClusterStateContext { - - private DaemonType daemonType; - - public ClusterStateContext() { - daemonType = DaemonType.UNKNOWN; - } - - public ClusterStateContext(DaemonType daemonType) { - this.daemonType = daemonType; - } - - public DaemonType getDaemonType() { - return daemonType; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java deleted file mode 100644 index 1f946ee..0000000 --- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.cluster; - -import clojure.lang.APersistentMap; -import java.util.List; -import org.apache.zookeeper.data.ACL; - -public interface ClusterStateFactory { - - ClusterState mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context); - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java deleted file mode 100644 index 22693f8..0000000 --- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.cluster; - -public interface ClusterStateListener { - void stateChanged(ConnectionState newState); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java b/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java deleted file mode 100644 index d6887da..0000000 --- a/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.cluster; - -public enum ConnectionState { - CONNECTED, - RECONNECTED, - LOST -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java b/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java deleted file mode 100644 index 684d0ef..0000000 --- a/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm.cluster; - -public enum DaemonType { - SUPERVISOR, - NIMBUS, - WORKER, - PACEMAKER, - UNKNOWN -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java deleted file mode 100644 index 55590d0..0000000 --- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.coordination; - -import backtype.storm.coordination.CoordinatedBolt.FinishedCallback; -import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.FailedException; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.Utils; -import java.util.HashMap; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback { - public static final Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class); - - byte[] _boltSer; - Map<Object, IBatchBolt> _openTransactions; - Map _conf; - TopologyContext _context; - BatchOutputCollectorImpl _collector; - - public BatchBoltExecutor(IBatchBolt bolt) { - _boltSer = Utils.javaSerialize(bolt); - } - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - _conf = conf; - _context = context; - _collector = new BatchOutputCollectorImpl(collector); - _openTransactions = new HashMap<>(); - } - - @Override - public void execute(Tuple input) { - Object id = input.getValue(0); - IBatchBolt bolt = getBatchBolt(id); - try { - bolt.execute(input); - _collector.ack(input); - } catch(FailedException e) { - LOG.error("Failed to process tuple in batch", e); - _collector.fail(input); - } - } - - @Override - public void cleanup() { - } - - @Override - public void finishedId(Object id) { - IBatchBolt bolt = getBatchBolt(id); - _openTransactions.remove(id); - bolt.finishBatch(); - } - - @Override - public void timeoutId(Object attempt) { - _openTransactions.remove(attempt); - } - - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - newTransactionalBolt().declareOutputFields(declarer); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return newTransactionalBolt().getComponentConfiguration(); - } - - private IBatchBolt getBatchBolt(Object id) { - IBatchBolt bolt = _openTransactions.get(id); - if(bolt==null) { - bolt = newTransactionalBolt(); - bolt.prepare(_conf, _context, _collector, id); - _openTransactions.put(id, bolt); - } - return bolt; - } - - private IBatchBolt newTransactionalBolt() { - return Utils.javaDeserialize(_boltSer, IBatchBolt.class); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java b/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java deleted file mode 100644 index f5f3457..0000000 --- a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.coordination; - -import backtype.storm.utils.Utils; -import java.util.List; - -public abstract class BatchOutputCollector { - - /** - * Emits a tuple to the default output stream. - */ - public List<Integer> emit(List<Object> tuple) { - return emit(Utils.DEFAULT_STREAM_ID, tuple); - } - - public abstract List<Integer> emit(String streamId, List<Object> tuple); - - /** - * Emits a tuple to the specified task on the default output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. - */ - public void emitDirect(int taskId, List<Object> tuple) { - emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); - } - - public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); - - public abstract void reportError(Throwable error); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java b/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java deleted file mode 100644 index cae7560..0000000 --- a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.coordination; - -import backtype.storm.task.OutputCollector; -import backtype.storm.tuple.Tuple; -import java.util.List; - -public class BatchOutputCollectorImpl extends BatchOutputCollector { - OutputCollector _collector; - - public BatchOutputCollectorImpl(OutputCollector collector) { - _collector = collector; - } - - @Override - public List<Integer> emit(String streamId, List<Object> tuple) { - return _collector.emit(streamId, tuple); - } - - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple) { - _collector.emitDirect(taskId, streamId, tuple); - } - - @Override - public void reportError(Throwable error) { - _collector.reportError(error); - } - - public void ack(Tuple tup) { - _collector.ack(tup); - } - - public void fail(Tuple tup) { - _collector.fail(tup); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java deleted file mode 100644 index 1dd1c9f..0000000 --- a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java +++ /dev/null @@ -1,447 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.coordination; - -import backtype.storm.Constants; -import backtype.storm.coordination.CoordinatedBolt.SourceArgs; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.grouping.PartialKeyGrouping; -import backtype.storm.topology.BaseConfigurationDeclarer; -import backtype.storm.topology.BasicBoltExecutor; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.IBasicBolt; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class BatchSubtopologyBuilder { - Map<String, Component> _bolts = new HashMap<String, Component>(); - Component _masterBolt; - String _masterId; - - public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) { - Integer p = boltParallelism == null ? null : boltParallelism.intValue(); - _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p); - _masterId = masterBoltId; - } - - public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) { - this(masterBoltId, masterBolt, null); - } - - public BoltDeclarer getMasterDeclarer() { - return new BoltDeclarerImpl(_masterBolt); - } - - public BoltDeclarer setBolt(String id, IBatchBolt bolt) { - return setBolt(id, bolt, null); - } - - public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) { - return setBolt(id, new BatchBoltExecutor(bolt), parallelism); - } - - public BoltDeclarer setBolt(String id, IBasicBolt bolt) { - return setBolt(id, bolt, null); - } - - public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) { - return setBolt(id, new BasicBoltExecutor(bolt), parallelism); - } - - private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) { - Integer p = null; - if(parallelism!=null) p = parallelism.intValue(); - Component component = new Component(bolt, p); - _bolts.put(id, component); - return new BoltDeclarerImpl(component); - } - - public void extendTopology(TopologyBuilder builder) { - BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism); - for(InputDeclaration decl: _masterBolt.declarations) { - decl.declare(declarer); - } - for(Map conf: _masterBolt.componentConfs) { - declarer.addConfigurations(conf); - } - for(String id: _bolts.keySet()) { - Component component = _bolts.get(id); - Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>(); - for(String c: componentBoltSubscriptions(component)) { - SourceArgs source; - if(c.equals(_masterId)) { - source = SourceArgs.single(); - } else { - source = SourceArgs.all(); - } - coordinatedArgs.put(c, source); - } - - - BoltDeclarer input = builder.setBolt(id, - new CoordinatedBolt(component.bolt, - coordinatedArgs, - null), - component.parallelism); - for(Map conf: component.componentConfs) { - input.addConfigurations(conf); - } - for(String c: componentBoltSubscriptions(component)) { - input.directGrouping(c, Constants.COORDINATED_STREAM_ID); - } - for(InputDeclaration d: component.declarations) { - d.declare(input); - } - } - } - - private Set<String> componentBoltSubscriptions(Component component) { - Set<String> ret = new HashSet<String>(); - for(InputDeclaration d: component.declarations) { - ret.add(d.getComponent()); - } - return ret; - } - - private static class Component { - public IRichBolt bolt; - public Integer parallelism; - public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); - public List<Map<String, Object>> componentConfs = new ArrayList<>(); - - public Component(IRichBolt bolt, Integer parallelism) { - this.bolt = bolt; - this.parallelism = parallelism; - } - } - - private static interface InputDeclaration { - void declare(InputDeclarer declarer); - String getComponent(); - } - - private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer { - Component _component; - - public BoltDeclarerImpl(Component component) { - _component = component; - } - - @Override - public BoltDeclarer fieldsGrouping(final String component, final Fields fields) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.fieldsGrouping(component, fields); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.fieldsGrouping(component, streamId, fields); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer globalGrouping(final String component) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.globalGrouping(component); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer globalGrouping(final String component, final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.globalGrouping(component, streamId); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer shuffleGrouping(final String component) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.shuffleGrouping(component); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer shuffleGrouping(final String component, final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.shuffleGrouping(component, streamId); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer localOrShuffleGrouping(final String component) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.localOrShuffleGrouping(component); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.localOrShuffleGrouping(component, streamId); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer noneGrouping(final String component) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.noneGrouping(component); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer noneGrouping(final String component, final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.noneGrouping(component, streamId); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer allGrouping(final String component) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.allGrouping(component); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer allGrouping(final String component, final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.allGrouping(component, streamId); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer directGrouping(final String component) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.directGrouping(component); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer directGrouping(final String component, final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.directGrouping(component, streamId); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { - return customGrouping(componentId, new PartialKeyGrouping(fields)); - } - - @Override - public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { - return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); - } - - @Override - public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.customGrouping(component, grouping); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.customGrouping(component, streamId, grouping); - } - - @Override - public String getComponent() { - return component; - } - }); - return this; - } - - @Override - public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(InputDeclarer declarer) { - declarer.grouping(stream, grouping); - } - - @Override - public String getComponent() { - return stream.get_componentId(); - } - }); - return this; - } - - private void addDeclaration(InputDeclaration declaration) { - _component.declarations.add(declaration); - } - - @Override - public BoltDeclarer addConfigurations(Map<String, Object> conf) { - _component.componentConfs.add(conf); - return this; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java deleted file mode 100644 index c3a428c..0000000 --- a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.coordination; - -import backtype.storm.topology.FailedException; -import java.util.Map.Entry; -import backtype.storm.tuple.Values; -import backtype.storm.generated.GlobalStreamId; -import java.util.Collection; -import backtype.storm.Constants; -import backtype.storm.generated.Grouping; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.TimeCacheMap; -import backtype.storm.utils.Utils; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static backtype.storm.utils.Utils.get; - -/** - * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused - * in the case of retries. - */ -public class CoordinatedBolt implements IRichBolt { - public static final Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class); - - public static interface FinishedCallback { - void finishedId(Object id); - } - - public static interface TimeoutCallback { - void timeoutId(Object id); - } - - - public static class SourceArgs implements Serializable { - public boolean singleCount; - - protected SourceArgs(boolean singleCount) { - this.singleCount = singleCount; - } - - public static SourceArgs single() { - return new SourceArgs(true); - } - - public static SourceArgs all() { - return new SourceArgs(false); - } - - @Override - public String toString() { - return "<Single: " + singleCount + ">"; - } - } - - public class CoordinatedOutputCollector implements IOutputCollector { - IOutputCollector _delegate; - - public CoordinatedOutputCollector(IOutputCollector delegate) { - _delegate = delegate; - } - - public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) { - List<Integer> tasks = _delegate.emit(stream, anchors, tuple); - updateTaskCounts(tuple.get(0), tasks); - return tasks; - } - - public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) { - updateTaskCounts(tuple.get(0), Arrays.asList(task)); - _delegate.emitDirect(task, stream, anchors, tuple); - } - - public void ack(Tuple tuple) { - Object id = tuple.getValue(0); - synchronized(_tracked) { - TrackingInfo track = _tracked.get(id); - if (track != null) - track.receivedTuples++; - } - boolean failed = checkFinishId(tuple, TupleType.REGULAR); - if(failed) { - _delegate.fail(tuple); - } else { - _delegate.ack(tuple); - } - } - - public void fail(Tuple tuple) { - Object id = tuple.getValue(0); - synchronized(_tracked) { - TrackingInfo track = _tracked.get(id); - if (track != null) - track.failed = true; - } - checkFinishId(tuple, TupleType.REGULAR); - _delegate.fail(tuple); - } - - public void reportError(Throwable error) { - _delegate.reportError(error); - } - - - private void updateTaskCounts(Object id, List<Integer> tasks) { - synchronized(_tracked) { - TrackingInfo track = _tracked.get(id); - if (track != null) { - Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples; - for(Integer task: tasks) { - int newCount = get(taskEmittedTuples, task, 0) + 1; - taskEmittedTuples.put(task, newCount); - } - } - } - } - } - - private Map<String, SourceArgs> _sourceArgs; - private IdStreamSpec _idStreamSpec; - private IRichBolt _delegate; - private Integer _numSourceReports; - private List<Integer> _countOutTasks = new ArrayList<>(); - private OutputCollector _collector; - private TimeCacheMap<Object, TrackingInfo> _tracked; - - public static class TrackingInfo { - int reportCount = 0; - int expectedTupleCount = 0; - int receivedTuples = 0; - boolean failed = false; - Map<Integer, Integer> taskEmittedTuples = new HashMap<>(); - boolean receivedId = false; - boolean finished = false; - List<Tuple> ackTuples = new ArrayList<>(); - - @Override - public String toString() { - return "reportCount: " + reportCount + "\n" + - "expectedTupleCount: " + expectedTupleCount + "\n" + - "receivedTuples: " + receivedTuples + "\n" + - "failed: " + failed + "\n" + - taskEmittedTuples.toString(); - } - } - - - public static class IdStreamSpec implements Serializable { - GlobalStreamId _id; - - public GlobalStreamId getGlobalStreamId() { - return _id; - } - - public static IdStreamSpec makeDetectSpec(String component, String stream) { - return new IdStreamSpec(component, stream); - } - - protected IdStreamSpec(String component, String stream) { - _id = new GlobalStreamId(component, stream); - } - } - - public CoordinatedBolt(IRichBolt delegate) { - this(delegate, null, null); - } - - public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) { - this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec); - } - - public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) { - _sourceArgs = sourceArgs; - if(_sourceArgs==null) _sourceArgs = new HashMap<>(); - _delegate = delegate; - _idStreamSpec = idStreamSpec; - } - - public void prepare(Map config, TopologyContext context, OutputCollector collector) { - TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null; - if(_delegate instanceof TimeoutCallback) { - callback = new TimeoutItems(); - } - _tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback); - _collector = collector; - _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector))); - for(String component: Utils.get(context.getThisTargets(), - Constants.COORDINATED_STREAM_ID, - new HashMap<String, Grouping>()) - .keySet()) { - for(Integer task: context.getComponentTasks(component)) { - _countOutTasks.add(task); - } - } - if(!_sourceArgs.isEmpty()) { - _numSourceReports = 0; - for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) { - if(entry.getValue().singleCount) { - _numSourceReports+=1; - } else { - _numSourceReports+=context.getComponentTasks(entry.getKey()).size(); - } - } - } - } - - private boolean checkFinishId(Tuple tup, TupleType type) { - Object id = tup.getValue(0); - boolean failed = false; - - synchronized(_tracked) { - TrackingInfo track = _tracked.get(id); - try { - if(track!=null) { - boolean delayed = false; - if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) { - track.ackTuples.add(tup); - delayed = true; - } - if(track.failed) { - failed = true; - for(Tuple t: track.ackTuples) { - _collector.fail(t); - } - _tracked.remove(id); - } else if(track.receivedId - && (_sourceArgs.isEmpty() || - track.reportCount==_numSourceReports && - track.expectedTupleCount == track.receivedTuples)){ - if(_delegate instanceof FinishedCallback) { - ((FinishedCallback)_delegate).finishedId(id); - } - if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) { - throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible"); - } - Iterator<Integer> outTasks = _countOutTasks.iterator(); - while(outTasks.hasNext()) { - int task = outTasks.next(); - int numTuples = get(track.taskEmittedTuples, task, 0); - _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples)); - } - for(Tuple t: track.ackTuples) { - _collector.ack(t); - } - track.finished = true; - _tracked.remove(id); - } - if(!delayed && type!=TupleType.REGULAR) { - if(track.failed) { - _collector.fail(tup); - } else { - _collector.ack(tup); - } - } - } else { - if(type!=TupleType.REGULAR) _collector.fail(tup); - } - } catch(FailedException e) { - LOG.error("Failed to finish batch", e); - for(Tuple t: track.ackTuples) { - _collector.fail(t); - } - _tracked.remove(id); - failed = true; - } - } - return failed; - } - - public void execute(Tuple tuple) { - Object id = tuple.getValue(0); - TrackingInfo track; - TupleType type = getTupleType(tuple); - synchronized(_tracked) { - track = _tracked.get(id); - if(track==null) { - track = new TrackingInfo(); - if(_idStreamSpec==null) track.receivedId = true; - _tracked.put(id, track); - } - } - - if(type==TupleType.ID) { - synchronized(_tracked) { - track.receivedId = true; - } - checkFinishId(tuple, type); - } else if(type==TupleType.COORD) { - int count = (Integer) tuple.getValue(1); - synchronized(_tracked) { - track.reportCount++; - track.expectedTupleCount+=count; - } - checkFinishId(tuple, type); - } else { - synchronized(_tracked) { - _delegate.execute(tuple); - } - } - } - - public void cleanup() { - _delegate.cleanup(); - _tracked.cleanup(); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - _delegate.declareOutputFields(declarer); - declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return _delegate.getComponentConfiguration(); - } - - private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) { - Map<String, SourceArgs> ret = new HashMap<>(); - ret.put(sourceComponent, sourceArgs); - return ret; - } - - private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> { - @Override - public void expire(Object id, TrackingInfo val) { - synchronized(_tracked) { - // the combination of the lock and the finished flag ensure that - // an id is never timed out if it has been finished - val.failed = true; - if(!val.finished) { - ((TimeoutCallback) _delegate).timeoutId(id); - } - } - } - } - - private TupleType getTupleType(Tuple tuple) { - if(_idStreamSpec!=null - && tuple.getSourceGlobalStreamId().equals(_idStreamSpec._id)) { - return TupleType.ID; - } else if(!_sourceArgs.isEmpty() - && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) { - return TupleType.COORD; - } else { - return TupleType.REGULAR; - } - } - - static enum TupleType { - REGULAR, - ID, - COORD - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java b/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java deleted file mode 100644 index ee5d9bd..0000000 --- a/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.coordination; - -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IComponent; -import backtype.storm.tuple.Tuple; -import java.io.Serializable; -import java.util.Map; - -public interface IBatchBolt<T> extends Serializable, IComponent { - void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id); - void execute(Tuple tuple); - void finishBatch(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java b/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java deleted file mode 100644 index 3a0dfbb..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm.daemon; - -import backtype.storm.utils.Utils; - -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.InputStream; - -/** - * Main executable to load and run a jar transformer - */ -public class ClientJarTransformerRunner { - public static void main(String [] args) throws IOException { - JarTransformer transformer = Utils.jarTransformer(args[0]); - InputStream in = new FileInputStream(args[1]); - OutputStream out = new FileOutputStream(args[2]); - transformer.transform(in, out); - in.close(); - out.close(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java b/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java deleted file mode 100644 index 67b6527..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.daemon; - -import java.io.IOException; -import java.io.File; -import java.io.FileInputStream; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.Path; -import java.nio.file.DirectoryStream; -import java.util.Stack; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.Comparator; -import java.util.PriorityQueue; -import java.util.regex.Pattern; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provide methods to help Logviewer to clean up - * files in directories and to get a list of files without - * worrying about excessive memory usage. - * - */ -public class DirectoryCleaner { - private static final Logger LOG = LoggerFactory.getLogger(DirectoryCleaner.class); - // used to recognize the pattern of active log files, we may remove the "current" from this list - private static final Pattern ACTIVE_LOG_PATTERN = Pattern.compile(".*\\.(log|err|out|current|yaml|pid)$"); - // used to recognize the pattern of some meta files in a worker log directory - private static final Pattern META_LOG_PATTERN= Pattern.compile(".*\\.(yaml|pid)$"); - - // not defining this as static is to allow for mocking in tests - public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException { - DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath()); - return stream; - } - - /** - * If totalSize of files exceeds the either the per-worker quota or global quota, - * Logviewer deletes oldest inactive log files in a worker directory or in all worker dirs. - * We use the parameter for_per_dir to switch between the two deletion modes. - * @param dirs the list of directories to be scanned for deletion - * @param quota the per-dir quota or the total quota for the all directories - * @param for_per_dir if true, deletion happens for a single dir; otherwise, for all directories globally - * @param active_dirs only for global deletion, we want to skip the active logs in active_dirs - * @return number of files deleted - */ - public int deleteOldestWhileTooLarge(List<File> dirs, - long quota, boolean for_per_dir, Set<String> active_dirs) throws IOException { - final int PQ_SIZE = 1024; // max number of files to delete for every round - final int MAX_ROUNDS = 512; // max rounds of scanning the dirs - long totalSize = 0; - int deletedFiles = 0; - - for (File dir : dirs) { - try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { - for (Path path : stream) { - File file = path.toFile(); - totalSize += file.length(); - } - } - } - long toDeleteSize = totalSize - quota; - if (toDeleteSize <= 0) { - return deletedFiles; - } - - Comparator<File> comparator = new Comparator<File>() { - public int compare(File f1, File f2) { - if (f1.lastModified() > f2.lastModified()) { - return -1; - } else { - return 1; - } - } - }; - // the oldest pq_size files in this directory will be placed in PQ, with the newest at the root - PriorityQueue<File> pq = new PriorityQueue<File>(PQ_SIZE, comparator); - int round = 0; - while (toDeleteSize > 0) { - LOG.debug("To delete size is {}, start a new round of deletion, round: {}", toDeleteSize, round); - for (File dir : dirs) { - try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { - for (Path path : stream) { - File file = path.toFile(); - if (for_per_dir) { - if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { - continue; // skip active log files - } - } else { // for global cleanup - if (active_dirs.contains(dir.getCanonicalPath())) { // for an active worker's dir, make sure for the last "/" - if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) { - continue; // skip active log files - } - } else { - if (META_LOG_PATTERN.matcher(file.getName()).matches()) { - continue; // skip yaml and pid files - } - } - } - if (pq.size() < PQ_SIZE) { - pq.offer(file); - } else { - if (file.lastModified() < pq.peek().lastModified()) { - pq.poll(); - pq.offer(file); - } - } - } - } - } - // need to reverse the order of elements in PQ to delete files from oldest to newest - Stack<File> stack = new Stack<File>(); - while (!pq.isEmpty()) { - File file = pq.poll(); - stack.push(file); - } - while (!stack.isEmpty() && toDeleteSize > 0) { - File file = stack.pop(); - toDeleteSize -= file.length(); - LOG.info("Delete file: {}, size: {}, lastModified: {}", file.getName(), file.length(), file.lastModified()); - file.delete(); - deletedFiles++; - } - pq.clear(); - round++; - if (round >= MAX_ROUNDS) { - if (for_per_dir) { - LOG.warn("Reach the MAX_ROUNDS: {} during per-dir deletion, you may have too many files in " + - "a single directory : {}, will delete the rest files in next interval.", - MAX_ROUNDS, dirs.get(0).getCanonicalPath()); - } else { - LOG.warn("Reach the MAX_ROUNDS: {} during global deletion, you may have too many files, " + - "will delete the rest files in next interval.", MAX_ROUNDS); - } - break; - } - } - return deletedFiles; - } - - // Note that to avoid memory problem, we only return the first 1024 files in a directory - public static List<File> getFilesForDir(File dir) throws IOException { - List<File> files = new ArrayList<File>(); - final int MAX_NUM = 1024; - - try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath())) { - for (Path path : stream) { - files.add(path.toFile()); - if (files.size() >= MAX_NUM) { - break; - } - } - } - return files; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java b/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java deleted file mode 100644 index 914710a..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm.daemon; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A plugin that can be used to transform a jar file in nimbus before it - * is used by a topology. - */ -public interface JarTransformer { - public void transform(InputStream input, OutputStream output) throws IOException; -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java b/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java deleted file mode 100644 index b1d8ddf..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.daemon; - -public interface Shutdownable { - public void shutdown(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java deleted file mode 100644 index 78e8d9b..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import backtype.storm.generated.DRPCRequest; -import backtype.storm.generated.DistributedRPCInvocations; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.security.auth.ThriftClient; -import backtype.storm.security.auth.ThriftConnectionType; -import org.apache.thrift.transport.TTransportException; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface { - public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class); - private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>(); - private String host; - private int port; - - public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException { - super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null); - this.host = host; - this.port = port; - client.set(new DistributedRPCInvocations.Client(_protocol)); - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public void reconnectClient() throws TException { - if (client.get() == null) { - reconnect(); - client.set(new DistributedRPCInvocations.Client(_protocol)); - } - } - - public boolean isConnected() { - return client.get() != null; - } - - public void result(String id, String result) throws TException, AuthorizationException { - DistributedRPCInvocations.Client c = client.get(); - try { - if (c == null) { - throw new TException("Client is not connected..."); - } - c.result(id, result); - } catch(AuthorizationException aze) { - throw aze; - } catch(TException e) { - client.compareAndSet(c, null); - throw e; - } - } - - public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException { - DistributedRPCInvocations.Client c = client.get(); - try { - if (c == null) { - throw new TException("Client is not connected..."); - } - return c.fetchRequest(func); - } catch(AuthorizationException aze) { - throw aze; - } catch(TException e) { - client.compareAndSet(c, null); - throw e; - } - } - - public void failRequest(String id) throws TException, AuthorizationException { - DistributedRPCInvocations.Client c = client.get(); - try { - if (c == null) { - throw new TException("Client is not connected..."); - } - c.failRequest(id); - } catch(AuthorizationException aze) { - throw aze; - } catch(TException e) { - client.compareAndSet(c, null); - throw e; - } - } - - public DistributedRPCInvocations.Client getClient() { - return client.get(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java deleted file mode 100644 index 4ed15c0..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.Config; -import backtype.storm.ILocalDRPC; -import backtype.storm.generated.DRPCRequest; -import backtype.storm.generated.DistributedRPCInvocations; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.ExtendedThreadPoolExecutor; -import backtype.storm.utils.ServiceRegistry; -import backtype.storm.utils.Utils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.Callable; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.thrift.TException; -import org.json.simple.JSONValue; - -public class DRPCSpout extends BaseRichSpout { - //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS - static final long serialVersionUID = 2387848310969237877L; - - public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class); - - SpoutOutputCollector _collector; - List<DRPCInvocationsClient> _clients = new ArrayList<>(); - transient LinkedList<Future<Void>> _futures = null; - transient ExecutorService _backround = null; - String _function; - String _local_drpc_id = null; - - private static class DRPCMessageId { - String id; - int index; - - public DRPCMessageId(String id, int index) { - this.id = id; - this.index = index; - } - } - - - public DRPCSpout(String function) { - _function = function; - } - - public DRPCSpout(String function, ILocalDRPC drpc) { - _function = function; - _local_drpc_id = drpc.getServiceId(); - } - - public String get_function() { - return _function; - } - - private class Adder implements Callable<Void> { - private String server; - private int port; - private Map conf; - - public Adder(String server, int port, Map conf) { - this.server = server; - this.port = port; - this.conf = conf; - } - - @Override - public Void call() throws Exception { - DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port); - synchronized (_clients) { - _clients.add(c); - } - return null; - } - } - - private void reconnect(final DRPCInvocationsClient c) { - _futures.add(_backround.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - c.reconnectClient(); - return null; - } - })); - } - - private void checkFutures() { - Iterator<Future<Void>> i = _futures.iterator(); - while (i.hasNext()) { - Future<Void> f = i.next(); - if (f.isDone()) { - i.remove(); - } - try { - f.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - if(_local_drpc_id==null) { - _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>()); - _futures = new LinkedList<>(); - - int numTasks = context.getComponentTasks(context.getThisComponentId()).size(); - int index = context.getThisTaskIndex(); - - int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT)); - List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS); - if(servers == null || servers.isEmpty()) { - throw new RuntimeException("No DRPC servers configured for topology"); - } - - if (numTasks < servers.size()) { - for (String s: servers) { - _futures.add(_backround.submit(new Adder(s, port, conf))); - } - } else { - int i = index % servers.size(); - _futures.add(_backround.submit(new Adder(servers.get(i), port, conf))); - } - } - - } - - @Override - public void close() { - for(DRPCInvocationsClient client: _clients) { - client.close(); - } - } - - @Override - public void nextTuple() { - boolean gotRequest = false; - if(_local_drpc_id==null) { - int size; - synchronized (_clients) { - size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end - } - for(int i=0; i<size; i++) { - DRPCInvocationsClient client; - synchronized (_clients) { - client = _clients.get(i); - } - if (!client.isConnected()) { - continue; - } - try { - DRPCRequest req = client.fetchRequest(_function); - if(req.get_request_id().length() > 0) { - Map returnInfo = new HashMap(); - returnInfo.put("id", req.get_request_id()); - returnInfo.put("host", client.getHost()); - returnInfo.put("port", client.getPort()); - gotRequest = true; - _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i)); - break; - } - } catch (AuthorizationException aze) { - reconnect(client); - LOG.error("Not authorized to fetch DRPC result from DRPC server", aze); - } catch (TException e) { - reconnect(client); - LOG.error("Failed to fetch DRPC result from DRPC server", e); - } catch (Exception e) { - LOG.error("Failed to fetch DRPC result from DRPC server", e); - } - } - checkFutures(); - } else { - DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id); - if(drpc!=null) { // can happen during shutdown of drpc while topology is still up - try { - DRPCRequest req = drpc.fetchRequest(_function); - if(req.get_request_id().length() > 0) { - Map returnInfo = new HashMap(); - returnInfo.put("id", req.get_request_id()); - returnInfo.put("host", _local_drpc_id); - returnInfo.put("port", 0); - gotRequest = true; - _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0)); - } - } catch (AuthorizationException aze) { - throw new RuntimeException(aze); - } catch (TException e) { - throw new RuntimeException(e); - } - } - } - if(!gotRequest) { - Utils.sleep(1); - } - } - - @Override - public void ack(Object msgId) { - } - - @Override - public void fail(Object msgId) { - DRPCMessageId did = (DRPCMessageId) msgId; - DistributedRPCInvocations.Iface client; - - if(_local_drpc_id == null) { - client = _clients.get(did.index); - } else { - client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id); - } - try { - client.failRequest(did.id); - } catch (AuthorizationException aze) { - LOG.error("Not authorized to failREquest from DRPC server", aze); - } catch (TException e) { - LOG.error("Failed to fail request", e); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("args", "return-info")); - } -}
