Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2218#discussion_r130225709
--- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java ---
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them as needed.
+ */
+public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+ private final IStatefulWindowedBolt<T> statefulWindowedBolt;
+ private transient TopologyContext topologyContext;
+ private transient OutputCollector outputCollector;
+ private transient WindowState<Tuple> state;
+ private transient boolean stateInitialized;
+ private transient boolean prePrepared;
+
+ public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
+ super(bolt);
+ statefulWindowedBolt = bolt;
+ }
+
+ @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        List<String> registrations = (List<String>) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+ registrations.add(ConcurrentLinkedQueue.class.getName());
+ registrations.add(LinkedList.class.getName());
+ registrations.add(AtomicInteger.class.getName());
+ registrations.add(EventImpl.class.getName());
+ registrations.add(WindowPartition.class.getName());
+ registrations.add(DefaultEvictionContext.class.getName());
+ topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+ prepare(topoConf, context, collector,
+ getWindowState(topoConf, context),
+ getPartitionState(topoConf, context),
+ getWindowSystemState(topoConf, context));
+ }
+
+ @Override
+    protected void validate(Map<String, Object> topoConf,
+                            BaseWindowedBolt.Count windowLengthCount,
+                            BaseWindowedBolt.Duration windowLengthDuration,
+                            BaseWindowedBolt.Count slidingIntervalCount,
+                            BaseWindowedBolt.Duration slidingIntervalDuration) {
+ if (windowLengthCount == null && windowLengthDuration == null) {
+            throw new IllegalArgumentException("Window length is not specified");
+ }
+ int interval = getCheckpointIntervalMillis(topoConf);
+ int timeout = getTopologyTimeoutMillis(topoConf);
+ if (interval > timeout) {
+            throw new IllegalArgumentException(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL + interval +
+                " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
+                " value " + timeout);
+ }
+ }
+
+ private int getCheckpointIntervalMillis(Map<String, Object> topoConf) {
+ int checkpointInterval = Integer.MAX_VALUE;
+        if (topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL) != null) {
+            checkpointInterval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
+ }
+ return checkpointInterval;
+ }
+
+ // package access for unit tests
+    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
+                 KeyValueState<Long, WindowPartition<Tuple>> windowState,
+                 KeyValueState<String, Deque<Long>> partitionState,
+                 KeyValueState<String, Optional<?>> windowSystemState) {
+        init(topoConf, context, collector, windowState, partitionState, windowSystemState);
+        doPrepare(topoConf, context, new NoAckOutputCollector(collector), state, true);
+ Map<String, Optional<?>> wstate = new HashMap<>();
+        windowSystemState.forEach(s -> wstate.put(s.getKey(), s.getValue()));
+ restoreState(wstate);
+ }
+
+ @Override
+ protected void start() {
+ if (stateInitialized) {
+ super.start();
+ } else {
+ LOG.debug("Will invoke start after state is initialized");
+ }
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (!stateInitialized) {
+            throw new IllegalStateException("execute invoked before initState with input tuple " + input);
+ }
+ super.execute(input);
+        // StatefulBoltExecutor does the actual ack when the state is saved.
+ outputCollector.ack(input);
+ }
+
+ @Override
+ public void initState(T state) {
+ if (stateInitialized) {
+            String msg = "initState invoked when the state is already initialized";
+ LOG.warn(msg);
+ throw new IllegalStateException(msg);
+ } else {
+ statefulWindowedBolt.initState(state);
+ stateInitialized = true;
+ start();
+ }
+ }
+
+ @Override
+ public void prePrepare(long txid) {
+ if (stateInitialized) {
+ LOG.debug("Prepare streamState, txid {}", txid);
+ state.prepareCommit(txid);
+ prePrepared = true;
+ } else {
+ String msg = "Cannot prepare before initState";
+ LOG.warn(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ @Override
+ public void preCommit(long txid) {
+        // preCommit can be invoked during recovery before the state is initialized
+ if (prePrepared || !stateInitialized) {
+ LOG.debug("Commit streamState, txid {}", txid);
+ state.commit(txid);
+ } else {
+            String msg = "preCommit before prePrepare in initialized state";
+ LOG.warn(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ @Override
+ public void preRollback() {
+        LOG.debug("Rollback streamState, stateInitialized {}", stateInitialized);
+ state.rollback();
+ }
+
+ @Override
+ protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
+ return new WindowLifecycleListener<Tuple>() {
+ @Override
+ public void onExpiry(List<Tuple> events) {
+ /*
+ * NO-OP: the events are ack-ed in execute
+ */
+ }
+
+ @Override
+ public void onActivation(Supplier<Iterator<Tuple>> eventsIt,
+ Supplier<Iterator<Tuple>> newEventsIt,
+ Supplier<Iterator<Tuple>> expiredIt,
+ Long timestamp) {
+ /*
+                 * Here we don't set the tuples in windowedOutputCollector's context and emit un-anchored.
+                 * The checkpoint tuple will trigger a checkpoint in the receiver with the emitted tuples.
+ */
+ boltExecute(eventsIt, newEventsIt, expiredIt, timestamp);
+ state.clearIteratorPins();
+ }
+ };
+ }
+
+    private void init(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
+                      KeyValueState<Long, WindowPartition<Tuple>> windowState,
+                      KeyValueState<String, Deque<Long>> partitionState,
+                      KeyValueState<String, Optional<?>> windowSystemState) {
+ topologyContext = context;
+ outputCollector = collector;
+        state = new WindowState<>(windowState, partitionState, windowSystemState, this::getState,
+            statefulWindowedBolt.maxEventsInMemory());
+ }
+
+    private KeyValueState<Long, WindowPartition<Tuple>> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
+        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
+        return (KeyValueState<Long, WindowPartition<Tuple>>) StateFactory.getState(namespace, topoConf, context);
+ }
+
+    private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
+        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-partitions";
+        return (KeyValueState<String, Deque<Long>>) StateFactory.getState(namespace, topoConf, context);
+ }
+
+    private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
+        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-systemstate";
+        return (KeyValueState<String, Optional<?>>) StateFactory.getState(namespace, topoConf, context);
+ }
+
+ // a wrapper around the window related states that are checkpointed
+    private static class WindowState<T> extends AbstractCollection<Event<T>> {
+ // number of events per window-partition
+ private static final int MAX_PARTITION_EVENTS = 1000;
+ private static final int MIN_PARTITIONS = 10;
+ private static final String PARTITION_IDS_KEY = "pk";
+ private final KeyValueState<String, Deque<Long>> partitionIdsState;
+        private final KeyValueState<Long, WindowPartition<T>> windowPartitionsState;
+ private final KeyValueState<String, Optional<?>> windowSystemState;
+ // ordered partition keys
+ private Deque<Long> partitionIds;
+ private volatile long latestPartitionId;
+ private WindowPartitionCache<Long, WindowPartition<T>> cache;
+        private Supplier<Map<String, Optional<?>>> windowSystemStateSupplier;
+        private final ReentrantLock partitionIdsLock = new ReentrantLock(true);
+        private final ReentrantLock windowPartitionsLock = new ReentrantLock(true);
+ private final long maxEventsInMemory;
+ private WindowPartition<T> latestPartition;
+ private Set<Long> iteratorPins = new HashSet<>();
+
+        WindowState(KeyValueState<Long, WindowPartition<T>> windowPartitionsState,
+                    KeyValueState<String, Deque<Long>> partitionIdsState,
+                    KeyValueState<String, Optional<?>> windowSystemState,
+                    Supplier<Map<String, Optional<?>>> windowSystemStateSupplier,
+                    long maxEventsInMemory) {
+ this.windowPartitionsState = windowPartitionsState;
+ this.partitionIdsState = partitionIdsState;
+ this.windowSystemState = windowSystemState;
+ this.windowSystemStateSupplier = windowSystemStateSupplier;
+            this.maxEventsInMemory = Math.max(MAX_PARTITION_EVENTS * MIN_PARTITIONS, maxEventsInMemory);
+ initCache();
+ initPartitions();
+ }
+
+ @Override
+ public boolean add(Event<T> event) {
+ if (latestPartition.size() >= MAX_PARTITION_EVENTS) {
+ cache.unpin(latestPartition.getId());
+ latestPartition = getPinnedPartition(getNextPartitionId());
+ }
+ latestPartition.add(event);
+ return true;
+ }
+
+ @Override
+ public Iterator<Event<T>> iterator() {
+
+ return new Iterator<Event<T>>() {
+ private Iterator<Long> ids = getIds();
+ private Iterator<Event<T>> current = emptyIterator();
+ private Iterator<Event<T>> removeFrom;
+ private WindowPartition<T> curPartition;
+
+ private Iterator<Long> getIds() {
+ try {
+ partitionIdsLock.lock();
+                        LOG.debug("Iterator partitionIds: {}", partitionIds);
+ return new ArrayList<>(partitionIds).iterator();
+ } finally {
+ partitionIdsLock.unlock();
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (removeFrom == null) {
+                        throw new IllegalStateException("No calls to next() since last call to remove()");
+ }
+ removeFrom.remove();
+ removeFrom = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean curHasNext = current.hasNext();
+ while (!curHasNext && ids.hasNext()) {
+ if (curPartition != null) {
+ unpin(curPartition.getId());
+ }
+ curPartition = getPinnedPartition(ids.next());
+ if (curPartition != null) {
+ iteratorPins.add(curPartition.getId());
+ current = curPartition.iterator();
+ curHasNext = current.hasNext();
+ }
+ }
+ // un-pin the last partition
+ if (!curHasNext && curPartition != null) {
+ unpin(curPartition.getId());
+ curPartition = null;
+ }
+ return curHasNext;
+ }
+
+ @Override
+ public Event<T> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ removeFrom = current;
+ return current.next();
+ }
+
+ private void unpin(long id) {
+ cache.unpin(id);
+ iteratorPins.remove(id);
+ }
+ };
+ }
+
+ void clearIteratorPins() {
+ LOG.debug("clearIteratorPins '{}'", iteratorPins);
+ Iterator<Long> it = iteratorPins.iterator();
+ while (it.hasNext()) {
+ cache.unpin(it.next());
+ it.remove();
+ }
+ }
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException();
+ }
+
+ void prepareCommit(long txid) {
+ flush();
+ partitionIdsState.prepareCommit(txid);
+ windowPartitionsState.prepareCommit(txid);
+ windowSystemState.prepareCommit(txid);
+ }
+
+ void commit(long txid) {
+ partitionIdsState.commit(txid);
+ windowPartitionsState.commit(txid);
+ windowSystemState.commit(txid);
+ }
+
+ void rollback() {
+ partitionIdsState.rollback();
+ windowPartitionsState.rollback();
+ windowSystemState.rollback();
+ }
+
+ private void initPartitions() {
+            partitionIds = partitionIdsState.get(PARTITION_IDS_KEY, new LinkedList<>());
+ if (partitionIds.isEmpty()) {
+ partitionIds.add(0L);
+ partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
+ }
+ latestPartitionId = partitionIds.peekLast();
+ latestPartition = cache.getPinned(latestPartitionId);
+ }
+
+ private void initCache() {
+ long size = maxEventsInMemory / MAX_PARTITION_EVENTS;
+            LOG.info("maxEventsInMemory: {}, partition size: {}, number of partitions: {}",
+                maxEventsInMemory, MAX_PARTITION_EVENTS, size);
+            cache = SimpleWindowPartitionCache.<Long, WindowPartition<T>>newBuilder()
+ .maximumSize(size)
+                .removalListener(new RemovalListener<Long, WindowPartition<T>>() {
+ @Override
+                    public void onRemoval(Long pid, WindowPartition<T> p, RemovalCause removalCause) {
+ Objects.requireNonNull(pid, "Null partition id");
+ Objects.requireNonNull(p, "Null window partition");
+                        LOG.debug("onRemoval for id '{}', WindowPartition '{}'", pid, p);
+ try {
+ windowPartitionsLock.lock();
+ if (p.isEmpty() && pid != latestPartitionId) {
+                                // if the empty partition was not invalidated by flush, but evicted from cache
+                                if (removalCause != RemovalCause.EXPLICIT) {
+ deletePartition(pid);
+ windowPartitionsState.delete(pid);
+ }
+ } else if (p.isModified()) {
+ windowPartitionsState.put(pid, p);
+ } else {
+                                LOG.debug("WindowPartition '{}' is not modified", pid);
+ }
+ } finally {
+ windowPartitionsLock.unlock();
+ }
+ }
+ }).build(new CacheLoader<Long, WindowPartition<T>>() {
+ @Override
+ public WindowPartition<T> load(Long id) {
+ LOG.debug("Load partition: {}", id);
+ // load from state
+ try {
+ windowPartitionsLock.lock();
+                            return windowPartitionsState.get(id, new WindowPartition<>(id));
+ } finally {
+ windowPartitionsLock.unlock();
+ }
+ }
+ }
+ );
+ }
+
+ private void deletePartition(long pid) {
+ LOG.debug("Delete partition: {}", pid);
+ try {
+ partitionIdsLock.lock();
+ partitionIds.remove(pid);
+ partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
+ } finally {
+ partitionIdsLock.unlock();
+ }
+ }
+
+ private long getNextPartitionId() {
+ try {
+ partitionIdsLock.lock();
+ partitionIds.add(++latestPartitionId);
+ partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
+ } finally {
+ partitionIdsLock.unlock();
+ }
+ return latestPartitionId;
+ }
+
+ private WindowPartition<T> getPinnedPartition(long id) {
+ return cache.getPinned(id);
+ }
+
+ private void flush() {
+ LOG.debug("Flushing modified partitions");
+ cache.asMap().forEach((pid, p) -> {
+ Long pidToInvalidate = null;
+ try {
+ windowPartitionsLock.lock();
+ if (p.isEmpty() && pid != latestPartitionId) {
+ LOG.debug("Invalidating empty partition {}", pid);
+ deletePartition(pid);
+ windowPartitionsState.delete(pid);
+ pidToInvalidate = pid;
+ } else if (p.isModified()) {
+ LOG.debug("Updating modified partition {}", pid);
+ p.clearModified();
+ windowPartitionsState.put(pid, p);
+ }
+ } finally {
+ windowPartitionsLock.unlock();
+ }
+ // invalidate after releasing the lock
+ if (pidToInvalidate != null) {
+ cache.invalidate(pidToInvalidate);
+ }
+ });
+            windowSystemStateSupplier.get().forEach(windowSystemState::put);
+ }
+ }
+
+ // the window partition that holds the events
+ public static class WindowPartition<T> implements Iterable<Event<T>> {
+        private final ConcurrentLinkedQueue<Event<T>> events = new ConcurrentLinkedQueue<>();
+ private final AtomicInteger size = new AtomicInteger();
+ private final long id;
+ private transient volatile boolean modified;
+
+ public WindowPartition(long id) {
+ this.id = id;
+ }
+
+ void add(Event<T> event) {
+ events.add(event);
+ size.incrementAndGet();
+ setModified();
+ }
+
+ boolean isModified() {
+ return modified;
+ }
+
+ void setModified() {
+ if (!modified) {
+ modified = true;
+ }
+ }
+
+ void clearModified() {
+ if (modified) {
--- End diff --
This `if (modified)` check doesn't seem necessary?
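
For illustration only (this is a sketch of the simplification the comment appears to suggest, not part of the patch): since `modified` is declared as a `transient volatile boolean` in the diff above, an unconditional assignment is already a single atomic store, so both flag methods could be reduced to:

    // Hypothetical simplification, assuming the read-before-write guard is not
    // a deliberate optimization to avoid redundant volatile writes.
    void setModified() {
        modified = true;
    }

    void clearModified() {
        modified = false;
    }

The behavior is the same either way; the guarded form only skips the volatile write when the flag already holds the target value.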