This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new ed6fd81  Add an example with Streamlet API using component config (#3496)
ed6fd81 is described below

commit ed6fd81924c8ea35546a0737abda98ccb21cbaa9
Author: Ning Wang <[email protected]>
AuthorDate: Wed Apr 8 13:16:01 2020 -0700

    Add an example with Streamlet API using component config (#3496)
    
    An example Streamlet topology demonstrating how to set cpu and memory limits per component.
---
 .../streamlet/ComponentConfigTopology.java         | 95 ++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/examples/src/java/org/apache/heron/examples/streamlet/ComponentConfigTopology.java b/examples/src/java/org/apache/heron/examples/streamlet/ComponentConfigTopology.java
new file mode 100644
index 0000000..5ab8532
--- /dev/null
+++ b/examples/src/java/org/apache/heron/examples/streamlet/ComponentConfigTopology.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.heron.examples.streamlet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.examples.streamlet.utils.StreamletUtils;
+import org.apache.heron.streamlet.Builder;
+import org.apache.heron.streamlet.Config;
+import org.apache.heron.streamlet.Runner;
+
+/**
+ * This topology is an implementation of a simple topology using the
+ * Streamlet API. CPU and ram are configured for each component.
+ */
+public final class ComponentConfigTopology {
+  private ComponentConfigTopology() {
+  }
+
+  private static final Logger LOG =
+      Logger.getLogger(ComponentConfigTopology.class.getName());
+
+  private static final List<String> SENTENCES = Arrays.asList(
+      "I have nothing to declare but my genius",
+      "You can even",
+      "Compassion is an action word with no boundaries",
+      "To thine own self be true"
+  );
+
+  public static void main(String[] args) throws Exception {
+    Builder processingGraphBuilder = Builder.newBuilder();
+
+    processingGraphBuilder
+        // The origin of the processing graph: an indefinite series of sentences chosen
+        // from the list
+        .newSource(() -> StreamletUtils.randomFromList(SENTENCES))
+        .setName("random-sentences-source")
+        // Each sentence is "flattened" into a Streamlet<String> of individual words
+        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+        .setName("flatten-into-individual-words")
+        // The final output is logged using a user-supplied format
+        .consume(w -> {
+          String logMessage = String.format("(word: %s)", w);
+          LOG.info(logMessage);
+        })
+        .setName("consumer");
+
+    // The topology's parallelism (the number of containers across which the topology's
+    // processing instance will be split) can be defined via the second command-line
+    // argument (or else the default of 2 will be used).
+    int topologyParallelism = StreamletUtils.getParallelism(args, 2);
+
+    Config config = Config.newBuilder()
+        .setNumContainers(topologyParallelism)
+        .setPerContainerCpu(1)
+        .build();
+
+    config.getHeronConfig().setComponentCpu("random-sentences-source", 0.3);
+    config.getHeronConfig().setComponentRam("random-sentences-source",
+        ByteAmount.fromMegabytes(300));
+    config.getHeronConfig().setComponentCpu("flatten-into-individual-words", 0.3);
+    config.getHeronConfig().setComponentRam("flatten-into-individual-words",
+        ByteAmount.fromMegabytes(300));
+    config.getHeronConfig().setComponentCpu("consumer", 0.2);
+    config.getHeronConfig().setComponentRam("consumer", ByteAmount.fromMegabytes(200));
+
+    // Fetches the topology name from the first command-line argument
+    String topologyName = StreamletUtils.getTopologyName(args);
+
+    // Finally, the processing graph and configuration are passed to the Runner, which converts
+    // the graph into a Heron topology that can be run in a Heron cluster.
+    new Runner().run(topologyName, config, processingGraphBuilder);
+  }
+}

Reply via email to