This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 805a401 make EvitionContext extend Serializable (#2935) 805a401 is described below commit 805a401a14d5f211679262cb226176e0742340ea Author: Neng Lu <freen...@gmail.com> AuthorDate: Sat Jun 23 11:00:30 2018 -0700 make EvitionContext extend Serializable (#2935) * make EvitionContext extend Serializable * fix typo --- .../api/StatefulTumblingWindowTopology.java | 181 +++++++++++++++++++++ .../heron/api/windowing/EvictionContext.java | 4 +- 2 files changed, 184 insertions(+), 1 deletion(-) diff --git a/examples/src/java/org/apache/heron/examples/api/StatefulTumblingWindowTopology.java b/examples/src/java/org/apache/heron/examples/api/StatefulTumblingWindowTopology.java new file mode 100644 index 0000000..b39e3ff --- /dev/null +++ b/examples/src/java/org/apache/heron/examples/api/StatefulTumblingWindowTopology.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.examples.api; + +import java.time.Duration; +import java.util.Map; +import java.util.Random; +import java.util.logging.Logger; + +import org.apache.heron.api.Config; +import org.apache.heron.api.HeronSubmitter; +import org.apache.heron.api.bolt.BaseStatefulWindowedBolt; +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.spout.BaseRichSpout; +import org.apache.heron.api.spout.SpoutOutputCollector; +import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.IStatefulComponent; +import org.apache.heron.api.topology.OutputFieldsDeclarer; +import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Fields; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.api.tuple.Values; +import org.apache.heron.api.utils.Utils; +import org.apache.heron.api.windowing.TupleWindow; +import org.apache.heron.common.basics.ByteAmount; +import org.apache.heron.examples.api.bolt.PrinterBolt; + +/** + * A sample topology that demonstrates the usage of {@link org.apache.heron.api.bolt.IStatefulWindowedBolt} + * to calculate tumbling window sum. Topology also demonstrates how stateful window processing is done + * in conjunction with effectively once guarantees + */ +public final class StatefulTumblingWindowTopology { + + private static final Logger LOG = Logger.getLogger(StatefulSlidingWindowTopology.class.getName()); + + private StatefulTumblingWindowTopology() { + } + + private static class WindowSumBolt extends BaseStatefulWindowedBolt<String, Long> { + private static final long serialVersionUID = -539382497249834244L; + private State<String, Long> state; + private long sum; + + private OutputCollector collector; + + @Override + @SuppressWarnings("HiddenField") + public void prepare(Map<String, Object> topoConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + } + + @Override + @SuppressWarnings("HiddenField") + public void initState(State<String, Long> state) { + this.state = state; + sum = state.getOrDefault("sum", 0L); + } + + @Override + public void execute(TupleWindow inputWindow) { + for (Tuple tuple : inputWindow.get()) { + sum += tuple.getLongByField("value"); + } + collector.emit(new Values(sum)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sum")); + } + + @Override + public void preSave(String checkpointId) { + state.put("sum", sum); + } + } + + public static class IntegerSpout extends BaseRichSpout + implements IStatefulComponent<String, Long> { + private static final Logger LOG = Logger.getLogger(IntegerSpout.class.getName()); + private static final long serialVersionUID = 5454291010750852782L; + private SpoutOutputCollector collector; + private Random rand; + private long msgId; + private State<String, Long> state; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("value", "ts", "msgid")); + } + + @Override + @SuppressWarnings("HiddenField") + public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector + collector) { + this.collector = collector; + this.rand = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(1000); + long val = msgId; + collector.emit(new Values(val, + System.currentTimeMillis() - (24 * 60 * 60 * 1000), msgId), msgId); + msgId++; + } + + @Override + @SuppressWarnings("HiddenField") + public void ack(Object msgId) { + LOG.fine("Got ACK for msgId : " + msgId); + } + + @Override + @SuppressWarnings("HiddenField") + public void fail(Object msgId) { + LOG.fine("Got FAIL for msgId : " + msgId); + } + + @Override + @SuppressWarnings("HiddenField") + public void initState(State<String, Long> state) { + this.state = state; + this.msgId = this.state.getOrDefault("msgId", 0L); + } + + @Override + public void preSave(String checkpointId) { + this.state.put("msgId", msgId); + } + } + + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("integer", new IntegerSpout(), 1); + WindowSumBolt windowSumBolt = new WindowSumBolt(); + windowSumBolt.withTumblingWindow(Duration.ofSeconds(10)); + builder.setBolt("sumbolt", windowSumBolt, 1).shuffleGrouping("integer"); + builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("sumbolt"); + Config conf = new Config(); + conf.setDebug(true); + String topoName = "test"; + + Config.setComponentRam(conf, "integer", ByteAmount.fromGigabytes(1)); + Config.setComponentRam(conf, "sumbolt", ByteAmount.fromGigabytes(1)); + Config.setComponentRam(conf, "printer", ByteAmount.fromGigabytes(1)); + + Config.setContainerDiskRequested(conf, ByteAmount.fromGigabytes(5)); + Config.setContainerCpuRequested(conf, 4); + + conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE); + conf.setTopologyStatefulCheckpointIntervalSecs(20); + conf.setMaxSpoutPending(1000); + + if (args != null && args.length > 0) { + topoName = args[0]; + } + HeronSubmitter.submitTopology(topoName, conf, builder.createTopology()); + } +} diff --git a/heron/api/src/java/org/apache/heron/api/windowing/EvictionContext.java b/heron/api/src/java/org/apache/heron/api/windowing/EvictionContext.java index 9aeb37d..1282e83 100644 --- a/heron/api/src/java/org/apache/heron/api/windowing/EvictionContext.java +++ b/heron/api/src/java/org/apache/heron/api/windowing/EvictionContext.java @@ -19,10 +19,12 @@ package org.apache.heron.api.windowing; +import java.io.Serializable; + /** * Context information that can be used by the eviction policy */ -public interface EvictionContext { +public interface EvictionContext extends Serializable { /** * Returns the reference time that the eviction policy could use to * evict the events. In the case of event time processing, this would be