Repository: storm Updated Branches: refs/heads/master b4e610c5f -> 26818f2b4
http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java index 55a86cf..ab164ca 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java @@ -15,19 +15,53 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.nimbus; +import java.util.Map; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultTopologyValidator implements ITopologyValidator { + private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map<String, Object> StormConf){ } @Override - public void validate(String topologyName, Map<String, Object> topologyConf, StormTopology topology) throws InvalidTopologyException { + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + if (topologyName.contains(".")) { + LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_')); + } + Map<String, SpoutSpec> spouts = topology.get_spouts(); + for (String spoutName : spouts.keySet()) { + if (spoutName.contains(".")) { + LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_')); + } + SpoutSpec spoutSpec = spouts.get(spoutName); + for (String streamName : spoutSpec.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_')); + } + } + } + + Map<String, Bolt> bolts = topology.get_bolts(); + for (String boltName : bolts.keySet()) { + if (boltName.contains(".")) { + LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_')); + } + Bolt bolt = bolts.get(boltName); + for (String streamName : bolt.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_')); + } + } + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java new file mode 100644 index 0000000..cdf22c7 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.nimbus; + +import java.util.Map; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StrictTopologyValidator implements ITopologyValidator { + private static final Logger LOG = LoggerFactory.getLogger(StrictTopologyValidator.class); + + @Override + public void prepare(Map stormConf){ + } + + @Override + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + if (topologyName.contains(".")) { + throw new InvalidTopologyException(String.format("Topology name '%s' contains illegal character '.'", topologyName)); + } + Map<String, SpoutSpec> spouts = topology.get_spouts(); + for (String spoutName : spouts.keySet()) { + if (spoutName.contains(".")) { + throw new InvalidTopologyException(String.format("Spout name '%s' contains illegal character '.'", spoutName)); + } + SpoutSpec spoutSpec = spouts.get(spoutName); + for (String streamName : spoutSpec.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + throw new InvalidTopologyException(String.format("Stream name '%s' contains illegal character '.'", streamName)); + } + } + } + + Map<String, Bolt> bolts = topology.get_bolts(); + for (String boltName : bolts.keySet()) { + if (boltName.contains(".")) { + throw new InvalidTopologyException(String.format("Bolt name '%s' contains illegal character '.'", boltName)); + } + Bolt bolt = bolts.get(boltName); + for (String streamName : bolt.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + throw new InvalidTopologyException(String.format("Stream name '%s' contains illegal character '.'", streamName)); + } + } + } + } +}
