http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java b/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java deleted file mode 100644 index 2c416ed..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class JoinResult extends BaseRichBolt { - public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class); - - String returnComponent; - Map<Object, Tuple> returns = new HashMap<>(); - Map<Object, Tuple> results = new HashMap<>(); - OutputCollector _collector; - - public JoinResult(String returnComponent) { - this.returnComponent = returnComponent; - } - - public void prepare(Map map, TopologyContext context, OutputCollector collector) { - _collector = collector; - } - - public void execute(Tuple tuple) { - Object requestId = tuple.getValue(0); - if(tuple.getSourceComponent().equals(returnComponent)) { - returns.put(requestId, tuple); - } else { - results.put(requestId, tuple); - } - - if(returns.containsKey(requestId) && results.containsKey(requestId)) { - Tuple result = results.remove(requestId); - Tuple returner = returns.remove(requestId); - LOG.debug(result.getValue(1).toString()); - List<Tuple> anchors = new ArrayList<>(); - anchors.add(result); - anchors.add(returner); - _collector.emit(anchors, new Values(""+result.getValue(1), returner.getValue(1))); - _collector.ack(result); - _collector.ack(returner); - } - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("result", "return-info")); - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java b/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java deleted file mode 100644 index 113163d..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.coordination.CoordinatedBolt.FinishedCallback; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicBoltExecutor; -import backtype.storm.topology.IBasicBolt; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.KeyedRoundRobinQueue; -import java.util.HashMap; -import java.util.Map; - - -public class KeyedFairBolt implements IRichBolt, FinishedCallback { - IRichBolt _delegate; - KeyedRoundRobinQueue<Tuple> _rrQueue; - Thread _executor; - FinishedCallback _callback; - - public KeyedFairBolt(IRichBolt delegate) { - _delegate = delegate; - } - - public KeyedFairBolt(IBasicBolt delegate) { - this(new BasicBoltExecutor(delegate)); - } - - - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - if(_delegate instanceof FinishedCallback) { - _callback = (FinishedCallback) _delegate; - } - _delegate.prepare(stormConf, context, collector); - _rrQueue = new KeyedRoundRobinQueue<Tuple>(); - _executor = new Thread(new Runnable() { - public void run() { - try { - while(true) { - _delegate.execute(_rrQueue.take()); - } - } catch (InterruptedException e) { - - } - } - }); - _executor.setDaemon(true); - _executor.start(); - } - - public void execute(Tuple input) { - Object key = input.getValue(0); - _rrQueue.add(key, input); - } - - public void cleanup() { - _executor.interrupt(); - _delegate.cleanup(); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - _delegate.declareOutputFields(declarer); - } - - public void finishedId(Object id) { - if(_callback!=null) { - _callback.finishedId(id); - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return new HashMap<String, Object>(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java deleted file mode 100644 index d03075e..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.topology.ComponentConfigurationDeclarer; -import backtype.storm.tuple.Fields; - -public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> { - public LinearDRPCInputDeclarer fieldsGrouping(Fields fields); - public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields); - - public LinearDRPCInputDeclarer globalGrouping(); - public LinearDRPCInputDeclarer globalGrouping(String streamId); - - public LinearDRPCInputDeclarer shuffleGrouping(); - public LinearDRPCInputDeclarer shuffleGrouping(String streamId); - - public LinearDRPCInputDeclarer localOrShuffleGrouping(); - public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId); - - public LinearDRPCInputDeclarer noneGrouping(); - public LinearDRPCInputDeclarer noneGrouping(String streamId); - - public LinearDRPCInputDeclarer allGrouping(); - public LinearDRPCInputDeclarer allGrouping(String streamId); - - public LinearDRPCInputDeclarer directGrouping(); - public LinearDRPCInputDeclarer directGrouping(String streamId); - - public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields); - public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields); - - public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping); - public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping); - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java deleted file mode 100644 index ee82091..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java +++ /dev/null @@ -1,393 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.Constants; -import backtype.storm.ILocalDRPC; -import backtype.storm.coordination.BatchBoltExecutor; -import backtype.storm.coordination.CoordinatedBolt; -import backtype.storm.coordination.CoordinatedBolt.FinishedCallback; -import backtype.storm.coordination.CoordinatedBolt.IdStreamSpec; -import backtype.storm.coordination.CoordinatedBolt.SourceArgs; -import backtype.storm.coordination.IBatchBolt; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.StreamInfo; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.grouping.PartialKeyGrouping; -import backtype.storm.topology.BaseConfigurationDeclarer; -import backtype.storm.topology.BasicBoltExecutor; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.IBasicBolt; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsGetter; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -// Trident subsumes the functionality provided by this class, so it's deprecated -@Deprecated -public class LinearDRPCTopologyBuilder { - String _function; - List<Component> _components = new ArrayList<Component>(); - - - public LinearDRPCTopologyBuilder(String function) { - _function = function; - } - - public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) { - return addBolt(new BatchBoltExecutor(bolt), parallelism); - } - - public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) { - return addBolt(bolt, 1); - } - - @Deprecated - public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) { - if(parallelism==null) parallelism = 1; - Component component = new Component(bolt, parallelism.intValue()); - _components.add(component); - return new InputDeclarerImpl(component); - } - - @Deprecated - public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) { - return addBolt(bolt, null); - } - - public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) { - return addBolt(new BasicBoltExecutor(bolt), parallelism); - } - - public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) { - return addBolt(bolt, null); - } - - public StormTopology createLocalTopology(ILocalDRPC drpc) { - return createTopology(new DRPCSpout(_function, drpc)); - } - - public StormTopology createRemoteTopology() { - return createTopology(new DRPCSpout(_function)); - } - - - private StormTopology createTopology(DRPCSpout spout) { - final String SPOUT_ID = "spout"; - final String PREPARE_ID = "prepare-request"; - - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(SPOUT_ID, spout); - builder.setBolt(PREPARE_ID, new PrepareRequest()) - .noneGrouping(SPOUT_ID); - int i=0; - for(; i<_components.size();i++) { - Component component = _components.get(i); - - Map<String, SourceArgs> source = new HashMap<String, SourceArgs>(); - if (i==1) { - source.put(boltId(i-1), SourceArgs.single()); - } else if (i>=2) { - source.put(boltId(i-1), SourceArgs.all()); - } - IdStreamSpec idSpec = null; - if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) { - idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM); - } - BoltDeclarer declarer = builder.setBolt( - boltId(i), - new CoordinatedBolt(component.bolt, source, idSpec), - component.parallelism); - - for(Map<String, Object> conf: component.componentConfs) { - declarer.addConfigurations(conf); - } - - if(idSpec!=null) { - declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request")); - } - if(i==0 && component.declarations.isEmpty()) { - declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM); - } else { - String prevId; - if(i==0) { - prevId = PREPARE_ID; - } else { - prevId = boltId(i-1); - } - for(InputDeclaration declaration: component.declarations) { - declaration.declare(prevId, declarer); - } - } - if(i>0) { - declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); - } - } - - IRichBolt lastBolt = _components.get(_components.size()-1).bolt; - OutputFieldsGetter getter = new OutputFieldsGetter(); - lastBolt.declareOutputFields(getter); - Map<String, StreamInfo> streams = getter.getFieldsDeclaration(); - if(streams.size()!=1) { - throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology"); - } - String outputStream = streams.keySet().iterator().next(); - List<String> fields = streams.get(outputStream).get_output_fields(); - if(fields.size()!=2) { - throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); - } - - builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)) - .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))) - .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request")); - i++; - builder.setBolt(boltId(i), new ReturnResults()) - .noneGrouping(boltId(i-1)); - return builder.createTopology(); - } - - private static String boltId(int index) { - return "bolt" + index; - } - - private static class Component { - public IRichBolt bolt; - public int parallelism; - public List<Map<String, Object>> componentConfs = new ArrayList<>(); - public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); - - public Component(IRichBolt bolt, int parallelism) { - this.bolt = bolt; - this.parallelism = parallelism; - } - } - - private static interface InputDeclaration { - public void declare(String prevComponent, InputDeclarer declarer); - } - - private static class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer { - Component _component; - - public InputDeclarerImpl(Component component) { - _component = component; - } - - @Override - public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.fieldsGrouping(prevComponent, fields); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, final Fields fields) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.fieldsGrouping(prevComponent, streamId, fields); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer globalGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.globalGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer globalGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.globalGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer shuffleGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.shuffleGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.shuffleGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer localOrShuffleGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.localOrShuffleGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.localOrShuffleGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer noneGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.noneGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer noneGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.noneGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer allGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.allGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer allGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.allGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer directGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.directGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer directGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.directGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) { - return customGrouping(new PartialKeyGrouping(fields)); - } - - @Override - public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields) { - return customGrouping(streamId, new PartialKeyGrouping(fields)); - } - - @Override - public LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.customGrouping(prevComponent, grouping); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer customGrouping(final String streamId, final CustomStreamGrouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.customGrouping(prevComponent, streamId, grouping); - } - }); - return this; - } - - private void addDeclaration(InputDeclaration declaration) { - _component.declarations.add(declaration); - } - - @Override - public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> conf) { - _component.componentConfs.add(conf); - return this; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java b/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java deleted file mode 100644 index bd32169..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import java.util.Map; -import java.util.Random; -import backtype.storm.utils.Utils; - - -public class PrepareRequest extends BaseBasicBolt { - public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID; - public static final String RETURN_STREAM = "ret"; - public static final String ID_STREAM = "id"; - - Random rand; - - @Override - public void prepare(Map map, TopologyContext context) { - rand = new Random(); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String args = tuple.getString(0); - String returnInfo = tuple.getString(1); - long requestId = rand.nextLong(); - collector.emit(ARGS_STREAM, new Values(requestId, args)); - collector.emit(RETURN_STREAM, new Values(requestId, returnInfo)); - collector.emit(ID_STREAM, new Values(requestId)); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(ARGS_STREAM, new Fields("request", "args")); - declarer.declareStream(RETURN_STREAM, new Fields("request", "return")); - declarer.declareStream(ID_STREAM, new Fields("request")); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java deleted file mode 100644 index b26508d..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.drpc; - -import backtype.storm.Config; -import backtype.storm.generated.DistributedRPCInvocations; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.ServiceRegistry; -import backtype.storm.utils.Utils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; -import org.json.simple.JSONValue; - - -public class ReturnResults extends BaseRichBolt { - //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS - static final long serialVersionUID = -774882142710631591L; - - public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class); - OutputCollector _collector; - boolean local; - Map _conf; - Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>(); - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - _conf = stormConf; - _collector = collector; - local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local"); - } - - @Override - public void execute(Tuple input) { - String result = (String) input.getValue(0); - String returnInfo = (String) input.getValue(1); - if(returnInfo!=null) { - Map retMap = (Map) JSONValue.parse(returnInfo); - final String host = (String) retMap.get("host"); - final int port = Utils.getInt(retMap.get("port")); - String id = (String) retMap.get("id"); - DistributedRPCInvocations.Iface client; - if(local) { - client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host); - } else { - List server = new ArrayList() {{ - add(host); - add(port); - }}; - - if(!_clients.containsKey(server)) { - try { - _clients.put(server, new DRPCInvocationsClient(_conf, host, port)); - } catch (TTransportException ex) { - throw new RuntimeException(ex); - } - } - client = _clients.get(server); - } - - try { - client.result(id, result); - _collector.ack(input); - } catch (AuthorizationException aze) { - LOG.error("Not authorized to return results to DRPC server", aze); - _collector.fail(input); - if (client instanceof DRPCInvocationsClient) { - try { - LOG.info("reconnecting... "); - ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call - } catch (TException e2) { - throw new RuntimeException(e2); - } - } - } catch(TException e) { - LOG.error("Failed to return results to DRPC server", e); - _collector.fail(input); - if (client instanceof DRPCInvocationsClient) { - try { - LOG.info("reconnecting... "); - ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call - } catch (TException e2) { - throw new RuntimeException(e2); - } - } - } - } - } - - @Override - public void cleanup() { - for(DRPCInvocationsClient c: _clients.values()) { - c.close(); - } - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/generated/AccessControl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/AccessControl.java b/storm-core/src/jvm/backtype/storm/generated/AccessControl.java deleted file mode 100644 index 2209168..0000000 --- a/storm-core/src/jvm/backtype/storm/generated/AccessControl.java +++ /dev/null @@ -1,627 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Autogenerated by Thrift Compiler (0.9.3) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package backtype.storm.generated; - -import org.apache.thrift.scheme.IScheme; -import org.apache.thrift.scheme.SchemeFactory; -import org.apache.thrift.scheme.StandardScheme; - -import org.apache.thrift.scheme.TupleScheme; -import org.apache.thrift.protocol.TTupleProtocol; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.EncodingUtils; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.server.AbstractNonblockingServer.*; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.EnumMap; -import java.util.Set; -import java.util.HashSet; -import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; -import javax.annotation.Generated; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") -public class AccessControl implements org.apache.thrift.TBase<AccessControl, AccessControl._Fields>, java.io.Serializable, Cloneable, Comparable<AccessControl> { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AccessControl"); - - private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)1); - private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)2); - private static final org.apache.thrift.protocol.TField ACCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("access", org.apache.thrift.protocol.TType.I32, (short)3); - - private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); - static { - schemes.put(StandardScheme.class, new AccessControlStandardSchemeFactory()); - schemes.put(TupleScheme.class, new AccessControlTupleSchemeFactory()); - } - - private AccessControlType type; // required - private String name; // optional - private int access; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - /** - * - * @see AccessControlType - */ - TYPE((short)1, "type"), - NAME((short)2, "name"), - ACCESS((short)3, "access"); - - private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); - - static { - for (_Fields field : EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 1: // TYPE - return TYPE; - case 2: // NAME - return NAME; - case 3: // ACCESS - return ACCESS; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. - */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - public static _Fields findByName(String name) { - return byName.get(name); - } - - private final short _thriftId; - private final String _fieldName; - - _Fields(short thriftId, String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - public short getThriftFieldId() { - return _thriftId; - } - - public String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - private static final int __ACCESS_ISSET_ID = 0; - private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.NAME}; - public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, AccessControlType.class))); - tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.OPTIONAL, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.ACCESS, new org.apache.thrift.meta_data.FieldMetaData("access", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); - metaDataMap = Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AccessControl.class, metaDataMap); - } - - public AccessControl() { - } - - public AccessControl( - AccessControlType type, - int access) - { - this(); - this.type = type; - this.access = access; - set_access_isSet(true); - } - - /** - * Performs a deep copy on <i>other</i>. - */ - public AccessControl(AccessControl other) { - __isset_bitfield = other.__isset_bitfield; - if (other.is_set_type()) { - this.type = other.type; - } - if (other.is_set_name()) { - this.name = other.name; - } - this.access = other.access; - } - - public AccessControl deepCopy() { - return new AccessControl(this); - } - - @Override - public void clear() { - this.type = null; - this.name = null; - set_access_isSet(false); - this.access = 0; - } - - /** - * - * @see AccessControlType - */ - public AccessControlType get_type() { - return this.type; - } - - /** - * - * @see AccessControlType - */ - public void set_type(AccessControlType type) { - this.type = type; - } - - public void unset_type() { - this.type = null; - } - - /** Returns true if field type is set (has been assigned a value) and false otherwise */ - public boolean is_set_type() { - return this.type != null; - } - - public void set_type_isSet(boolean value) { - if (!value) { - this.type = null; - } - } - - public String get_name() { - return this.name; - } - - public void set_name(String name) { - this.name = name; - } - - public void unset_name() { - this.name = null; - } - - /** Returns true if field name is set (has been assigned a value) and false otherwise */ - public boolean is_set_name() { - return this.name != null; - } - - public void set_name_isSet(boolean value) { - if (!value) { - this.name = null; - } - } - - public int get_access() { - return this.access; - } - - public void set_access(int access) { - this.access = access; - set_access_isSet(true); - } - - public void unset_access() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ACCESS_ISSET_ID); - } - - /** Returns true if field access is set (has been assigned a value) and false otherwise */ - public boolean is_set_access() { - return EncodingUtils.testBit(__isset_bitfield, __ACCESS_ISSET_ID); - } - - public void set_access_isSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ACCESS_ISSET_ID, value); - } - - public void setFieldValue(_Fields field, Object value) { - switch (field) { - case TYPE: - if (value == null) { - unset_type(); - } else { - set_type((AccessControlType)value); - } - break; - - case NAME: - if (value == null) { - unset_name(); - } else { - set_name((String)value); - } - break; - - case ACCESS: - if (value == null) { - unset_access(); - } else { - set_access((Integer)value); - } - break; - - } - } - - public Object getFieldValue(_Fields field) { - switch (field) { - case TYPE: - return get_type(); - - case NAME: - return get_name(); - - case ACCESS: - return get_access(); - - } - throw new IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - public boolean isSet(_Fields field) { - if (field == null) { - throw new IllegalArgumentException(); - } - - switch (field) { - case TYPE: - return is_set_type(); - case NAME: - return is_set_name(); - case ACCESS: - return is_set_access(); - } - throw new IllegalStateException(); - } - - @Override - public boolean equals(Object that) { - if (that == null) - return false; - if (that instanceof AccessControl) - return this.equals((AccessControl)that); - return false; - } - - public boolean equals(AccessControl that) { - if (that == null) - return false; - - boolean this_present_type = true && this.is_set_type(); - boolean that_present_type = true && that.is_set_type(); - if (this_present_type || that_present_type) { - if (!(this_present_type && that_present_type)) - return false; - if (!this.type.equals(that.type)) - return false; - } - - boolean this_present_name = true && this.is_set_name(); - boolean that_present_name = true && that.is_set_name(); - if (this_present_name || that_present_name) { - if (!(this_present_name && that_present_name)) - return false; - if (!this.name.equals(that.name)) - return false; - } - - boolean this_present_access = true; - boolean that_present_access = true; - if (this_present_access || that_present_access) { - if (!(this_present_access && that_present_access)) - return false; - if (this.access != that.access) - return false; - } - - return true; - } - - @Override - public int hashCode() { - List<Object> list = new ArrayList<Object>(); - - boolean present_type = true && (is_set_type()); - list.add(present_type); - if (present_type) - list.add(type.getValue()); - - boolean present_name = true && (is_set_name()); - list.add(present_name); - if (present_name) - list.add(name); - - boolean present_access = true; - list.add(present_access); - if (present_access) - list.add(access); - - return list.hashCode(); - } - - @Override - public int compareTo(AccessControl other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = Boolean.valueOf(is_set_type()).compareTo(other.is_set_type()); - if (lastComparison != 0) { - return lastComparison; - } - if (is_set_type()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = Boolean.valueOf(is_set_name()).compareTo(other.is_set_name()); - if (lastComparison != 0) { - return lastComparison; - } - if (is_set_name()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, other.name); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = Boolean.valueOf(is_set_access()).compareTo(other.is_set_access()); - if (lastComparison != 0) { - return lastComparison; - } - if (is_set_access()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.access, other.access); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - schemes.get(iprot.getScheme()).getScheme().read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - schemes.get(oprot.getScheme()).getScheme().write(oprot, this); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("AccessControl("); - boolean first = true; - - sb.append("type:"); - if (this.type == null) { - sb.append("null"); - } else { - sb.append(this.type); - } - first = false; - if (is_set_name()) { - if (!first) sb.append(", "); - sb.append("name:"); - if (this.name == null) { - sb.append("null"); - } else { - sb.append(this.name); - } - first = false; - } - if (!first) sb.append(", "); - sb.append("access:"); - sb.append(this.access); - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - if (!is_set_type()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'type' is unset! Struct:" + toString()); - } - - if (!is_set_access()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'access' is unset! Struct:" + toString()); - } - - // check for sub-struct validity - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { - try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bitfield = 0; - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class AccessControlStandardSchemeFactory implements SchemeFactory { - public AccessControlStandardScheme getScheme() { - return new AccessControlStandardScheme(); - } - } - - private static class AccessControlStandardScheme extends StandardScheme<AccessControl> { - - public void read(org.apache.thrift.protocol.TProtocol iprot, AccessControl struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 1: // TYPE - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.type = backtype.storm.generated.AccessControlType.findByValue(iprot.readI32()); - struct.set_type_isSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 2: // NAME - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.name = iprot.readString(); - struct.set_name_isSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 3: // ACCESS - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.access = iprot.readI32(); - struct.set_access_isSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - struct.validate(); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot, AccessControl struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.type != null) { - oprot.writeFieldBegin(TYPE_FIELD_DESC); - oprot.writeI32(struct.type.getValue()); - oprot.writeFieldEnd(); - } - if (struct.name != null) { - if (struct.is_set_name()) { - oprot.writeFieldBegin(NAME_FIELD_DESC); - oprot.writeString(struct.name); - oprot.writeFieldEnd(); - } - } - oprot.writeFieldBegin(ACCESS_FIELD_DESC); - oprot.writeI32(struct.access); - oprot.writeFieldEnd(); - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class AccessControlTupleSchemeFactory implements SchemeFactory { - public AccessControlTupleScheme getScheme() { - return new AccessControlTupleScheme(); - } - } - - private static class AccessControlTupleScheme extends TupleScheme<AccessControl> { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, AccessControl struct) throws org.apache.thrift.TException { - TTupleProtocol oprot = (TTupleProtocol) prot; - oprot.writeI32(struct.type.getValue()); - oprot.writeI32(struct.access); - BitSet optionals = new BitSet(); - if (struct.is_set_name()) { - optionals.set(0); - } - oprot.writeBitSet(optionals, 1); - if (struct.is_set_name()) { - oprot.writeString(struct.name); - } - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, AccessControl struct) throws org.apache.thrift.TException { - TTupleProtocol iprot = (TTupleProtocol) prot; - struct.type = backtype.storm.generated.AccessControlType.findByValue(iprot.readI32()); - struct.set_type_isSet(true); - struct.access = iprot.readI32(); - struct.set_access_isSet(true); - BitSet incoming = iprot.readBitSet(1); - if (incoming.get(0)) { - struct.name = iprot.readString(); - struct.set_name_isSet(true); - } - } - } - -} - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/generated/AccessControlType.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/AccessControlType.java b/storm-core/src/jvm/backtype/storm/generated/AccessControlType.java deleted file mode 100644 index 3a9aa70..0000000 --- a/storm-core/src/jvm/backtype/storm/generated/AccessControlType.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Autogenerated by Thrift Compiler (0.9.3) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package backtype.storm.generated; - - -import java.util.Map; -import java.util.HashMap; -import org.apache.thrift.TEnum; - -public enum AccessControlType implements org.apache.thrift.TEnum { - OTHER(1), - USER(2); - - private final int value; - - private AccessControlType(int value) { - this.value = value; - } - - /** - * Get the integer value of this enum value, as defined in the Thrift IDL. - */ - public int getValue() { - return value; - } - - /** - * Find a the enum type by its integer value, as defined in the Thrift IDL. - * @return null if the value is not found. - */ - public static AccessControlType findByValue(int value) { - switch (value) { - case 1: - return OTHER; - case 2: - return USER; - default: - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java b/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java deleted file mode 100644 index eecf044..0000000 --- a/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java +++ /dev/null @@ -1,406 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Autogenerated by Thrift Compiler (0.9.3) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package backtype.storm.generated; - -import org.apache.thrift.scheme.IScheme; -import org.apache.thrift.scheme.SchemeFactory; -import org.apache.thrift.scheme.StandardScheme; - -import org.apache.thrift.scheme.TupleScheme; -import org.apache.thrift.protocol.TTupleProtocol; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.EncodingUtils; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.server.AbstractNonblockingServer.*; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.EnumMap; -import java.util.Set; -import java.util.HashSet; -import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; -import javax.annotation.Generated; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") -public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException"); - - private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1); - - private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); - static { - schemes.put(StandardScheme.class, new AlreadyAliveExceptionStandardSchemeFactory()); - schemes.put(TupleScheme.class, new AlreadyAliveExceptionTupleSchemeFactory()); - } - - private String msg; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - MSG((short)1, "msg"); - - private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); - - static { - for (_Fields field : EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 1: // MSG - return MSG; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. - */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - public static _Fields findByName(String name) { - return byName.get(name); - } - - private final short _thriftId; - private final String _fieldName; - - _Fields(short thriftId, String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - public short getThriftFieldId() { - return _thriftId; - } - - public String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - metaDataMap = Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AlreadyAliveException.class, metaDataMap); - } - - public AlreadyAliveException() { - } - - public AlreadyAliveException( - String msg) - { - this(); - this.msg = msg; - } - - /** - * Performs a deep copy on <i>other</i>. - */ - public AlreadyAliveException(AlreadyAliveException other) { - if (other.is_set_msg()) { - this.msg = other.msg; - } - } - - public AlreadyAliveException deepCopy() { - return new AlreadyAliveException(this); - } - - @Override - public void clear() { - this.msg = null; - } - - public String get_msg() { - return this.msg; - } - - public void set_msg(String msg) { - this.msg = msg; - } - - public void unset_msg() { - this.msg = null; - } - - /** Returns true if field msg is set (has been assigned a value) and false otherwise */ - public boolean is_set_msg() { - return this.msg != null; - } - - public void set_msg_isSet(boolean value) { - if (!value) { - this.msg = null; - } - } - - public void setFieldValue(_Fields field, Object value) { - switch (field) { - case MSG: - if (value == null) { - unset_msg(); - } else { - set_msg((String)value); - } - break; - - } - } - - public Object getFieldValue(_Fields field) { - switch (field) { - case MSG: - return get_msg(); - - } - throw new IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - public boolean isSet(_Fields field) { - if (field == null) { - throw new IllegalArgumentException(); - } - - switch (field) { - case MSG: - return is_set_msg(); - } - throw new IllegalStateException(); - } - - @Override - public boolean equals(Object that) { - if (that == null) - return false; - if (that instanceof AlreadyAliveException) - return this.equals((AlreadyAliveException)that); - return false; - } - - public boolean equals(AlreadyAliveException that) { - if (that == null) - return false; - - boolean this_present_msg = true && this.is_set_msg(); - boolean that_present_msg = true && that.is_set_msg(); - if (this_present_msg || that_present_msg) { - if (!(this_present_msg && that_present_msg)) - return false; - if (!this.msg.equals(that.msg)) - return false; - } - - return true; - } - - @Override - public int hashCode() { - List<Object> list = new ArrayList<Object>(); - - boolean present_msg = true && (is_set_msg()); - list.add(present_msg); - if (present_msg) - list.add(msg); - - return list.hashCode(); - } - - @Override - public int compareTo(AlreadyAliveException other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = Boolean.valueOf(is_set_msg()).compareTo(other.is_set_msg()); - if (lastComparison != 0) { - return lastComparison; - } - if (is_set_msg()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - schemes.get(iprot.getScheme()).getScheme().read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - schemes.get(oprot.getScheme()).getScheme().write(oprot, this); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("AlreadyAliveException("); - boolean first = true; - - sb.append("msg:"); - if (this.msg == null) { - sb.append("null"); - } else { - sb.append(this.msg); - } - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - if (!is_set_msg()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString()); - } - - // check for sub-struct validity - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { - try { - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class AlreadyAliveExceptionStandardSchemeFactory implements SchemeFactory { - public AlreadyAliveExceptionStandardScheme getScheme() { - return new AlreadyAliveExceptionStandardScheme(); - } - } - - private static class AlreadyAliveExceptionStandardScheme extends StandardScheme<AlreadyAliveException> { - - public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 1: // MSG - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.msg = iprot.readString(); - struct.set_msg_isSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - struct.validate(); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.msg != null) { - oprot.writeFieldBegin(MSG_FIELD_DESC); - oprot.writeString(struct.msg); - oprot.writeFieldEnd(); - } - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class AlreadyAliveExceptionTupleSchemeFactory implements SchemeFactory { - public AlreadyAliveExceptionTupleScheme getScheme() { - return new AlreadyAliveExceptionTupleScheme(); - } - } - - private static class AlreadyAliveExceptionTupleScheme extends TupleScheme<AlreadyAliveException> { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException { - TTupleProtocol oprot = (TTupleProtocol) prot; - oprot.writeString(struct.msg); - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException { - TTupleProtocol iprot = (TTupleProtocol) prot; - struct.msg = iprot.readString(); - struct.set_msg_isSet(true); - } - } - -} -
