Repository: flink Updated Branches: refs/heads/master 24408e190 -> 03889ae1f
[FLINK-6041] [streaming api] Move StreamingFunctionUtils to 'org.apache.flink.streaming.util' This closes #3532 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03889ae1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03889ae1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03889ae1 Branch: refs/heads/master Commit: 03889ae1f25bab0fc42ae695bdf45cf45658eab6 Parents: 40a156e Author: liuyuzhong7 <[email protected]> Authored: Tue Mar 14 20:23:25 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Thu Mar 16 14:43:27 2017 +0100 ---------------------------------------------------------------------- .../functions/util/StreamingFunctionUtils.java | 215 ------------------- .../operators/AbstractUdfStreamOperator.java | 2 +- .../util/functions/StreamingFunctionUtils.java | 215 +++++++++++++++++++ .../functions/InternalWindowFunctionTest.java | 2 +- 4 files changed, 217 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java deleted file mode 100644 index 679ef0b..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions.util; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.translation.WrappingFunction; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * Utility class that contains helper methods to work with Flink Streaming - * {@link Function Functions}. This is similar to - * {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has additional methods - * for invoking interfaces that only exist in the streaming API. 
- */ -@Internal -public final class StreamingFunctionUtils { - - @SuppressWarnings("unchecked") - public static <T> void setOutputType( - Function userFunction, - TypeInformation<T> outTypeInfo, - ExecutionConfig executionConfig) { - - Preconditions.checkNotNull(outTypeInfo); - Preconditions.checkNotNull(executionConfig); - - while (true) { - if (trySetOutputType(userFunction, outTypeInfo, executionConfig)) { - break; - } - - // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function - if (userFunction instanceof WrappingFunction) { - userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); - } else { - break; - } - - } - } - - @SuppressWarnings("unchecked") - private static <T> boolean trySetOutputType( - Function userFunction, - TypeInformation<T> outTypeInfo, - ExecutionConfig executionConfig) { - - Preconditions.checkNotNull(outTypeInfo); - Preconditions.checkNotNull(executionConfig); - - if (OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) { - ((OutputTypeConfigurable<T>) userFunction).setOutputType(outTypeInfo, executionConfig); - return true; - } - return false; - } - - public static void snapshotFunctionState( - StateSnapshotContext context, - OperatorStateBackend backend, - Function userFunction) throws Exception { - - Preconditions.checkNotNull(context); - Preconditions.checkNotNull(backend); - - while (true) { - - if (trySnapshotFunctionState(context, backend, userFunction)) { - break; - } - - // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function - if (userFunction instanceof WrappingFunction) { - userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); - } else { - break; - } - } - } - - private static boolean trySnapshotFunctionState( - StateSnapshotContext context, - OperatorStateBackend backend, - Function userFunction) throws Exception { - - if (userFunction instanceof CheckpointedFunction) { 
- ((CheckpointedFunction) userFunction).snapshotState(context); - - return true; - } - - if (userFunction instanceof ListCheckpointed) { - @SuppressWarnings("unchecked") - List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction). - snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp()); - - ListState<Serializable> listState = backend. - getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - - listState.clear(); - - if (null != partitionableState) { - try { - for (Serializable statePartition : partitionableState) { - listState.add(statePartition); - } - } catch (Exception e) { - listState.clear(); - - throw new Exception("Could not write partitionable state to operator " + - "state backend.", e); - } - } - - return true; - } - - return false; - } - - public static void restoreFunctionState( - StateInitializationContext context, - Function userFunction) throws Exception { - - Preconditions.checkNotNull(context); - - while (true) { - - if (tryRestoreFunction(context, userFunction)) { - break; - } - - // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function - if (userFunction instanceof WrappingFunction) { - userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); - } else { - break; - } - } - } - - private static boolean tryRestoreFunction( - StateInitializationContext context, - Function userFunction) throws Exception { - - if (userFunction instanceof CheckpointedFunction) { - ((CheckpointedFunction) userFunction).initializeState(context); - - return true; - } - - if (context.isRestored() && userFunction instanceof ListCheckpointed) { - @SuppressWarnings("unchecked") - ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction; - - ListState<Serializable> listState = context.getOperatorStateStore(). 
- getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - - List<Serializable> list = new ArrayList<>(); - - for (Serializable serializable : listState.get()) { - list.add(serializable); - } - - try { - listCheckpointedFun.restoreState(list); - } catch (Exception e) { - - throw new Exception("Failed to restore state to function: " + e.getMessage(), e); - } - - return true; - } - - return false; - } - - /** - * Private constructor to prevent instantiation. - */ - private StreamingFunctionUtils() { - throw new RuntimeException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 166287b..19559e1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; -import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils; +import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; 
http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java new file mode 100644 index 0000000..4482431 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class that contains helper methods to work with Flink Streaming + * {@link Function Functions}. This is similar to + * {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has additional methods + * for invoking interfaces that only exist in the streaming API. 
+ */ +@Internal +public final class StreamingFunctionUtils { + + @SuppressWarnings("unchecked") + public static <T> void setOutputType( + Function userFunction, + TypeInformation<T> outTypeInfo, + ExecutionConfig executionConfig) { + + Preconditions.checkNotNull(outTypeInfo); + Preconditions.checkNotNull(executionConfig); + + while (true) { + if (trySetOutputType(userFunction, outTypeInfo, executionConfig)) { + break; + } + + // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function + if (userFunction instanceof WrappingFunction) { + userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); + } else { + break; + } + + } + } + + @SuppressWarnings("unchecked") + private static <T> boolean trySetOutputType( + Function userFunction, + TypeInformation<T> outTypeInfo, + ExecutionConfig executionConfig) { + + Preconditions.checkNotNull(outTypeInfo); + Preconditions.checkNotNull(executionConfig); + + if (OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) { + ((OutputTypeConfigurable<T>) userFunction).setOutputType(outTypeInfo, executionConfig); + return true; + } + return false; + } + + public static void snapshotFunctionState( + StateSnapshotContext context, + OperatorStateBackend backend, + Function userFunction) throws Exception { + + Preconditions.checkNotNull(context); + Preconditions.checkNotNull(backend); + + while (true) { + + if (trySnapshotFunctionState(context, backend, userFunction)) { + break; + } + + // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function + if (userFunction instanceof WrappingFunction) { + userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); + } else { + break; + } + } + } + + private static boolean trySnapshotFunctionState( + StateSnapshotContext context, + OperatorStateBackend backend, + Function userFunction) throws Exception { + + if (userFunction instanceof CheckpointedFunction) { 
+ ((CheckpointedFunction) userFunction).snapshotState(context); + + return true; + } + + if (userFunction instanceof ListCheckpointed) { + @SuppressWarnings("unchecked") + List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction). + snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp()); + + ListState<Serializable> listState = backend. + getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); + + listState.clear(); + + if (null != partitionableState) { + try { + for (Serializable statePartition : partitionableState) { + listState.add(statePartition); + } + } catch (Exception e) { + listState.clear(); + + throw new Exception("Could not write partitionable state to operator " + + "state backend.", e); + } + } + + return true; + } + + return false; + } + + public static void restoreFunctionState( + StateInitializationContext context, + Function userFunction) throws Exception { + + Preconditions.checkNotNull(context); + + while (true) { + + if (tryRestoreFunction(context, userFunction)) { + break; + } + + // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function + if (userFunction instanceof WrappingFunction) { + userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); + } else { + break; + } + } + } + + private static boolean tryRestoreFunction( + StateInitializationContext context, + Function userFunction) throws Exception { + + if (userFunction instanceof CheckpointedFunction) { + ((CheckpointedFunction) userFunction).initializeState(context); + + return true; + } + + if (context.isRestored() && userFunction instanceof ListCheckpointed) { + @SuppressWarnings("unchecked") + ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction; + + ListState<Serializable> listState = context.getOperatorStateStore(). 
+ getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); + + List<Serializable> list = new ArrayList<>(); + + for (Serializable serializable : listState.get()) { + list.add(serializable); + } + + try { + listCheckpointedFun.restoreState(list); + } catch (Exception e) { + + throw new Exception("Failed to restore state to function: " + e.getMessage(), e); + } + + return true; + } + + return false; + } + + /** + * Private constructor to prevent instantiation. + */ + private StreamingFunctionUtils() { + throw new RuntimeException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/03889ae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index 8f795e9..d4fefa2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils; +import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction; import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
