This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 381076b Clean up rate limit config and add an example topology (#3350)
381076b is described below
commit 381076b9df0756ee976a808bed7a97be0764082a
Author: Ning Wang <[email protected]>
AuthorDate: Tue Oct 1 09:10:51 2019 -0700
Clean up rate limit config and add an example topology (#3350)
---
.../examples/api/ComponentConfigTopology.java | 120 +++++++++++++++++++++
.../api/src/java/org/apache/heron/api/Config.java | 30 +++++-
2 files changed, 145 insertions(+), 5 deletions(-)
diff --git
a/examples/src/java/org/apache/heron/examples/api/ComponentConfigTopology.java
b/examples/src/java/org/apache/heron/examples/api/ComponentConfigTopology.java
new file mode 100644
index 0000000..132dc8f
--- /dev/null
+++
b/examples/src/java/org/apache/heron/examples/api/ComponentConfigTopology.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.examples.api;
+
+import java.util.Map;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.HeronSubmitter;
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.metric.GlobalMetrics;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.utils.Utils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.examples.api.spout.TestWordSpout;
+import org.apache.heron.simulator.Simulator;
+
+
+/**
+ * This is a basic example of a Storm topology.
+ */
+public final class ComponentConfigTopology {
+
+ private ComponentConfigTopology() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("word", new TestWordSpout(), 2)
+ // Set rate limit to 1000 bytes per second (since parallelism is set
to 2,
+ // each instance is rate limited as 500 bps).
+ .addConfiguration(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS, 1000);
+ builder.setBolt("exclaim1", new ExclamationBolt(), 2)
+ .shuffleGrouping("word");
+
+ Config conf = new Config();
+ conf.setDebug(true);
+ conf.setMaxSpoutPending(10);
+
+ // component resource configuration
+ conf.setComponentCpu("word", 0.5);
+ conf.setComponentRam("word", ByteAmount.fromMegabytes(512));
+ conf.setComponentDisk("word", ByteAmount.fromMegabytes(512));
+ conf.setComponentCpu("exclaim1", 0.5);
+ conf.setComponentRam("exclaim1", ByteAmount.fromMegabytes(512));
+ conf.setComponentDisk("exclaim1", ByteAmount.fromMegabytes(512));
+
+ // container resource configuration
+ conf.setContainerDiskRequested(ByteAmount.fromGigabytes(2));
+ conf.setContainerRamRequested(ByteAmount.fromGigabytes(3));
+ conf.setContainerCpuRequested(2);
+
+ // Specify the size of RAM padding to per container.
+ // Notice, this config will be considered as a hint,
+ // and it's up to the packing algorithm to determine whether to apply this
hint
+ conf.setContainerRamPadding(ByteAmount.fromGigabytes(2));
+
+ if (args != null && args.length > 0) {
+ conf.setNumStmgrs(2);
+ HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ } else {
+ Simulator simulator = new Simulator();
+ simulator.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(10000);
+ simulator.killTopology("test");
+ simulator.shutdown();
+ }
+ }
+
+ public static class ExclamationBolt extends BaseRichBolt {
+ private static final long serialVersionUID = 2165326630789117557L;
+ private long nItems;
+ private long startTime;
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void prepare(
+ Map conf,
+ TopologyContext context,
+ OutputCollector collector) {
+ nItems = 0;
+ startTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (++nItems % 100000 == 0) {
+ long latency = System.currentTimeMillis() - startTime;
+ System.out.println("Bolt processed " + nItems + " tuples in " +
latency + " ms");
+ GlobalMetrics.incr("selected_items");
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/api/Config.java
b/heron/api/src/java/org/apache/heron/api/Config.java
index bc7ca17..3bce3a1 100644
--- a/heron/api/src/java/org/apache/heron/api/Config.java
+++ b/heron/api/src/java/org/apache/heron/api/Config.java
@@ -378,7 +378,11 @@ public class Config extends HashMap<String, Object> {
"topology.droptuples.upon.backpressure";
/**
- * The per component output bytes per second in this topology.
+ * The per component output bytes per second (rate limit) in this topology.
It works with
+ * the addConfiguration() function in ComponentConfigurationDeclarer class.
+ * Example:
+ *
builder.setSpout(...).addConfiguration(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS,
1000);
+ *
builder.setBolt(...).addConfiguration(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS,
1000);
*/
public static final String TOPOLOGY_COMPONENT_OUTPUT_BPS =
"topology.component.output.bps";
@@ -646,16 +650,24 @@ public class Config extends HashMap<String, Object> {
conf.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, hooks);
}
- public static void setTopologyComponentOutputBPS(Map<String, Object> conf,
long bps) {
- conf.put(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS, String.valueOf(bps));
- }
-
@SuppressWarnings("unchecked")
public static List<String> getAutoTaskHooks(Map<String, Object> conf) {
return (List<String>) conf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS);
}
/**
+ * This function should not be used to set rate limiter in topology config.
+ * @deprecated use the TOPOLOGY_COMPONENT_OUTPUT_BPS config with
ComponentConfigurationDeclarer's
+ * addConfiguration() instead.
+ * Example:
+ *
builder.setSpout(...).addConfiguration(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS,
1000);
+ */
+ @Deprecated
+ public static void setTopologyComponentOutputBPS(Map<String, Object> conf,
long bps) {
+ conf.put(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS, String.valueOf(bps));
+ }
+
+ /**
* Users should use the version of this method at uses ByteAmount
* @deprecated use
* setComponentRam(Map<String, Object> conf, String component,
ByteAmount ramInBytes)
@@ -1030,6 +1042,14 @@ public class Config extends HashMap<String, Object> {
this.put(Config.TOPOLOGY_DROPTUPLES_UPON_BACKPRESSURE,
String.valueOf(dropTuples));
}
+ /**
+ * This function should not be used to set rate limiter in topology config.
+ * @deprecated use the TOPOLOGY_COMPONENT_OUTPUT_BPS config with
ComponentConfigurationDeclarer's
+ * addConfiguration() instead.
+ * Example:
+ *
builder.setSpout(...).addConfiguration(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS,
1000);
+ */
+ @Deprecated
public void setTopologyComponentOutputBPS(long bps) {
this.put(Config.TOPOLOGY_COMPONENT_OUTPUT_BPS, String.valueOf(bps));
}