Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r160723465
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateManagerImpl.java
---
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.io.IOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
/**
 * This class is the main implementation of a {@link StreamTaskStateManager}. This class obtains the state to create
 * {@link StreamOperatorStateContext} objects for stream operators from the {@link TaskStateManager} of the task that
 * runs the stream task and hence the operator.
 *
 * <p>This implementation operates on top of a {@link TaskStateManager}, from which it receives everything required to
 * restore state in the backends from checkpoints or savepoints.
 */
+public class StreamTaskStateManagerImpl implements StreamTaskStateManager {
+
+ /**
+ * The environment of the task. This is required as parameter to
construct state backends via their factory.
+ */
+ private final Environment environment;
+
+ /** This processing time service is required to construct an internal
timer service manager. */
+ private final ProcessingTimeService processingTimeService;
+
+ /** The state manager of the tasks provides the information used to
restore potential previous state. */
+ private final TaskStateManager taskStateManager;
+
+ /** This object is the factory for everything related to state backends
and checkpointing. */
+ private final StateBackend stateBackend;
+
+ public StreamTaskStateManagerImpl(
+ Environment environment,
+ StateBackend stateBackend,
+ ProcessingTimeService processingTimeService) {
+
+ this.environment = environment;
+ this.taskStateManager =
Preconditions.checkNotNull(environment.getTaskStateManager());
+ this.stateBackend = Preconditions.checkNotNull(stateBackend);
+ this.processingTimeService = processingTimeService;
+ }
+
// -----------------------------------------------------------------------------------------------------------------
+
+ @Override
+ public StreamOperatorStateContext streamOperatorStateContext(
+ AbstractStreamOperator<?> operator,
+ TypeSerializer<?> keySerializer,
+ CloseableRegistry streamTaskCloseableRegistry) throws Exception
{
+
+ TaskInfo taskInfo = environment.getTaskInfo();
+ OperatorSubtaskDescriptionText operatorSubtaskDescription =
+ new OperatorSubtaskDescriptionText(
+ operator.getOperatorID(),
+ operator.getClass(),
+ taskInfo.getIndexOfThisSubtask(),
+ taskInfo.getNumberOfParallelSubtasks());
+
+ final String operatorIdentifierText =
operatorSubtaskDescription.toString();
+
+ final OperatorSubtaskState operatorSubtaskStateFromJobManager =
+
taskStateManager.operatorStates(operator.getOperatorID());
+
+ final boolean restoring = (operatorSubtaskStateFromJobManager
!= null);
+
+ AbstractKeyedStateBackend<?> keyedStatedBackend = null;
+ OperatorStateBackend operatorStateBackend = null;
+ CloseableIterable<KeyGroupStatePartitionStreamProvider>
rawKeyedStateInputs = null;
+ CloseableIterable<StatePartitionStreamProvider>
rawOperatorStateInputs = null;
+ CheckpointStreamFactory checkpointStreamFactory = null;
+ InternalTimeServiceManager<?, ?> timeServiceManager = null;
+
+ try {
+
+ // -------------- Keyed State Backend --------------
+ keyedStatedBackend = keyedStatedBackend(
+ keySerializer,
+ operatorIdentifierText,
+ operatorSubtaskStateFromJobManager,
+ streamTaskCloseableRegistry);
+
+ // -------------- Operator State Backend --------------
+ operatorStateBackend = operatorStateBackend(
+ operatorIdentifierText,
+ operatorSubtaskStateFromJobManager,
+ streamTaskCloseableRegistry);
+
+ // -------------- Raw State Streams --------------
+ rawKeyedStateInputs =
rawKeyedStateInputs(operatorSubtaskStateFromJobManager);
+
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
+
+ rawOperatorStateInputs =
rawOperatorStateInputs(operatorSubtaskStateFromJobManager);
+
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
+
+ // -------------- Checkpoint Stream Factory
--------------
+ checkpointStreamFactory =
streamFactory(operatorIdentifierText);
+
streamTaskCloseableRegistry.registerCloseable(checkpointStreamFactory);
+
+ // -------------- Internal Timer Service Manager
--------------
+ timeServiceManager =
internalTimeServiceManager(keyedStatedBackend, operator, rawKeyedStateInputs);
+
+ // -------------- Preparing return value --------------
+
+ return new StreamOperatorStateContextImpl(
+ restoring,
+ operatorStateBackend,
+ keyedStatedBackend,
+ timeServiceManager,
+ rawOperatorStateInputs,
+ rawKeyedStateInputs,
+ checkpointStreamFactory);
+ } catch (Exception ex) {
+
+ // cleanup if something went wrong before results got
published.
+ if
(streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+ IOUtils.closeQuietly(keyedStatedBackend);
+ }
+
+ if
(streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+ IOUtils.closeQuietly(keyedStatedBackend);
+ }
+
+ if
(streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
+ IOUtils.closeQuietly(rawKeyedStateInputs);
+ }
+
+ if
(streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
+ IOUtils.closeQuietly(rawOperatorStateInputs);
+ }
+
+ if
(streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
+ IOUtils.closeQuietly(rawOperatorStateInputs);
+ }
+
+ throw new Exception("Exception while creating
StreamOperatorStateContext.", ex);
+ }
+ }
+
+ protected <K> InternalTimeServiceManager<?, K>
internalTimeServiceManager(
+ AbstractKeyedStateBackend<K> keyedStatedBackend,
+ KeyContext keyContext, //the operator
+ Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates)
throws Exception {
+
+ if (keyedStatedBackend == null) {
+ return null;
+ }
+
+ final KeyGroupRange keyGroupRange =
keyedStatedBackend.getKeyGroupRange();
+
+ final InternalTimeServiceManager<?, K> timeServiceManager = new
InternalTimeServiceManager<>(
+ keyedStatedBackend.getNumberOfKeyGroups(),
+ keyGroupRange,
+ keyContext,
+ processingTimeService);
+
+ // and then initialize the timer services
+ for (KeyGroupStatePartitionStreamProvider streamProvider :
rawKeyedStates) {
+ int keyGroupIdx = streamProvider.getKeyGroupId();
+
+
Preconditions.checkArgument(keyGroupRange.contains(keyGroupIdx),
+ "Key Group " + keyGroupIdx + " does not belong
to the local range.");
+
+ timeServiceManager.restoreStateForKeyGroup(
+ new
DataInputViewStreamWrapper(streamProvider.getStream()),
+ keyGroupIdx, environment.getUserClassLoader());
+ }
+
+ return timeServiceManager;
+ }
+
+ protected OperatorStateBackend operatorStateBackend(
+ String operatorIdentifierText,
+ OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ CloseableRegistry backendCloseableRegistry) throws Exception {
+
+ //TODO search in local state for a local recovery opportunity.
+
+ return createOperatorStateBackendFromJobManagerState(
+ operatorIdentifierText,
+ operatorSubtaskStateFromJobManager,
+ backendCloseableRegistry);
+ }
+
+ protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
+ TypeSerializer<K> keySerializer,
+ String operatorIdentifierText,
+ OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ CloseableRegistry backendCloseableRegistry) throws Exception {
+
+ if (keySerializer == null) {
+ return null;
+ }
+
+ //TODO search in local state for a local recovery opportunity.
+
+ return createKeyedStatedBackendFromJobManagerState(
+ keySerializer,
+ operatorIdentifierText,
+ operatorSubtaskStateFromJobManager,
+ backendCloseableRegistry);
+ }
+
+ protected CheckpointStreamFactory streamFactory(String
operatorIdentifierText) throws IOException {
+ return stateBackend.createStreamFactory(environment.getJobID(),
operatorIdentifierText);
+ }
+
+ protected CloseableIterable<StatePartitionStreamProvider>
rawOperatorStateInputs(
+ OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+
+ if (operatorSubtaskStateFromJobManager != null) {
+
+ final CloseableRegistry closeableRegistry = new
CloseableRegistry();
+
+ Collection<OperatorStateHandle> rawOperatorState =
+
operatorSubtaskStateFromJobManager.getRawOperatorState();
+
+ return new
CloseableIterable<StatePartitionStreamProvider>() {
+ @Override
+ public void close() throws IOException {
+ closeableRegistry.close();
+ }
+
+ @Nonnull
+ @Override
+ public Iterator<StatePartitionStreamProvider>
iterator() {
+ return new OperatorStateStreamIterator(
+
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
+ rawOperatorState.iterator(),
closeableRegistry);
+ }
+ };
+ }
+
+ return CloseableIterable.empty();
+ }
+
+ protected CloseableIterable<KeyGroupStatePartitionStreamProvider>
rawKeyedStateInputs(
+ OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+
+ if (operatorSubtaskStateFromJobManager != null) {
+
+ Collection<KeyedStateHandle> rawKeyedState =
operatorSubtaskStateFromJobManager.getRawKeyedState();
+ Collection<KeyGroupsStateHandle> keyGroupsStateHandles
= transform(rawKeyedState);
+ final CloseableRegistry closeableRegistry = new
CloseableRegistry();
+
+ return new
CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
+ @Override
+ public void close() throws IOException {
+ closeableRegistry.close();
+ }
+
+ @Override
+ public
Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
+ return new
KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
+ }
+ };
+ }
+
+ return CloseableIterable.empty();
+ }
+
// =================================================================================================================
+
+ private OperatorStateBackend
createOperatorStateBackendFromJobManagerState(
+ String operatorIdentifierText,
+ OperatorSubtaskState operatorSubtaskStateFromJobManager,
+ CloseableRegistry backendCloseableRegistry) throws Exception {
+
+ final OperatorStateBackend operatorStateBackend =
+ stateBackend.createOperatorStateBackend(environment,
operatorIdentifierText);
+
+
backendCloseableRegistry.registerCloseable(operatorStateBackend);
+
+ Collection<OperatorStateHandle> managedOperatorState = null;
+
+ if (operatorSubtaskStateFromJobManager != null) {
+ managedOperatorState =
operatorSubtaskStateFromJobManager.getManagedOperatorState();
+ }
+
+ operatorStateBackend.restore(managedOperatorState);
+
+ return operatorStateBackend;
+ }
+
+ private <K> AbstractKeyedStateBackend<K>
createKeyedStatedBackendFromJobManagerState(
--- End diff --
nit: could you mark methods as `static` if they do not use the `this` reference?
---