This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit fb9dbcded793461af82323afb6e3d61141c95b09
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Nov 11 15:51:07 2020 +0800

    [FLINK-20085] [core] Remove RemoteFunctionStateMigrator migration code paths
---
 .../core/functions/FunctionGroupOperator.java      |  15 ---
 .../functions/RemoteFunctionStateMigrator.java     | 148 ---------------------
 2 files changed, 163 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 79e7a53..741575b 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -38,7 +38,6 @@ import 
org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.common.ManagingResources;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
-import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -139,20 +138,6 @@ public class FunctionGroupOperator extends 
AbstractStreamOperator<Message>
             asyncOperationState);
 
     //
-    // De-multiplex legacy remote function state in versions <= 2.1.x
-    // TODO backwards compatibility path for 2.1.x supported only in 2.2.x, 
remove for 2.3.x
-    //
-    if (configuration.shouldMigrateLegacyRemoteFnState()) {
-      final DynamicallyRegisteredTypes dynamicallyRegisteredTypes =
-          new DynamicallyRegisteredTypes(statefulFunctionsUniverse.types());
-      RemoteFunctionStateMigrator.apply(
-          statefulFunctionsUniverse.functions(),
-          getKeyedStateBackend(),
-          dynamicallyRegisteredTypes.registerType(String.class),
-          dynamicallyRegisteredTypes.registerType(byte[].class));
-    }
-
-    //
     // expire all the pending async operations.
     //
     AsyncOperationFailureNotifier.fireExpiredAsyncOperations(
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
deleted file mode 100644
index 1b4b33c..0000000
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.functions;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateFunction;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
-import org.apache.flink.statefun.flink.core.state.FlinkState;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-
-/**
- * Performs state migration for legacy remote function state in StateFun 
versions <= 2.1.x.
- *
- * <p>TODO we plan to remove this backwards compatibility path in version 
2.3.0, meaning that TODO
- * users who want to upgrade from 2.1.x to 2.3.x need to first upgrade to 
2.2.x.
- */
-final class RemoteFunctionStateMigrator
-    implements KeyedStateFunction<String, MapState<String, byte[]>> {
-
-  private static final String LEGACY_MUX_STATE_NAME = "states";
-
-  static void apply(
-      Map<FunctionType, StatefulFunctionProvider> functionProviders,
-      KeyedStateBackend<String> keyedStateBackend,
-      TypeInformation<String> keyTypeInfo,
-      TypeInformation<byte[]> valueTypeInfo)
-      throws Exception {
-    functionProviders.entrySet().stream()
-        .filter(RemoteFunctionStateMigrator::isRemoteFunctionProvider)
-        .forEach(
-            remoteFunctionProvider ->
-                migrateRemoteFunctionState(
-                    remoteFunctionProvider, keyedStateBackend, keyTypeInfo, 
valueTypeInfo));
-  }
-
-  private static boolean isRemoteFunctionProvider(
-      Map.Entry<FunctionType, StatefulFunctionProvider> functionProviderEntry) 
{
-    return functionProviderEntry.getValue() instanceof HttpFunctionProvider;
-  }
-
-  private static void migrateRemoteFunctionState(
-      Map.Entry<FunctionType, StatefulFunctionProvider> functionProviderEntry,
-      KeyedStateBackend<String> keyedStateBackend,
-      TypeInformation<String> keyTypeInfo,
-      TypeInformation<byte[]> valueTypeInfo) {
-    final FunctionType functionType = functionProviderEntry.getKey();
-    final HttpFunctionSpec functionSpec =
-        ((HttpFunctionProvider) 
functionProviderEntry.getValue()).getFunctionSpec(functionType);
-
-    try {
-      final RemoteFunctionStateMigrator stateMigrator =
-          new RemoteFunctionStateMigrator(
-              demuxValueStateHandles(
-                  functionSpec.states(), functionType, keyedStateBackend, 
valueTypeInfo));
-
-      keyedStateBackend.applyToAllKeys(
-          VoidNamespace.INSTANCE,
-          VoidNamespaceSerializer.INSTANCE,
-          multiplexedStateDescriptor(functionType, keyTypeInfo, valueTypeInfo),
-          stateMigrator);
-    } catch (Exception e) {
-      throw new RuntimeException(
-          "Error migrating multiplexed state for remote function type " + 
functionType);
-    }
-  }
-
-  /** The value states to de-mux the multiplexed state into. */
-  private final Map<String, ValueState<byte[]>> demuxValueStates;
-
-  private RemoteFunctionStateMigrator(Map<String, ValueState<byte[]>> 
demuxValueStates) {
-    this.demuxValueStates = Objects.requireNonNull(demuxValueStates);
-  }
-
-  @Override
-  public void process(String key, MapState<String, byte[]> multiplexedState) 
throws Exception {
-    for (Map.Entry<String, byte[]> entry : multiplexedState.entries()) {
-      final String stateName = entry.getKey();
-      final byte[] value = entry.getValue();
-
-      final ValueState<byte[]> demuxState = demuxValueStates.get(stateName);
-      // drop state if it is no longer registered, otherwise migrate to value 
state
-      if (demuxState != null) {
-        demuxState.update(value);
-      }
-    }
-    multiplexedState.clear();
-  }
-
-  private static Map<String, ValueState<byte[]>> demuxValueStateHandles(
-      List<StateSpec> stateSpecs,
-      FunctionType functionType,
-      KeyedStateBackend<String> keyedStateBackend,
-      TypeInformation<byte[]> valueTypeInfo)
-      throws Exception {
-    final Map<String, ValueState<byte[]>> valueStates = new 
HashMap<>(stateSpecs.size());
-    for (StateSpec stateSpec : stateSpecs) {
-      valueStates.put(
-          stateSpec.name(),
-          keyedStateBackend.getOrCreateKeyedState(
-              VoidNamespaceSerializer.INSTANCE,
-              demuxValueStateDescriptor(functionType, stateSpec, 
valueTypeInfo)));
-    }
-    return valueStates;
-  }
-
-  private static ValueStateDescriptor<byte[]> demuxValueStateDescriptor(
-      FunctionType functionType, StateSpec stateSpec, TypeInformation<byte[]> 
valueTypeInfo) {
-    return new ValueStateDescriptor<>(
-        FlinkState.flinkStateName(functionType, stateSpec.name()), 
valueTypeInfo);
-  }
-
-  private static MapStateDescriptor<String, byte[]> multiplexedStateDescriptor(
-      FunctionType functionType,
-      TypeInformation<String> keyTypeInfo,
-      TypeInformation<byte[]> valueTypeInfo) {
-    return new MapStateDescriptor<>(
-        FlinkState.flinkStateName(functionType, LEGACY_MUX_STATE_NAME), 
keyTypeInfo, valueTypeInfo);
-  }
-}

Reply via email to