This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new ffae7af  Joshfischer/eco stateful topology (#2851)
ffae7af is described below

commit ffae7af382984534b824358b4397733267d20eb5
Author: Josh Fischer <j...@joshfischer.io>
AuthorDate: Fri Apr 6 00:04:43 2018 -0500

    Joshfischer/eco stateful topology (#2851)
    
    * saving work.
    
    * adding examples for stateful topologies with ECO
---
 .../twitter/heron/examples/eco/RandomString.java   | 51 +++++++++++++
 .../heron/examples/eco/StatefulConsumerBolt.java   | 66 +++++++++++++++++
 .../heron/examples/eco/StatefulNumberSpout.java    | 83 ++++++++++++++++++++++
 .../heron/examples/eco/StatefulRandomIntSpout.java | 73 +++++++++++++++++++
 .../heron/examples/eco/StatefulWindowSumBolt.java  | 67 +++++++++++++++++
 .../com/twitter/heron/examples/eco/WordSpout.java  | 64 +++++++++++++++++
 .../examples/eco/heron-stateful-windowing.yaml     | 52 ++++++++++++++
 .../examples/eco/heron-stateful-word-count.yaml    | 38 ++++++++++
 8 files changed, 494 insertions(+)

diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java
new file mode 100644
index 0000000..6ece43c
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java
@@ -0,0 +1,51 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Random;
+
+public class RandomString {
+  private  char[] symbols;
+
+  private Random random = new Random();
+
+  private char[] buf;
+
+  public RandomString(int length) {
+    // Construct the symbol set
+    StringBuilder tmp = new StringBuilder();
+    for (char ch = '0'; ch <= '9'; ++ch) {
+      tmp.append(ch);
+    }
+
+    for (char ch = 'a'; ch <= 'z'; ++ch) {
+      tmp.append(ch);
+    }
+
+    symbols = tmp.toString().toCharArray();
+    if (length < 1) {
+      throw new IllegalArgumentException("length < 1: " + length);
+    }
+
+    buf = new char[length];
+  }
+
+  public String nextString() {
+    for (int idx = 0; idx < buf.length; ++idx) {
+      buf[idx] = symbols[random.nextInt(symbols.length)];
+    }
+
+    return new String(buf);
+  }
+}
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java
 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java
new file mode 100644
index 0000000..bd5cbfe
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java
@@ -0,0 +1,66 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+
+import com.twitter.heron.api.bolt.BaseRichBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Tuple;
+
+
+public class StatefulConsumerBolt extends BaseRichBolt
+    implements IStatefulComponent<Integer, Integer> {
+  private static final long serialVersionUID = -5470591933906954522L;
+
+  private OutputCollector collector;
+  private State<Integer, Integer> myState;
+
+  @Override
+  public void initState(State<Integer, Integer> state) {
+    this.myState = state;
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    // Nothing really since we operate out of the system supplied state
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
+    collector = outputCollector;
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    int key = tuple.getInteger(0);
+    System.out.println("looking in state for: " + key);
+    if (myState.get(key) == null) {
+      System.out.println("did not find " + key + " in state: ");
+      myState.put(key, 1);
+    } else {
+      System.out.println("found in state: " + key);
+      Integer val = myState.get(key);
+      myState.put(key, ++val);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+  }
+}
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java
 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java
new file mode 100644
index 0000000..c3fb3db
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java
@@ -0,0 +1,83 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.utils.Utils;
+
+@SuppressWarnings("HiddenField")
+public class StatefulNumberSpout extends BaseRichSpout
+    implements IStatefulComponent<String, Long> {
+  private static final Logger LOG = 
Logger.getLogger(StatefulNumberSpout.class.getName());
+  private static final long serialVersionUID = 5454291010750852782L;
+  private SpoutOutputCollector collector;
+  private Random rand;
+  private long msgId;
+  private State<String, Long> state;
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("value", "ts", "msgid"));
+  }
+
+  @Override
+  public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector
+      collector) {
+    this.collector = collector;
+    this.rand = new Random();
+  }
+
+  @Override
+  public void nextTuple() {
+    Utils.sleep(1000);
+    long val = msgId;
+    long randomNumber = System.currentTimeMillis() - (24 * 60 * 60 * 1000);
+    System.out.println("Emitting: " + val);
+    collector.emit(new Values(val,
+        randomNumber, msgId), msgId);
+    msgId++;
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    LOG.fine("Got ACK for msgId : " + msgId);
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    LOG.fine("Got FAIL for msgId : " + msgId);
+  }
+
+  @Override
+  public void initState(State<String, Long> state) {
+    this.state = state;
+    this.msgId = this.state.getOrDefault("msgId", 0L);
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    this.state.put("msgId", msgId);
+  }
+}
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java
 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java
new file mode 100644
index 0000000..e128840
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java
@@ -0,0 +1,73 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+
+import backtype.storm.utils.Utils;
+
+public class StatefulRandomIntSpout extends BaseRichSpout
+    implements IStatefulComponent<String, Integer> {
+  private SpoutOutputCollector spoutOutputCollector;
+  private State<String, Integer> count;
+
+  public StatefulRandomIntSpout() {
+  }
+
+  // Generates a random integer between 1 and 100
+  private int randomInt() {
+    return ThreadLocalRandom.current().nextInt(1, 101);
+  }
+
+  // These two methods are required to implement the IStatefulComponent 
interface
+  @Override
+  public void preSave(String checkpointId) {
+    System.out.println(String.format("Saving spout state at checkpoint %s", 
checkpointId));
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    count = state;
+  }
+
+  // These three methods are required to extend the BaseRichSpout abstract 
class
+  @Override
+  public void open(Map<String, Object> map, TopologyContext ctx, 
SpoutOutputCollector collector) {
+    spoutOutputCollector = collector;
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("random-int"));
+  }
+
+  @Override
+  public void nextTuple() {
+    Utils.sleep(2000);
+    int randomInt = randomInt();
+    System.out.println("Emitting Value: " + randomInt);
+    spoutOutputCollector.emit(new Values(randomInt));
+  }
+}
+
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java
 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java
new file mode 100644
index 0000000..8f721e4
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java
@@ -0,0 +1,67 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+
+import com.twitter.heron.api.bolt.BaseStatefulWindowedBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.windowing.TupleWindow;
+
+@SuppressWarnings("HiddenField")
+public class StatefulWindowSumBolt extends BaseStatefulWindowedBolt<String, 
Long> {
+  private static final long serialVersionUID = -539382497249834244L;
+  private State<String, Long> state;
+  private long sum;
+
+  private OutputCollector collector;
+
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context,
+                      OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  @Override
+  public void initState(State<String, Long> state) {
+    this.state = state;
+    sum = state.getOrDefault("sum", 0L);
+  }
+
+  @Override
+  public void execute(TupleWindow inputWindow) {
+    for (Tuple tuple : inputWindow.get()) {
+      System.out.println("Adding to sum: " + tuple.getLongByField("value"));
+      sum += tuple.getLongByField("value");
+      System.out.println("Sum is now: " + sum);
+    }
+    collector.emit(new Values(sum));
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("sum"));
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    state.put("sum", sum);
+  }
+}
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java
new file mode 100644
index 0000000..3f3b6f1
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java
@@ -0,0 +1,64 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+import java.util.Random;
+
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+
+@SuppressWarnings("HiddenField")
+public class WordSpout extends BaseRichSpout {
+  private static final long serialVersionUID = 4322775001819135036L;
+
+  private static final int ARRAY_LENGTH = 128 * 1024;
+  private static final int WORD_LENGTH = 20;
+
+  private final String[] words = new String[ARRAY_LENGTH];
+
+  private final Random rnd = new Random(31);
+
+  private SpoutOutputCollector collector;
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("word"));
+  }
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public void open(Map map, TopologyContext topologyContext,
+                   SpoutOutputCollector spoutOutputCollector) {
+    System.out.println("open spout");
+    RandomString randomString = new RandomString(WORD_LENGTH);
+
+    for (int i = 0; i < ARRAY_LENGTH; i++) {
+      words[i] = randomString.nextString();
+    }
+
+    collector = spoutOutputCollector;
+  }
+
+  @Override
+  public void nextTuple() {
+    System.out.println("next tuple");
+    int nextInt = rnd.nextInt(ARRAY_LENGTH);
+    collector.emit(new Values(words[nextInt]));
+  }
+}
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml
 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml
new file mode 100644
index 0000000..9c9d5cf
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml
@@ -0,0 +1,52 @@
+#  Copyright 2017 Twitter. All rights reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http:#www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+name: "stateful-windowing"
+type: "heron"
+
+config:
+  topology.workers: 1
+  topology.reliability.mode: "EFFECTIVELY_ONCE"
+
+components:
+
+  - id: "windowLength"
+    className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 5
+
+  - id: "slidingInterval"
+    className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 3
+
+spouts:
+  - id: "integer-spout"
+    className: "com.twitter.heron.examples.eco.StatefulNumberSpout"
+    parallelism: 1
+
+bolts:
+  - id: "stateful-window-sum-bolt"
+    className: "com.twitter.heron.examples.eco.StatefulWindowSumBolt"
+    configMethods:
+      - name: "withWindow"
+        args: [ref: "windowLength", ref: "slidingInterval"]
+    parallelism: 1
+
+
+streams:
+  - from: "integer-spout"
+    to: "stateful-window-sum-bolt"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git 
a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml
 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml
new file mode 100644
index 0000000..e5838af
--- /dev/null
+++ 
b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml
@@ -0,0 +1,38 @@
+#  Copyright 2017 Twitter. All rights reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http:#www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+name: "stateful-word-count"
+type: "heron"
+
+config:
+  topology.workers: 1
+  topology.reliability.mode: "EFFECTIVELY_ONCE"
+
+
+spouts:
+  - id: "int-spout"
+    className: "com.twitter.heron.examples.eco.StatefulRandomIntSpout"
+    parallelism: 1
+
+bolts:
+  - id: "stateful-consumer-bolt"
+    className: "com.twitter.heron.examples.eco.StatefulConsumerBolt"
+    parallelism: 1
+
+
+streams:
+  - from: "int-spout"
+    to: "stateful-consumer-bolt"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
karth...@apache.org.

Reply via email to