This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 43ca07e726c40c296ddc349eab87ab98a9926303 Author: Weijie Guo <[email protected]> AuthorDate: Tue Jun 6 21:02:53 2023 +0800 ProcessOperator supports operator list state. --- .../flink/api/common/state/AppendingState.java | 0 .../apache/flink/api/common/state/ListState.java | 0 .../api/common/state/ListStateDeclaration.java | 30 +++++++++++-------- .../flink/api/common/state/MergingState.java | 0 .../org/apache/flink/api/common/state/State.java | 0 .../flink/api/common/state/StateDeclaration.java | 5 ++-- .../{State.java => StateDeclarationConverter.java} | 27 ++++++++--------- .../flink/processfunction/api/ProcessFunction.java | 5 ++-- .../flink/processfunction/api/RuntimeContext.java | 4 ++- .../flink/processfunction/api/StateDescriptor.java | 21 ------------- .../examples/SimpleStatefulMap.java | 35 ++++++++++++++-------- .../processfunction/operators/ProcessOperator.java | 30 +++++++++++++++---- 12 files changed, 86 insertions(+), 71 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/AppendingState.java similarity index 100% rename from flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java rename to flink-core-api/src/main/java/org/apache/flink/api/common/state/AppendingState.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListState.java similarity index 100% rename from flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java rename to flink-core-api/src/main/java/org/apache/flink/api/common/state/ListState.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java similarity index 60% copy from flink-core/src/main/java/org/apache/flink/api/common/state/State.java copy to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java index bd5211df618..a9ee9f219f0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java @@ -18,19 +18,23 @@ package org.apache.flink.api.common.state; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; -/** - * Interface that different types of partitioned state must implement. - * - * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is - * automatically supplied by the system, so the function always sees the value mapped to the key of - * the current element. That way, the system can handle stream and state partitioning consistently - * together. - */ -@PublicEvolving -public interface State { +public class ListStateDeclaration<T> implements StateDeclaration { + private final String name; + + private final TypeDescriptor<T> elementTypeDescriptor; + + public ListStateDeclaration(String name, TypeDescriptor<T> elementTypeDescriptor) { + this.name = name; + this.elementTypeDescriptor = elementTypeDescriptor; + } + + public String getName() { + return name; + } - /** Removes the value mapped under the current key. 
*/ - void clear(); + public TypeDescriptor<T> getElementTypeDescriptor() { + return elementTypeDescriptor; + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MergingState.java similarity index 100% rename from flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java rename to flink-core-api/src/main/java/org/apache/flink/api/common/state/MergingState.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/State.java similarity index 100% copy from flink-core/src/main/java/org/apache/flink/api/common/state/State.java copy to flink-core-api/src/main/java/org/apache/flink/api/common/state/State.java diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java similarity index 88% rename from flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java rename to flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java index 3fc48ab137d..5070f2ac8f6 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java @@ -16,6 +16,7 @@ * limitations under the License. */ -package org.apache.flink.processfunction.api; +package org.apache.flink.api.common.state; -public interface State {} +/** Declaration for state. 
*/ +public interface StateDeclaration {} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDeclarationConverter.java similarity index 55% rename from flink-core/src/main/java/org/apache/flink/api/common/state/State.java rename to flink-core/src/main/java/org/apache/flink/api/common/state/StateDeclarationConverter.java index bd5211df618..363c21ec619 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDeclarationConverter.java @@ -18,19 +18,18 @@ package org.apache.flink.api.common.state; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.TypeInformationUtils; -/** - * Interface that different types of partitioned state must implement. - * - * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is - * automatically supplied by the system, so the function always sees the value mapped to the key of - * the current element. That way, the system can handle stream and state partitioning consistently - * together. - */ -@PublicEvolving -public interface State { - - /** Removes the value mapped under the current key. */ - void clear(); +/** Utils to convert {@link StateDeclaration} to {@link StateDescriptor}. 
*/ +public class StateDeclarationConverter { + public static <T> ListStateDescriptor<T> getListStateDescriptor( + ListStateDeclaration<T> stateDeclaration) { + //noinspection unchecked + return new ListStateDescriptor<>( + stateDeclaration.getName(), + (TypeInformation<T>) + TypeInformationUtils.fromTypeDescriptor( + stateDeclaration.getElementTypeDescriptor())); + } } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java index 55919cd8307..f83c7967137 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java @@ -19,6 +19,7 @@ package org.apache.flink.processfunction.api; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.state.StateDeclaration; import java.util.Collections; import java.util.Map; @@ -26,10 +27,10 @@ import java.util.function.Consumer; @FunctionalInterface public interface ProcessFunction<IN, OUT> extends Function { - void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx); + void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx) throws Exception; // Explicitly declares states upfront. See FLIP-22. 
- default Map<String, StateDescriptor> usesStates() { // stateId -> stateDescriptor + default Map<String, StateDeclaration> usesStates() { // stateId -> stateDeclaration return Collections.emptyMap(); } } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java index 908eeb8367a..f5db74d87fa 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java @@ -18,6 +18,8 @@ package org.apache.flink.processfunction.api; +import org.apache.flink.api.common.state.ListState; + public interface RuntimeContext { - State getState(String stateId); + <T> ListState<T> getListState(String stateId) throws Exception; } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java deleted file mode 100644 index 838de57d9c0..00000000000 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.processfunction.api; - -public interface StateDescriptor {} diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java index 5f849dbf8e4..86097640448 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java @@ -18,11 +18,13 @@ package org.apache.flink.processfunction.examples; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.processfunction.api.ExecutionEnvironment; import org.apache.flink.processfunction.api.ProcessFunction; import org.apache.flink.processfunction.api.RuntimeContext; -import org.apache.flink.processfunction.api.State; -import org.apache.flink.processfunction.api.StateDescriptor; import java.util.Collections; import java.util.Map; @@ -36,26 +38,35 @@ public class SimpleStatefulMap { .process(new CalcTimeDiffFunc()) // Don't use Lambda reference as PrintStream is not serializable. 
.tmpToConsumerSink( - (timeDiff) -> System.out.println("%d milliseconds since last timestamp.")); + (timeDiff) -> + System.out.printf( + "%d milliseconds since last timestamp. \n", timeDiff)); env.execute(); } private static class CalcTimeDiffFunc implements ProcessFunction<Long, Long> { + static final String STATE_ID = "lastTimestamp"; @Override - public void processRecord(Long record, Consumer<Long> output, RuntimeContext ctx) { - State state = ctx.getState(STATE_ID); - // TODO: - // long diff = record - state.getValue(); - // state.setValue(record) - // output.accept(diff); + public void processRecord(Long record, Consumer<Long> output, RuntimeContext ctx) + throws Exception { + ListState<Long> state = ctx.getListState(STATE_ID); + if (!state.get().iterator().hasNext()) { + // for first record + output.accept(0L); + } else { + long diff = record - state.get().iterator().next(); + output.accept(diff); + } + state.update(Collections.singletonList(record)); } @Override - public Map<String, StateDescriptor> usesStates() { - // TODO: state descriptor for type LONG - return Collections.singletonMap(STATE_ID, null); + public Map<String, StateDeclaration> usesStates() { + ListStateDeclaration<Long> listStateDeclaration = + new ListStateDeclaration<>(STATE_ID, TypeDescriptors.LONG); + return Collections.singletonMap(STATE_ID, listStateDeclaration); } } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java index 70700ede5eb..693e9078337 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java @@ -18,14 +18,20 @@ package 
org.apache.flink.processfunction.operators; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.StateDeclarationConverter; import org.apache.flink.processfunction.api.ProcessFunction; import org.apache.flink.processfunction.api.RuntimeContext; -import org.apache.flink.processfunction.api.State; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import java.util.Map; import java.util.function.Consumer; /** Operator for {@link org.apache.flink.processfunction.api.ProcessFunction}. */ @@ -51,7 +57,7 @@ public class ProcessOperator<IN, OUT> } @Override - public void processElement(StreamRecord<IN> element) { + public void processElement(StreamRecord<IN> element) throws Exception { userFunction.processRecord(element.getValue(), outputCollector, context); } @@ -65,12 +71,24 @@ public class ProcessOperator<IN, OUT> } } - private static class ContextImpl implements RuntimeContext { + private class ContextImpl implements RuntimeContext { + private final Map<String, StateDeclaration> allRegisteredStates; + + private ContextImpl() { + allRegisteredStates = userFunction.usesStates(); + } @Override - public State getState(String stateId) { - // TODO return state. 
- return null; + public <T> ListState<T> getListState(String stateId) throws Exception { + StateDeclaration stateDeclaration = allRegisteredStates.get(stateId); + Preconditions.checkNotNull(stateDeclaration, "No list state with id: " + stateId); + Preconditions.checkState(stateDeclaration instanceof ListStateDeclaration); + + //noinspection unchecked + ListStateDescriptor<T> listStateDescriptor = + StateDeclarationConverter.getListStateDescriptor( + (ListStateDeclaration<T>) stateDeclaration); + return getOperatorStateBackend().getListState(listStateDescriptor); } } }
