GitHub user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2218#discussion_r129763468
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
    @@ -0,0 +1,563 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.topology;
    +
    +import java.util.AbstractCollection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Deque;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Supplier;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.State;
    +import org.apache.storm.state.StateFactory;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.windowing.DefaultEvictionContext;
    +import org.apache.storm.windowing.Event;
    +import org.apache.storm.windowing.EventImpl;
    +import org.apache.storm.windowing.WindowLifecycleListener;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static java.util.Collections.emptyIterator;
    +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
    +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
    +import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
    +
    +/**
    + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
    + * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
    + * by transparently caching the window partitions and checkpointing them 
as needed.
    + */
    +public class PersistentWindowedBoltExecutor<T extends State> extends 
WindowedBoltExecutor implements IStatefulBolt<T> {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
    +    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
    +    private transient TopologyContext topologyContext;
    +    private transient OutputCollector outputCollector;
    +    private transient WindowState<Tuple> state;
    +    private transient boolean stateInitialized;
    +    private transient boolean prePrepared;
    +
    +    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
    +        super(bolt);
    +        statefulWindowedBolt = bolt;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> topoConf, TopologyContext 
context, OutputCollector collector) {
    +        List<String> registrations = (List<String>) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
    +        registrations.add(ConcurrentLinkedQueue.class.getName());
    +        registrations.add(LinkedList.class.getName());
    +        registrations.add(AtomicInteger.class.getName());
    +        registrations.add(EventImpl.class.getName());
    +        registrations.add(WindowPartition.class.getName());
    +        registrations.add(DefaultEvictionContext.class.getName());
    +        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
    +        prepare(topoConf, context, collector,
    +            getWindowState(topoConf, context),
    +            getPartitionState(topoConf, context),
    +            getWindowSystemState(topoConf, context));
    +    }
    +
    +    // package access for unit tests
    +    void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector,
    +                 KeyValueState<Long, WindowPartition<Tuple>> windowState,
    +                 KeyValueState<String, Deque<Long>> partitionState,
    +                 KeyValueState<String, Optional<?>> windowSystemState) {
    +        init(topoConf, context, collector, windowState, partitionState, 
windowSystemState);
    +        doPrepare(topoConf, context, new NoAckOutputCollector(collector), 
state, true);
    +        Map<String, Optional<?>> wstate = new HashMap<>();
    +        windowSystemState.forEach(s -> wstate.put(s.getKey(), 
s.getValue()));
    +        restoreState(wstate);
    +    }
    +
    +    @Override
    +    protected void start() {
    +        if (stateInitialized) {
    +            super.start();
    +        } else {
    +            LOG.debug("Will invoke start after state is initialized");
    +        }
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        if (!stateInitialized) {
    +            throw new IllegalStateException("execute invoked before 
initState with input tuple " + input);
    +        }
    +        super.execute(input);
    +        // StatefulBoltExecutor does the actual ack when the state is 
saved.
    +        outputCollector.ack(input);
    +    }
    +
    +    @Override
    +    public void initState(T state) {
    +        if (stateInitialized) {
    +            LOG.warn("State is already initialized. Ignoring initState");
    --- End diff --
    
    If a worker crashes, the state of all workers is rolled back and an `initState`
message is sent across the topology so that the crashed workers can initialize
their state. This is already handled in `StatefulBoltExecutor`, which wraps
`PersistentWindowedBoltExecutor`, so it's OK to throw an exception here instead
of logging a warning and ignoring the repeated `initState` call. Will address
this and revisit the others.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to