[ https://issues.apache.org/jira/browse/STORM-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15187612#comment-15187612 ]
ASF GitHub Bot commented on STORM-1269: --------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1185#discussion_r55565851 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java --- @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon; + +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.Thrift; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.daemon.metrics.reporters.PreparableReporter; +import org.apache.storm.generated.*; +import org.apache.storm.generated.StormBase; +import org.apache.storm.metric.EventLoggerBolt; +import org.apache.storm.metric.MetricsConsumerBolt; +import org.apache.storm.metric.SystemBolt; +import org.apache.storm.security.auth.IAuthorizer; +import org.apache.storm.task.IBolt; +import org.apache.storm.testing.NonRichBoltTracker; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.IPredicate; +import org.apache.storm.utils.ThriftTopologyUtils; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class StormCommon { + // A singleton instance allows us to mock delegated static methods in our + // tests by subclassing. + private static StormCommon _instance = new StormCommon(); + + /** + * Provide an instance of this class for delegates to use. To mock out + * delegated methods, provide an instance of a subclass that overrides the + * implementation of the delegated method. 
+ * @param common a StormCommon instance + * @return the previously set instance + */ + public static StormCommon setInstance(StormCommon common) { + StormCommon oldInstance = _instance; + _instance = common; + return oldInstance; + } + + private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class); + + public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID; + public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID; + public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID; + public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID; + + public static final String SYSTEM_STREAM_ID = "__system"; + + public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger"; + public static final String EVENTLOGGER_STREAM_ID = "__eventlog"; + + public static void startMetricsReporter(PreparableReporter report, Map conf) { + report.prepare(new MetricRegistry(), conf); + report.start(); + LOG.info("Started statistics report plugin..."); + } + + public static void startMetricsReporters(Map conf) { + List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf); + for (PreparableReporter reporter : reporters) { + startMetricsReporter(reporter, conf); + } + } + + public static String getTopologyNameById(String topologyId) { + String topologyName = null; + try { + topologyName = topologyIdToName(topologyId); + } catch (InvalidTopologyException e) { + LOG.error("Invalid topologyId=" + topologyId); + } + return topologyName; + } + + /** + * Convert topologyId to topologyName. 
TopologyId = topologyName-counter-timeStamp + * + * @param topologyId + * @return + */ + public static String topologyIdToName(String topologyId) throws InvalidTopologyException { + String ret = null; + int index = topologyId.lastIndexOf('-'); + if (index != -1 && index > 2) { + index = topologyId.lastIndexOf('-', index - 1); + if (index != -1 && index > 0) + ret = topologyId.substring(0, index); + else + throw new InvalidTopologyException(topologyId + " is not a valid topologyId"); + } else + throw new InvalidTopologyException(topologyId + " is not a valid topologyId"); + return ret; + } + + public static String getStormId(IStormClusterState stormClusterState, final String topologyName) { + List<String> activeTopologys = stormClusterState.activeStorms(); + IPredicate pred = new IPredicate<String>() { + @Override + public boolean test(String obj) { + return obj != null ? getTopologyNameById(obj).equals(topologyName) : false; + } + }; + return Utils.findOne(pred, activeTopologys); + } + + public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) { + return _instance.topologyBasesImpl(stormClusterState); + } + + protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) { + List<String> activeTopologys = stormClusterState.activeStorms(); + Map<String, StormBase> stormBases = new HashMap<String, StormBase>(); + if (activeTopologys != null) { + for (String topologyId : activeTopologys) { + StormBase base = stormClusterState.stormBase(topologyId, null); + if (base != null) { + stormBases.put(topologyId, base); + } + } + } + return stormBases; + } + + public static void validateDistributedMode(Map conf) { + if (ConfigUtils.isLocalMode(conf)) { + throw new IllegalArgumentException("Cannot start server in local mode!"); + } + } + + private static void validateIds(StormTopology topology) throws InvalidTopologyException { + List<String> componentIds = new ArrayList<String>(); + + for (StormTopology._Fields field : 
Thrift.getTopologyFields()) { + if (ThriftTopologyUtils.isWorkerHook(field) == false) { + Object value = topology.getFieldValue(field); + if (value != null) { + Map<String, Object> componentMap = (Map<String, Object>) value; + componentIds.addAll(componentMap.keySet()); + + for (String id : componentMap.keySet()) { + if (Utils.isSystemId(id)) { + throw new InvalidTopologyException(id + " is not a valid component id."); + } + } + for (Object componentObj : componentMap.values()) { + ComponentCommon common = getComponentCommon(componentObj); + Set<String> streamIds = common.get_streams().keySet(); + for (String id : streamIds) { + if (Utils.isSystemId(id)) { + throw new InvalidTopologyException(id + " is not a valid stream id."); + } + } + } + } + } + } + + List<String> offending = Utils.getRepeat(componentIds); + if (offending.isEmpty() == false) { + throw new InvalidTopologyException("Duplicate component ids: " + offending); + } + } + + private static boolean isEmptyInputs(ComponentCommon common) { + if (common == null) { + return true; + } else if (common.get_inputs() == null) { + return true; + } else { + return common.get_inputs().isEmpty(); + } + } + + public static Map<String, Object> allComponents(StormTopology topology) { + Map<String, Object> components = new HashMap<String, Object>(); + List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields()); + for (StormTopology._Fields field : topologyFields) { + if (ThriftTopologyUtils.isWorkerHook(field) == false) { + components.putAll(((Map) topology.getFieldValue(field))); + } + } + return components; + } + + public static Map componentConf(Object component) { + Map<Object, Object> conf = new HashMap<Object, Object>(); + ComponentCommon common = getComponentCommon(component); + if (common != null) { + String jconf = common.get_json_conf(); + if (jconf != null) { + conf.putAll((Map<Object, Object>) JSONValue.parse(jconf)); + } + } + return conf; + } + + public static void 
validateBasic(StormTopology topology) throws InvalidTopologyException { + validateIds(topology); + + List<StormTopology._Fields> spoutFields = Arrays.asList(Thrift.getSpoutFields()); + for (StormTopology._Fields field : spoutFields) { + Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field); + if (spoutComponents != null) { + for (Object obj : spoutComponents.values()) { + ComponentCommon common = getComponentCommon(obj); + if (isEmptyInputs(common) == false) { + throw new InvalidTopologyException("May not declare inputs for a spout"); + } + } + } + } + + Map<String, Object> componentMap = allComponents(topology); + for (Object componentObj : componentMap.values()) { + Map conf = componentConf(componentObj); + ComponentCommon common = getComponentCommon(componentObj); + if (common != null) { + int parallelismHintNum = Thrift.getParallelismHint(common); + Integer taskNum = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0); + if (taskNum > 0 && parallelismHintNum <= 0) { + throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0"); + } + } + } + } + + private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) { + Set<String> outputFields = new HashSet<String>(); + if (streams != null) { + for (StreamInfo streamInfo : streams.values()) { + outputFields.addAll(streamInfo.get_output_fields()); + } + } + return outputFields; + } + + public static void validateStructure(StormTopology topology) throws InvalidTopologyException { + Map<String, Object> componentMap = allComponents(topology); + for (Map.Entry<String, Object> entry : componentMap.entrySet()) { + String componentId = entry.getKey(); + ComponentCommon common = getComponentCommon(entry.getValue()); + if (common != null) { + Map<GlobalStreamId, Grouping> inputs = common.get_inputs(); + for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) { + String sourceStreamId = 
input.getKey().get_streamId(); + String sourceComponentId = input.getKey().get_componentId(); + if(componentMap.keySet().contains(sourceComponentId) == false) { + throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]"); + } + + ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId)); + if (sourceComponent == null || sourceComponent.get_streams().containsKey(sourceStreamId) == false) { + throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " + + "[" + sourceStreamId + "] of component [" + sourceComponentId + "]"); + } + + Grouping grouping = input.getValue(); + if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) { + List<String> fields = grouping.get_fields(); + Map<String, StreamInfo> streams = sourceComponent.get_streams(); + Set<String> sourceOutputFields = getStreamOutputFields(streams); + if (sourceOutputFields.containsAll(fields) == false) { + throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId +"] of component " + + "[" + sourceComponentId + "] + with non-existent fields: " + fields); + } + } + } + } + } + } + + public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) { + Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>(); + Set<String> boltIds = topology.get_bolts().keySet(); + Set<String> spoutIds = topology.get_spouts().keySet(); + + for(String id : spoutIds) { + inputs.put(Utils.getGlobalStreamId(id, ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id"))); + } + + for(String id : boltIds) { + inputs.put(Utils.getGlobalStreamId(id, ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id"))); + inputs.put(Utils.getGlobalStreamId(id, ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id"))); + } + return inputs; + } + + public 
static String clusterId = null; + public static IBolt makeAckerBolt() { + return _instance.makeAckerBoltImpl(); + } + public IBolt makeAckerBoltImpl() { + return new Acker(); + } + + public static void addAcker(Map conf, StormTopology topology) { + int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS))); + Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology); + + Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>(); + outputStreams.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id"))); + outputStreams.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id"))); + + Map<String, Object> ackerConf = new HashMap<String, Object>(); + ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum); + ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))); + + Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf); + + for(Bolt bolt : topology.get_bolts().values()) { + ComponentCommon common = bolt.get_common(); + common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val"))); + common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id"))); + } + + for (SpoutSpec spout : topology.get_spouts().values()) { + ComponentCommon common = spout.get_common(); + Map spoutConf = componentConf(spout); + spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))); + common.set_json_conf(JSONValue.toJSONString(spoutConf)); + common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task"))); + common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping()); + common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID), 
Thrift.prepareDirectGrouping()); + } + + topology.put_to_bolts(ACKER_COMPONENT_ID, acker); + } + + public static ComponentCommon getComponentCommon(Object component) { + if (component == null) { + return null; + } + + ComponentCommon common = null; + if (component instanceof StateSpoutSpec) { + common = ((StateSpoutSpec) component).get_common(); + } else if (component instanceof SpoutSpec) { + common = ((SpoutSpec) component).get_common(); + } else if (component instanceof Bolt) { + common = ((Bolt) component).get_common(); + } + return common; + } + + public static void addMetricStreams(StormTopology topology) { + for (Object component : allComponents(topology).values()) { + ComponentCommon common = getComponentCommon(component); + if (common != null) { + StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points")); + common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo); + } + } + } + + public static void addSystemStreams(StormTopology topology) { + for (Object component : allComponents(topology).values()) { + ComponentCommon common = getComponentCommon(component); + if (common != null) { + StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event")); + common.put_to_streams(SYSTEM_STREAM_ID, streamInfo); + } + } + } + + public static List<String> eventLoggerBoltFields() { + List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, + EventLoggerBolt.FIELD_VALUES); + return fields; + } + + public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) { + Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>(); + Set<String> allIds = new HashSet<String>(); + allIds.addAll(topology.get_bolts().keySet()); + allIds.addAll(topology.get_spouts().keySet()); + + for(String id : allIds) { + inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id"))); 
+ } + return inputs; + } + + public static void addEventLogger(Map conf, StormTopology topology) { + Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS))); + HashMap<String, Object> componentConf = new HashMap<String, Object>(); + componentConf.put(Config.TOPOLOGY_TASKS, numExecutors); + componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))); + Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf); + + for(Object component : allComponents(topology).values()) { + ComponentCommon common = getComponentCommon(component); + if (common != null) { + common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields())); + } + } + topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt); + } + + public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) { + Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>(); + + Set<String> componentIdsEmitMetrics = new HashSet<String>(); + componentIdsEmitMetrics.addAll(allComponents(topology).keySet()); + componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID); + + Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>(); + for (String componentId : componentIdsEmitMetrics) { + inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping()); + } + + List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER); + if (registerInfo != null) { + Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>(); + for (Map<String, Object> info : registerInfo) { + String className = (String) info.get("class"); + Object argument = info.get("argument"); + Integer phintNum = 
Utils.getInt(info.get("parallelism.hint"), 1); + Map<String, Object> metricsConsumerConf = new HashMap<String, Object>(); + metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum); + Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf); + + String id = className; + if (classOccurrencesMap.containsKey(className)) { + // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]" + int occurrenceNum = classOccurrencesMap.get(className); + occurrenceNum++; + classOccurrencesMap.put(className, occurrenceNum); + id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum; + } else { + classOccurrencesMap.put(className, 1); + } + metricsConsumerBolts.put(id, metricsConsumerBolt); + } + } + return metricsConsumerBolts; + } + + public static void addMetricComponents(Map conf, StormTopology topology) { + Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology); + for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) { + topology.put_to_bolts(entry.getKey(), entry.getValue()); + } + } + + public static void addSystemComponents(Map conf, StormTopology topology) { + Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>(); + outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs"))); + outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval"))); + outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds"))); + + Map<String, Object> boltConf = new HashMap<String, Object>(); + boltConf.put(Config.TOPOLOGY_TASKS, 0); + + Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf); + topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec); + } + + public static StormTopology systemTopology(Map stormConf, StormTopology topology) 
throws InvalidTopologyException { + return _instance.systemTopologyImpl(stormConf, topology); + } + + protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException { --- End diff -- That sounds fine, but could we do it in another follow on JIRA? This is really close to being done. > port backtype.storm.daemon.common to java > ----------------------------------------- > > Key: STORM-1269 > URL: https://issues.apache.org/jira/browse/STORM-1269 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Basti Liu > Labels: java-migration, jstorm-merger > > Common utils shared by the daemons (Some things should just use the Thrift > object) > https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java > is similar but not exactly the same. -- This message was sent by Atlassian JIRA (v6.3.4#6332)