[ https://issues.apache.org/jira/browse/STORM-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15187612#comment-15187612 ]

ASF GitHub Bot commented on STORM-1269:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1185#discussion_r55565851
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
    @@ -0,0 +1,605 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon;
    +
    +import com.codahale.metrics.MetricRegistry;
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.metrics.MetricsUtils;
    +import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.metric.EventLoggerBolt;
    +import org.apache.storm.metric.MetricsConsumerBolt;
    +import org.apache.storm.metric.SystemBolt;
    +import org.apache.storm.security.auth.IAuthorizer;
    +import org.apache.storm.task.IBolt;
    +import org.apache.storm.testing.NonRichBoltTracker;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.IPredicate;
    +import org.apache.storm.utils.ThriftTopologyUtils;
    +import org.apache.storm.utils.Utils;
    +import org.json.simple.JSONValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.*;
    +
    +public class StormCommon {
    +    // A singleton instance allows us to mock delegated static methods in our
    +    // tests by subclassing.
    +    private static StormCommon _instance = new StormCommon();
    +
    +    /**
    +     * Provide an instance of this class for delegates to use.  To mock out
    +     * delegated methods, provide an instance of a subclass that overrides the
    +     * implementation of the delegated method.
    +     * @param common a StormCommon instance
    +     * @return the previously set instance
    +     */
    +    public static StormCommon setInstance(StormCommon common) {
    +        StormCommon oldInstance = _instance;
    +        _instance = common;
    +        return oldInstance;
    +    }
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
    +
    +    public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
    +    public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
    +    public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
    +    public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
    +
    +    public static final String SYSTEM_STREAM_ID = "__system";
    +
    +    public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
    +    public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
    +
    +    public static void startMetricsReporter(PreparableReporter report, Map conf) {
    +        report.prepare(new MetricRegistry(), conf);
    +        report.start();
    +        LOG.info("Started statistics report plugin...");
    +    }
    +
    +    public static void startMetricsReporters(Map conf) {
    +        List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
    +        for (PreparableReporter reporter : reporters) {
    +            startMetricsReporter(reporter, conf);
    +        }
    +    }
    +
    +    public static String getTopologyNameById(String topologyId) {
    +        String topologyName = null;
    +        try {
    +            topologyName = topologyIdToName(topologyId);
    +        } catch (InvalidTopologyException e) {
    +            LOG.error("Invalid topologyId=" + topologyId);
    +        }
    +        return topologyName;
    +    }
    +
    +    /**
    +     * Convert topologyId to topologyName. TopologyId = topologyName-counter-timeStamp
    +     *
    +     * @param topologyId
    +     * @return
    +     */
    +    public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
    +        String ret = null;
    +        int index = topologyId.lastIndexOf('-');
    +        if (index != -1 && index > 2) {
    +            index = topologyId.lastIndexOf('-', index - 1);
    +            if (index != -1 && index > 0)
    +                ret = topologyId.substring(0, index);
    +            else
    +                throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
    +        } else
    +            throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
    +        return ret;
    +    }
    +
    +    public static String getStormId(IStormClusterState stormClusterState, final String topologyName) {
    +        List<String> activeTopologys = stormClusterState.activeStorms();
    +        IPredicate pred = new IPredicate<String>() {
    +            @Override
    +            public boolean test(String obj) {
    +                return obj != null ? getTopologyNameById(obj).equals(topologyName) : false;
    +            }
    +        };
    +        return Utils.findOne(pred, activeTopologys);
    +    }
    +
    +    public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) {
    +        return _instance.topologyBasesImpl(stormClusterState);
    +    }
    +
    +    protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
    +        List<String> activeTopologys = stormClusterState.activeStorms();
    +        Map<String, StormBase> stormBases = new HashMap<String, StormBase>();
    +        if (activeTopologys != null) {
    +            for (String topologyId : activeTopologys) {
    +                StormBase base = stormClusterState.stormBase(topologyId, null);
    +                if (base != null) {
    +                    stormBases.put(topologyId, base);
    +                }
    +            }
    +        }
    +        return stormBases;
    +    }
    +
    +    public static void validateDistributedMode(Map conf) {
    +        if (ConfigUtils.isLocalMode(conf)) {
    +            throw new IllegalArgumentException("Cannot start server in local mode!");
    +        }
    +    }
    +
    +    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
    +        List<String> componentIds = new ArrayList<String>();
    +
    +        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
    +            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
    +                Object value = topology.getFieldValue(field);
    +                if (value != null) {
    +                    Map<String, Object> componentMap = (Map<String, Object>) value;
    +                    componentIds.addAll(componentMap.keySet());
    +
    +                    for (String id : componentMap.keySet()) {
    +                        if (Utils.isSystemId(id)) {
    +                            throw new InvalidTopologyException(id + " is not a valid component id.");
    +                        }
    +                    }
    +                    for (Object componentObj : componentMap.values()) {
    +                        ComponentCommon common = getComponentCommon(componentObj);
    +                        Set<String> streamIds = common.get_streams().keySet();
    +                        for (String id : streamIds) {
    +                            if (Utils.isSystemId(id)) {
    +                                throw new InvalidTopologyException(id + " is not a valid stream id.");
    +                            }
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +
    +        List<String> offending = Utils.getRepeat(componentIds);
    +        if (offending.isEmpty() == false) {
    +            throw new InvalidTopologyException("Duplicate component ids: " 
+ offending);
    +        }
    +    }
    +
    +    private static boolean isEmptyInputs(ComponentCommon common) {
    +        if (common == null) {
    +            return true;
    +        } else if (common.get_inputs() == null) {
    +            return true;
    +        } else {
    +            return common.get_inputs().isEmpty();
    +        }
    +    }
    +
    +    public static Map<String, Object> allComponents(StormTopology topology) {
    +        Map<String, Object> components = new HashMap<String, Object>();
    +        List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
    +        for (StormTopology._Fields field : topologyFields) {
    +            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
    +                components.putAll(((Map) topology.getFieldValue(field)));
    +            }
    +        }
    +        return components;
    +    }
    +
    +    public static Map componentConf(Object component) {
    +        Map<Object, Object> conf = new HashMap<Object, Object>();
    +        ComponentCommon common = getComponentCommon(component);
    +        if (common != null) {
    +            String jconf = common.get_json_conf();
    +            if (jconf != null) {
    +                conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
    +            }
    +        }
    +        return conf;
    +    }
    +
    +    public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
    +        validateIds(topology);
    +
    +        List<StormTopology._Fields> spoutFields = Arrays.asList(Thrift.getSpoutFields());
    +        for (StormTopology._Fields field : spoutFields) {
    +            Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
    +            if (spoutComponents != null) {
    +                for (Object obj : spoutComponents.values()) {
    +                    ComponentCommon common = getComponentCommon(obj);
    +                    if (isEmptyInputs(common) == false) {
    +                        throw new InvalidTopologyException("May not 
declare inputs for a spout");
    +                    }
    +                }
    +            }
    +        }
    +
    +        Map<String, Object> componentMap = allComponents(topology);
    +        for (Object componentObj : componentMap.values()) {
    +            Map conf = componentConf(componentObj);
    +            ComponentCommon common = getComponentCommon(componentObj);
    +            if (common != null) {
    +                int parallelismHintNum = Thrift.getParallelismHint(common);
    +                Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
    +                if (taskNum > 0 && parallelismHintNum <= 0) {
    +                    throw new InvalidTopologyException("Number of 
executors must be greater than 0 when number of tasks is greater than 0");
    +                }
    +            }
    +        }
    +    }
    +
    +    private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
    +        Set<String> outputFields = new HashSet<String>();
    +        if (streams != null) {
    +            for (StreamInfo streamInfo : streams.values()) {
    +                outputFields.addAll(streamInfo.get_output_fields());
    +            }
    +        }
    +        return outputFields;
    +    }
    +
    +    public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
    +        Map<String, Object> componentMap = allComponents(topology);
    +        for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
    +            String componentId = entry.getKey();
    +            ComponentCommon common = getComponentCommon(entry.getValue());
    +            if (common != null) {
    +                Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
    +                for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
    +                    String sourceStreamId = input.getKey().get_streamId();
    +                    String sourceComponentId = input.getKey().get_componentId();
    +                    if(componentMap.keySet().contains(sourceComponentId) == false) {
    +                        throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
    +                    }
    +
    +                    ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
    +                    if (sourceComponent == null || sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
    +                        throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
    +                                "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
    +                    }
    +
    +                    Grouping grouping = input.getValue();
    +                    if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
    +                        List<String> fields = grouping.get_fields();
    +                        Map<String, StreamInfo> streams = sourceComponent.get_streams();
    +                        Set<String> sourceOutputFields = getStreamOutputFields(streams);
    +                        if (sourceOutputFields.containsAll(fields) == false) {
    +                            throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId  +"] of component " +
    +                                    "[" + sourceComponentId + "] + with non-existent fields: " + fields);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
    +        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    +        Set<String> boltIds = topology.get_bolts().keySet();
    +        Set<String> spoutIds = topology.get_spouts().keySet();
    +
    +        for(String id : spoutIds) {
    +            inputs.put(Utils.getGlobalStreamId(id, ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    +        }
    +
    +        for(String id : boltIds) {
    +            inputs.put(Utils.getGlobalStreamId(id, ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    +            inputs.put(Utils.getGlobalStreamId(id, ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    +        }
    +        return inputs;
    +    }
    +
    +    public static String clusterId = null;
    +    public static IBolt makeAckerBolt() {
    +        return _instance.makeAckerBoltImpl();
    +    }
    +    public IBolt makeAckerBoltImpl() {
    +        return new Acker();
    +    }
    +
    +    public static void addAcker(Map conf, StormTopology topology) {
    +        int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    +        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
    +
    +        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
    +        outputStreams.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
    +        outputStreams.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
    +
    +        Map<String, Object> ackerConf = new HashMap<String, Object>();
    +        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
    +        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    +
    +        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
    +
    +        for(Bolt bolt : topology.get_bolts().values()) {
    +            ComponentCommon common = bolt.get_common();
    +            common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
    +            common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
    +        }
    +
    +        for (SpoutSpec spout : topology.get_spouts().values()) {
    +            ComponentCommon common = spout.get_common();
    +            Map spoutConf = componentConf(spout);
    +            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    +            common.set_json_conf(JSONValue.toJSONString(spoutConf));
    +            common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
    +            common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
    +            common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
    +        }
    +
    +        topology.put_to_bolts(ACKER_COMPONENT_ID, acker);
    +    }
    +
    +    public static ComponentCommon getComponentCommon(Object component) {
    +        if (component == null) {
    +            return null;
    +        }
    +
    +        ComponentCommon common = null;
    +        if (component instanceof StateSpoutSpec) {
    +            common = ((StateSpoutSpec) component).get_common();
    +        } else if (component instanceof SpoutSpec) {
    +            common = ((SpoutSpec) component).get_common();
    +        } else if (component instanceof Bolt) {
    +            common = ((Bolt) component).get_common();
    +        }
    +        return common;
    +    }
    +
    +    public static void addMetricStreams(StormTopology topology) {
    +        for (Object component : allComponents(topology).values()) {
    +            ComponentCommon common = getComponentCommon(component);
    +            if (common != null) {
    +                StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
    +                common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
    +            }
    +        }
    +    }
    +
    +    public static void addSystemStreams(StormTopology topology) {
    +        for (Object component : allComponents(topology).values()) {
    +            ComponentCommon common = getComponentCommon(component);
    +            if (common != null) {
    +                StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
    +                common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
    +            }
    +        }
    +    }
    +
    +    public static List<String> eventLoggerBoltFields() {
    +        List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS,
    +                EventLoggerBolt.FIELD_VALUES);
    +        return fields;
    +    }
    +
    +    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    +        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    +        Set<String> allIds = new HashSet<String>();
    +        allIds.addAll(topology.get_bolts().keySet());
    +        allIds.addAll(topology.get_spouts().keySet());
    +
    +        for(String id : allIds) {
    +            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
    +        }
    +        return inputs;
    +    }
    +
    +    public static void addEventLogger(Map conf, StormTopology topology) {
    +        Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    +        HashMap<String, Object> componentConf = new HashMap<String, Object>();
    +        componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
    +        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    +        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
    +
    +        for(Object component : allComponents(topology).values()) {
    +            ComponentCommon common = getComponentCommon(component);
    +            if (common != null) {
    +                common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
    +            }
    +        }
    +        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
    +    }
    +
    +    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
    +        Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>();
    +
    +        Set<String> componentIdsEmitMetrics = new HashSet<String>();
    +        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
    +        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
    +
    +        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    +        for (String componentId : componentIdsEmitMetrics) {
    +            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
    +        }
    +
    +        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
    +        if (registerInfo != null) {
    +            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
    +            for (Map<String, Object> info : registerInfo) {
    +                String className = (String) info.get("class");
    +                Object argument = info.get("argument");
    +                Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
    +                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
    +                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
    +                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
    +
    +                String id = className;
    +                if (classOccurrencesMap.containsKey(className)) {
    +                    // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", 
\"a#2\"]"
    +                    int occurrenceNum = classOccurrencesMap.get(className);
    +                    occurrenceNum++;
    +                    classOccurrencesMap.put(className, occurrenceNum);
    +                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
    +                } else {
    +                    classOccurrencesMap.put(className, 1);
    +                }
    +                metricsConsumerBolts.put(id, metricsConsumerBolt);
    +            }
    +        }
    +        return metricsConsumerBolts;
    +    }
    +
    +    public static void addMetricComponents(Map conf, StormTopology topology) {
    +        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
    +        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
    +            topology.put_to_bolts(entry.getKey(), entry.getValue());
    +        }
    +    }
    +
    +    public static void addSystemComponents(Map conf, StormTopology topology) {
    +        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
    +        outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
    +        outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
    +        outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
    +
    +        Map<String, Object> boltConf = new HashMap<String, Object>();
    +        boltConf.put(Config.TOPOLOGY_TASKS, 0);
    +
    +        Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
    +        topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
    +    }
    +
    +    public static StormTopology systemTopology(Map stormConf, StormTopology topology) throws InvalidTopologyException {
    +        return _instance.systemTopologyImpl(stormConf, topology);
    +    }
    +
    +    protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException {
    --- End diff --
    
    That sounds fine, but could we do it in another follow-on JIRA? This is really close to being done.
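
For readers skimming the diff above: the static methods in StormCommon delegate to a singleton precisely so that tests can swap in a subclass, as the comment on _instance notes. A minimal sketch of how a test might use that hook follows; the anonymous subclass, the null cluster-state argument, and the stubbed return value are illustrative, not part of the patch:

    // Illustrative only: stub out a delegated method for a test.
    StormCommon stub = new StormCommon() {
        @Override
        protected Map<String, StormBase> topologyBasesImpl(IStormClusterState state) {
            return Collections.emptyMap(); // canned result for the code under test
        }
    };
    StormCommon previous = StormCommon.setInstance(stub);
    try {
        // Callers of the static StormCommon.topologyBases(...) now see the stub.
        assert StormCommon.topologyBases(null).isEmpty();
    } finally {
        StormCommon.setInstance(previous); // always restore the real instance
    }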


> port backtype.storm.daemon.common to java
> -----------------------------------------
>
>                 Key: STORM-1269
>                 URL: https://issues.apache.org/jira/browse/STORM-1269
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Basti Liu
>              Labels: java-migration, jstorm-merger
>
> Common utils shared by the daemons (Some things should just use the Thrift object)
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java is similar but not exactly the same.
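
One small aside on the patch above: topologyIdToName() assumes ids of the form <topologyName>-<counter>-<timestamp> and strips the last two '-'-separated fields. A tiny illustration, with a made-up id string:

    // Illustrative only: "word-count-3-1457564400" -> "word-count"
    try {
        String name = StormCommon.topologyIdToName("word-count-3-1457564400");
        System.out.println(name); // prints "word-count"
    } catch (InvalidTopologyException e) {
        // thrown when the id does not end with the -counter-timestamp suffix
    }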



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
