Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r160724253
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateManagerImpl.java
 ---
    @@ -0,0 +1,640 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.TaskInfo;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.fs.CloseableRegistry;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
    +import org.apache.flink.runtime.state.CheckpointStreamFactory;
    +import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
    +import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.OperatorStateBackend;
    +import org.apache.flink.runtime.state.OperatorStateHandle;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.StatePartitionStreamProvider;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.runtime.state.TaskStateManager;
    +import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.CloseableIterable;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.commons.io.IOUtils;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * This class is the main implementation of a {@link 
StreamTaskStateManager}. This class obtains the state to create
    + * {@link StreamOperatorStateContext} objects for stream operators from 
the {@link TaskStateManager} of the task that
    + * runs the stream task and hence the operator.
    + *
    + * <p>This implementation operates on top a {@link TaskStateManager}, from 
which it receives everything required to
    + * restore state in the backends from checkpoints or savepoints.
    + */
    +public class StreamTaskStateManagerImpl implements StreamTaskStateManager {
    +
    +   /**
    +    * The environment of the task. This is required as parameter to 
construct state backends via their factory.
    +    */
    +   private final Environment environment;
    +
    +   /** This processing time service is required to construct an internal 
timer service manager. */
    +   private final ProcessingTimeService processingTimeService;
    +
    +   /** The state manager of the tasks provides the information used to 
restore potential previous state. */
    +   private final TaskStateManager taskStateManager;
    +
    +   /** This object is the factory for everything related to state backends 
and checkpointing. */
    +   private final StateBackend stateBackend;
    +
    +   public StreamTaskStateManagerImpl(
    +           Environment environment,
    +           StateBackend stateBackend,
    +           ProcessingTimeService processingTimeService) {
    +
    +           this.environment = environment;
    +           this.taskStateManager = 
Preconditions.checkNotNull(environment.getTaskStateManager());
    +           this.stateBackend = Preconditions.checkNotNull(stateBackend);
    +           this.processingTimeService = processingTimeService;
    +   }
    +
    +   // 
-----------------------------------------------------------------------------------------------------------------
    +
    +   @Override
    +   public StreamOperatorStateContext streamOperatorStateContext(
    +           AbstractStreamOperator<?> operator,
    +           TypeSerializer<?> keySerializer,
    +           CloseableRegistry streamTaskCloseableRegistry) throws Exception 
{
    +
    +           TaskInfo taskInfo = environment.getTaskInfo();
    +           OperatorSubtaskDescriptionText operatorSubtaskDescription =
    +                   new OperatorSubtaskDescriptionText(
    +                           operator.getOperatorID(),
    +                           operator.getClass(),
    +                           taskInfo.getIndexOfThisSubtask(),
    +                           taskInfo.getNumberOfParallelSubtasks());
    +
    +           final String operatorIdentifierText = 
operatorSubtaskDescription.toString();
    +
    +           final OperatorSubtaskState operatorSubtaskStateFromJobManager =
    +                   
taskStateManager.operatorStates(operator.getOperatorID());
    +
    +           final boolean restoring = (operatorSubtaskStateFromJobManager 
!= null);
    +
    +           AbstractKeyedStateBackend<?> keyedStatedBackend = null;
    +           OperatorStateBackend operatorStateBackend = null;
    +           CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs = null;
    +           CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs = null;
    +           CheckpointStreamFactory checkpointStreamFactory = null;
    +           InternalTimeServiceManager<?, ?> timeServiceManager = null;
    +
    +           try {
    +
    +                   // -------------- Keyed State Backend --------------
    +                   keyedStatedBackend = keyedStatedBackend(
    +                           keySerializer,
    +                           operatorIdentifierText,
    +                           operatorSubtaskStateFromJobManager,
    +                           streamTaskCloseableRegistry);
    +
    +                   // -------------- Operator State Backend --------------
    +                   operatorStateBackend = operatorStateBackend(
    +                           operatorIdentifierText,
    +                           operatorSubtaskStateFromJobManager,
    +                           streamTaskCloseableRegistry);
    +
    +                   // -------------- Raw State Streams --------------
    +                   rawKeyedStateInputs = 
rawKeyedStateInputs(operatorSubtaskStateFromJobManager);
    +                   
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
    +
    +                   rawOperatorStateInputs = 
rawOperatorStateInputs(operatorSubtaskStateFromJobManager);
    +                   
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
    +
    +                   // -------------- Checkpoint Stream Factory 
--------------
    +                   checkpointStreamFactory = 
streamFactory(operatorIdentifierText);
    +                   
streamTaskCloseableRegistry.registerCloseable(checkpointStreamFactory);
    +
    +                   // -------------- Internal Timer Service Manager 
--------------
    +                   timeServiceManager = 
internalTimeServiceManager(keyedStatedBackend, operator, rawKeyedStateInputs);
    +
    +                   // -------------- Preparing return value --------------
    +
    +                   return new StreamOperatorStateContextImpl(
    +                           restoring,
    +                           operatorStateBackend,
    +                           keyedStatedBackend,
    +                           timeServiceManager,
    +                           rawOperatorStateInputs,
    +                           rawKeyedStateInputs,
    +                           checkpointStreamFactory);
    +           } catch (Exception ex) {
    +
    +                   // cleanup if something went wrong before results got 
published.
    +                   if 
(streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
    +                           IOUtils.closeQuietly(keyedStatedBackend);
    +                   }
    +
    +                   if 
(streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
    +                           IOUtils.closeQuietly(keyedStatedBackend);
    +                   }
    +
    +                   if 
(streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
    +                           IOUtils.closeQuietly(rawKeyedStateInputs);
    +                   }
    +
    +                   if 
(streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
    +                           IOUtils.closeQuietly(rawOperatorStateInputs);
    +                   }
    +
    +                   if 
(streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
    +                           IOUtils.closeQuietly(rawOperatorStateInputs);
    +                   }
    +
    +                   throw new Exception("Exception while creating 
StreamOperatorStateContext.", ex);
    +           }
    +   }
    +
    +   protected <K> InternalTimeServiceManager<?, K> 
internalTimeServiceManager(
    +           AbstractKeyedStateBackend<K> keyedStatedBackend,
    +           KeyContext keyContext, //the operator
    +           Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) 
throws Exception {
    +
    +           if (keyedStatedBackend == null) {
    +                   return null;
    +           }
    +
    +           final KeyGroupRange keyGroupRange = 
keyedStatedBackend.getKeyGroupRange();
    +
    +           final InternalTimeServiceManager<?, K> timeServiceManager = new 
InternalTimeServiceManager<>(
    +                   keyedStatedBackend.getNumberOfKeyGroups(),
    +                   keyGroupRange,
    +                   keyContext,
    +                   processingTimeService);
    +
    +           // and then initialize the timer services
    +           for (KeyGroupStatePartitionStreamProvider streamProvider : 
rawKeyedStates) {
    +                   int keyGroupIdx = streamProvider.getKeyGroupId();
    +
    +                   
Preconditions.checkArgument(keyGroupRange.contains(keyGroupIdx),
    +                           "Key Group " + keyGroupIdx + " does not belong 
to the local range.");
    +
    +                   timeServiceManager.restoreStateForKeyGroup(
    +                           new 
DataInputViewStreamWrapper(streamProvider.getStream()),
    +                           keyGroupIdx, environment.getUserClassLoader());
    +           }
    +
    +           return timeServiceManager;
    +   }
    +
    +   protected OperatorStateBackend operatorStateBackend(
    +           String operatorIdentifierText,
    +           OperatorSubtaskState operatorSubtaskStateFromJobManager,
    +           CloseableRegistry backendCloseableRegistry) throws Exception {
    +
    +           //TODO search in local state for a local recovery opportunity.
    +
    +           return createOperatorStateBackendFromJobManagerState(
    +                   operatorIdentifierText,
    +                   operatorSubtaskStateFromJobManager,
    +                   backendCloseableRegistry);
    +   }
    +
    +   protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
    --- End diff --
    
    This is more of a general remark: can you name the methods 
`getSomething()` or `createSomething()` to communicate what they are actually 
doing? A couple of times I had to go to the implementation because it wasn't (at 
least for me :( ) clear from the context which of the two it was.


---

Reply via email to