This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2503c67c8453f150a751ede182445dabb90466d6 Author: xiarui <[email protected]> AuthorDate: Thu Jan 16 15:28:00 2025 +0800 [FLINK-37028][State] Introduce namespace wrapped state for window operator. --- .../AggregatingStateWithDeclaredNamespace.java | 105 ++++++++++++++ .../state/ListStateWithDeclaredNamespace.java | 107 ++++++++++++++ .../state/MapStateWithDeclaredNamespace.java | 158 +++++++++++++++++++++ .../state/ReducingStateWithDeclaredNamespace.java | 105 ++++++++++++++ .../declare/state/StateWithDeclaredNamespace.java | 95 +++++++++++++ .../state/ValueStateWithDeclaredNamespace.java | 70 +++++++++ 6 files changed, 640 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/AggregatingStateWithDeclaredNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/AggregatingStateWithDeclaredNamespace.java new file mode 100644 index 000000000000..e38e0aa031e9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/AggregatingStateWithDeclaredNamespace.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing.declare.state; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState; + +import java.util.Collection; + +/** Aggregating state that re-applies the declared namespace before each async access. */ +class AggregatingStateWithDeclaredNamespace<K, N, IN, ACC, OUT> + extends StateWithDeclaredNamespace<K, N, ACC> + implements InternalAggregatingState<K, N, IN, ACC, OUT> { + private final InternalAggregatingState<K, N, IN, ACC, OUT> state; + + public AggregatingStateWithDeclaredNamespace( + InternalAggregatingState<K, N, IN, ACC, OUT> state, + DeclaredVariable<N> declaredNamespace) { + super(state, declaredNamespace); + this.state = state; + } + + @Override + public StateFuture<OUT> asyncGet() { + resetNamespace(); /* switch wrapped state to the declared namespace */ + return state.asyncGet(); + } + + @Override + public StateFuture<Void> asyncAdd(IN value) { + resetNamespace(); + return state.asyncAdd(value); + } + + @Override + public StateFuture<Void> asyncClear() { + resetNamespace(); + return state.asyncClear(); + } + + @Override + public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) { + resetNamespace(); + return state.asyncMergeNamespaces(target, sources); + } + + @Override + public StateFuture<ACC> asyncGetInternal() { + resetNamespace(); + return state.asyncGetInternal(); + } + + @Override + public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) { + resetNamespace(); + return state.asyncUpdateInternal(valueToStore); + } + + @Override /* sync access: delegated without resetNamespace() */ + public OUT get() { + return state.get(); + } + + @Override + public void add(IN value) { + state.add(value); + } + + @Override + public void clear() { + state.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) { + state.mergeNamespaces(target, sources); + } + + @Override + public ACC getInternal() { + return state.getInternal(); + } + 
+ @Override + public void updateInternal(ACC valueToStore) { + state.updateInternal(valueToStore); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ListStateWithDeclaredNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ListStateWithDeclaredNamespace.java new file mode 100644 index 000000000000..2a2b0798b657 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ListStateWithDeclaredNamespace.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.declare.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.state.v2.internal.InternalListState; + +import java.util.Collection; +import java.util.List; + +/** ListState wrapped with declared namespace. 
*/ +@Internal +class ListStateWithDeclaredNamespace<K, N, V> extends StateWithDeclaredNamespace<K, N, V> + implements InternalListState<K, N, V> { + private final InternalListState<K, N, V> state; + + public ListStateWithDeclaredNamespace( + InternalListState<K, N, V> state, DeclaredVariable<N> declaredNamespace) { + super(state, declaredNamespace); + this.state = state; + } + + @Override + public StateFuture<StateIterator<V>> asyncGet() { + resetNamespace(); /* switch wrapped state to the declared namespace */ + return state.asyncGet(); + } + + @Override + public StateFuture<Void> asyncAdd(V value) { + resetNamespace(); + return state.asyncAdd(value); + } + + @Override + public StateFuture<Void> asyncUpdate(List<V> values) { + resetNamespace(); + return state.asyncUpdate(values); + } + + @Override + public StateFuture<Void> asyncAddAll(List<V> values) { + resetNamespace(); + return state.asyncAddAll(values); + } + + @Override + public StateFuture<Void> asyncClear() { + resetNamespace(); + return state.asyncClear(); + } + + @Override + public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) { + resetNamespace(); + return state.asyncMergeNamespaces(target, sources); + } + + @Override /* sync access: delegated without resetNamespace() */ + public Iterable<V> get() { + return state.get(); + } + + @Override + public void add(V value) { + state.add(value); + } + + @Override + public void update(List<V> values) { + state.update(values); + } + + @Override + public void addAll(List<V> values) { + state.addAll(values); + } + + @Override + public void clear() { + state.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) { + state.mergeNamespaces(target, sources); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/MapStateWithDeclaredNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/MapStateWithDeclaredNamespace.java new file mode 100644 index 000000000000..b5914195c922 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/MapStateWithDeclaredNamespace.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.declare.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.state.v2.internal.InternalMapState; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +/** MapState wrapped with declared namespace. 
*/ +@Internal +class MapStateWithDeclaredNamespace<K, N, UK, UV> extends StateWithDeclaredNamespace<K, N, UV> + implements InternalMapState<K, N, UK, UV> { + + private final InternalMapState<K, N, UK, UV> state; + + public MapStateWithDeclaredNamespace( + InternalMapState<K, N, UK, UV> state, DeclaredVariable<N> declaredNamespace) { + super(state, declaredNamespace); + this.state = state; + } + + @Override + public StateFuture<UV> asyncGet(UK key) { + resetNamespace(); /* switch wrapped state to the declared namespace */ + return state.asyncGet(key); + } + + @Override + public StateFuture<Void> asyncPut(UK key, UV value) { + resetNamespace(); + return state.asyncPut(key, value); + } + + @Override + public StateFuture<Void> asyncPutAll(Map<UK, UV> map) { + resetNamespace(); + return state.asyncPutAll(map); + } + + @Override + public StateFuture<Void> asyncRemove(UK key) { + resetNamespace(); + return state.asyncRemove(key); + } + + @Override + public StateFuture<Boolean> asyncContains(UK key) { + resetNamespace(); + return state.asyncContains(key); + } + + @Override + public StateFuture<StateIterator<Entry<UK, UV>>> asyncEntries() { + resetNamespace(); + return state.asyncEntries(); + } + + @Override + public StateFuture<StateIterator<UK>> asyncKeys() { + resetNamespace(); + return state.asyncKeys(); + } + + @Override + public StateFuture<StateIterator<UV>> asyncValues() { + resetNamespace(); + return state.asyncValues(); + } + + @Override + public StateFuture<Boolean> asyncIsEmpty() { + resetNamespace(); + return state.asyncIsEmpty(); + } + + @Override + public StateFuture<Void> asyncClear() { + resetNamespace(); + return state.asyncClear(); + } + + @Override /* sync access: delegated without resetNamespace() */ + public UV get(UK key) { + return state.get(key); + } + + @Override + public void put(UK key, UV value) { + state.put(key, value); + } + + @Override + public void putAll(Map<UK, UV> map) { + state.putAll(map); + } + + @Override + public void remove(UK key) { + state.remove(key); + } + + @Override + public boolean contains(UK key) { + return state.contains(key); 
+ } + + @Override + public Iterable<Entry<UK, UV>> entries() { + return state.entries(); + } + + @Override + public Iterable<UK> keys() { + return state.keys(); + } + + @Override + public Iterable<UV> values() { + return state.values(); + } + + @Override + public Iterator<Entry<UK, UV>> iterator() { + return state.iterator(); + } + + @Override + public boolean isEmpty() { + return state.isEmpty(); + } + + @Override + public void clear() { + state.clear(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ReducingStateWithDeclaredNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ReducingStateWithDeclaredNamespace.java new file mode 100644 index 000000000000..e9ab82eb7ffc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ReducingStateWithDeclaredNamespace.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing.declare.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.state.v2.internal.InternalReducingState; + +import java.util.Collection; + +/** Reducing state that re-applies the declared namespace before each async access. */ +@Internal +class ReducingStateWithDeclaredNamespace<K, N, T> extends StateWithDeclaredNamespace<K, N, T> + implements InternalReducingState<K, N, T> { + private final InternalReducingState<K, N, T> state; + + public ReducingStateWithDeclaredNamespace( + InternalReducingState<K, N, T> state, DeclaredVariable<N> declaredNamespace) { + super(state, declaredNamespace); + this.state = state; + } + + @Override + public StateFuture<T> asyncGet() { + resetNamespace(); /* switch wrapped state to the declared namespace */ + return state.asyncGet(); + } + + @Override + public StateFuture<Void> asyncAdd(T value) { + resetNamespace(); + return state.asyncAdd(value); + } + + @Override + public StateFuture<Void> asyncClear() { + resetNamespace(); + return state.asyncClear(); + } + + @Override + public StateFuture<T> asyncGetInternal() { + resetNamespace(); + return state.asyncGetInternal(); + } + + @Override + public StateFuture<Void> asyncUpdateInternal(T valueToStore) { + resetNamespace(); + return state.asyncUpdateInternal(valueToStore); + } + + @Override + public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) { + resetNamespace(); + return state.asyncMergeNamespaces(target, sources); + } + + @Override /* sync access: delegated without resetNamespace() */ + public T get() { + return state.get(); + } + + @Override + public void add(T value) { + state.add(value); + } + + @Override + public void clear() { + state.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) { + state.mergeNamespaces(target, sources); + } + + @Override + public T getInternal() { + return state.getInternal(); + } + + @Override + public void 
updateInternal(T valueToStore) { + state.updateInternal(valueToStore); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/StateWithDeclaredNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/StateWithDeclaredNamespace.java new file mode 100644 index 000000000000..969cc2297700 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/StateWithDeclaredNamespace.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing.declare.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; +import org.apache.flink.runtime.state.v2.internal.InternalListState; +import org.apache.flink.runtime.state.v2.internal.InternalMapState; +import org.apache.flink.runtime.state.v2.internal.InternalReducingState; +import org.apache.flink.runtime.state.v2.internal.InternalValueState; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +/** + * A partitioned state that wraps a declared namespace and hides the namespace switching from the + * user. The user works with the state just like with the public state APIs, without having to + * consider the namespace. + * + * <p>This wrapper class is useful in DataStream window operations, where the namespace is managed + * by the operator and the user function is free from namespace manipulation. + */ +@Internal +public abstract class StateWithDeclaredNamespace<K, N, V> implements InternalKeyedState<K, N, V> { + @Nonnull private final InternalKeyedState<K, N, V> state; + @Nonnull private final DeclaredVariable<N> declaredNamespace; + + public StateWithDeclaredNamespace( + @Nonnull InternalKeyedState<K, N, V> state, + @Nonnull DeclaredVariable<N> declaredNamespace) { + Preconditions.checkNotNull(state); + Preconditions.checkNotNull(declaredNamespace); + + this.state = state; + this.declaredNamespace = declaredNamespace; + } + + @Override + public void setCurrentNamespace(N namespace) { + declaredNamespace.set(namespace); /* remembered for later resetNamespace() calls */ + state.setCurrentNamespace(namespace); + } + + /** Sets the declared namespace on the wrapped state; called before any async state access. 
*/ + protected void resetNamespace() { + state.setCurrentNamespace(declaredNamespace.get()); + } + + /** Wraps {@code state} in its namespace-aware wrapper. */ @SuppressWarnings("unchecked") + public static <N, S extends State> S create(S state, DeclaredVariable<N> declaredNamespace) { + if (state instanceof InternalReducingState) { + return (S) + new ReducingStateWithDeclaredNamespace<>( + (InternalReducingState<?, N, ?>) state, declaredNamespace); + } else if (state instanceof InternalAggregatingState) { + return (S) + new AggregatingStateWithDeclaredNamespace<>( + (InternalAggregatingState<?, N, ?, ?, ?>) state, declaredNamespace); + } else if (state instanceof InternalValueState) { + return (S) + new ValueStateWithDeclaredNamespace<>( + (InternalValueState<?, N, ?>) state, declaredNamespace); + } else if (state instanceof InternalMapState) { + return (S) + new MapStateWithDeclaredNamespace<>( + (InternalMapState<?, N, ?, ?>) state, declaredNamespace); + } else if (state instanceof InternalListState) { + return (S) + new ListStateWithDeclaredNamespace<>( + (InternalListState<?, N, ?>) state, declaredNamespace); + } else { /* none of the five supported internal state interfaces matched */ + throw new IllegalArgumentException( + "Unsupported state type: " + state.getClass().getName()); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ValueStateWithDeclaredNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ValueStateWithDeclaredNamespace.java new file mode 100644 index 000000000000..06545d818f3a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ValueStateWithDeclaredNamespace.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.declare.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.state.v2.internal.InternalValueState; + +/** Value state that re-applies the declared namespace before each async access. */ +@Internal +class ValueStateWithDeclaredNamespace<K, N, V> extends StateWithDeclaredNamespace<K, N, V> + implements InternalValueState<K, N, V> { + private final InternalValueState<K, N, V> state; + + public ValueStateWithDeclaredNamespace( + InternalValueState<K, N, V> state, DeclaredVariable<N> declaredNamespace) { + super(state, declaredNamespace); + this.state = state; + } + + @Override + public StateFuture<Void> asyncClear() { + resetNamespace(); /* switch wrapped state to the declared namespace */ + return state.asyncClear(); + } + + @Override + public StateFuture<V> asyncValue() { + resetNamespace(); + return state.asyncValue(); + } + + @Override + public StateFuture<Void> asyncUpdate(V value) { + resetNamespace(); + return state.asyncUpdate(value); + } + + @Override /* sync access: delegated without resetNamespace() */ + public void clear() { + state.clear(); + } + + @Override + public V value() { + return state.value(); + } + + @Override + public void update(V value) { + state.update(value); + } +}
