This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new ed6fd81 Add an example with Streamlet API using component config
(#3496)
ed6fd81 is described below
commit ed6fd81924c8ea35546a0737abda98ccb21cbaa9
Author: Ning Wang <[email protected]>
AuthorDate: Wed Apr 8 13:16:01 2020 -0700
Add an example with Streamlet API using component config (#3496)
An example Streamlet topology demonstrating how to set cpu and memory
limits per component.
---
.../streamlet/ComponentConfigTopology.java | 95 ++++++++++++++++++++++
1 file changed, 95 insertions(+)
diff --git a/examples/src/java/org/apache/heron/examples/streamlet/ComponentConfigTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/ComponentConfigTopology.java
new file mode 100644
index 0000000..5ab8532
--- /dev/null
+++ b/examples/src/java/org/apache/heron/examples/streamlet/ComponentConfigTopology.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.examples.streamlet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.examples.streamlet.utils.StreamletUtils;
+import org.apache.heron.streamlet.Builder;
+import org.apache.heron.streamlet.Config;
+import org.apache.heron.streamlet.Runner;
+
+/**
+ * This topology is an implementation of a simple topology using the
+ * Streamlet API. CPU and RAM limits are configured for each component.
+ */
+public final class ComponentConfigTopology {
+  private ComponentConfigTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(ComponentConfigTopology.class.getName());
+
+  private static final List<String> SENTENCES = Arrays.asList(
+      "I have nothing to declare but my genius",
+      "You can even",
+      "Compassion is an action word with no boundaries",
+      "To thine own self be true"
+  );
+
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.newBuilder();
+
+    processingGraphBuilder
+        // The origin of the processing graph: an indefinite series of sentences chosen
+        // from the list
+        .newSource(() -> StreamletUtils.randomFromList(SENTENCES))
+        .setName("random-sentences-source")
+        // Each sentence is "flattened" into a Streamlet<String> of individual words
+        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+        .setName("flatten-into-individual-words")
+        // The final output is logged using a user-supplied format
+        .consume(w -> {
+          String logMessage = String.format("(word: %s)", w);
+          LOG.info(logMessage);
+        })
+        .setName("consumer");
+
+    // The topology's parallelism (the number of containers across which the topology's
+    // processing instance will be split) can be defined via the second command-line
+    // argument (or else the default of 2 will be used).
+    int topologyParallelism = StreamletUtils.getParallelism(args, 2);
+
+    Config config = Config.newBuilder()
+        .setNumContainers(topologyParallelism)
+        .setPerContainerCpu(1)
+        .build();
+
+    config.getHeronConfig().setComponentCpu("random-sentences-source", 0.3);
+    config.getHeronConfig().setComponentRam("random-sentences-source",
+        ByteAmount.fromMegabytes(300));
+    config.getHeronConfig().setComponentCpu("flatten-into-individual-words", 0.3);
+    config.getHeronConfig().setComponentRam("flatten-into-individual-words",
+        ByteAmount.fromMegabytes(300));
+    config.getHeronConfig().setComponentCpu("consumer", 0.2);
+    config.getHeronConfig().setComponentRam("consumer", ByteAmount.fromMegabytes(200));
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}