masteryhx commented on code in PR #24595: URL: https://github.com/apache/flink/pull/24595#discussion_r1546073433
########## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.Map; + +/** + * {@link State} interface for partitioned key-value state. The key-value pair can be added, updated + * and retrieved. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently by the + * system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + * + * @param <UK> Type of the keys in the state. + * @param <UV> Type of the values in the state. + */ +@Experimental +public interface MapState<UK, UV> extends State { + + /** + * Returns the current value associated with the given key asynchronously. When the state is not Review Comment: How about adding some descriptions about null value ? (Also for other put/remove/... methods) ########## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * Base interface for partitioned state that supports adding elements and inspecting the current + * state. Elements can either be kept in a buffer (list-like) or aggregated into one value. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently by the + * system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + * + * @param <IN> Type of the value that can be added to the state. + * @param <OUT> Type of the value that can be retrieved from the state. + */ +@Experimental +public interface AppendingState<IN, OUT> extends State { + + /** + * Returns the current value for the state. When the state is not partitioned the returned value + * is the same for all inputs in a given operator instance. If state partitioning is applied, + * the value returned depends on the current operator input, as the operator maintains an + * independent state for each partition. + * + * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method should return {@code + * null} wrapped by a StateFuture. + * + * @return The operator state value corresponding to the current input or {@code null} if the Review Comment: ```suggestion * @return The operator state value corresponding to the current input or {@code null} wrapped by a StateFuture if the ``` ########## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/State.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * Interface that different types of partitioned state must implement. Review Comment: How about adding some descriptions about 'thread safe' ? Users may call this in their own threads. ########## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.Map; + +/** + * {@link State} interface for partitioned key-value state. The key-value pair can be added, updated + * and retrieved. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently by the + * system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + * + * @param <UK> Type of the keys in the state. + * @param <UV> Type of the values in the state. + */ +@Experimental +public interface MapState<UK, UV> extends State { + + /** + * Returns the current value associated with the given key asynchronously. When the state is not + * partitioned the returned value is the same for all inputs in a given operator instance. If + * state partitioning is applied, the value returned depends on the current operator input, as + * the operator maintains an independent state for each partition. + * + * @return The {@link StateFuture} that will return value corresponding to the current input. + */ + StateFuture<UV> asyncGet(UK key); + + /** + * Update the current value associated with the given key asynchronously. When the state is not + * partitioned the value is updated for all inputs in a given operator instance. If state + * partitioning is applied, the updated value depends on the current operator input, as the + * operator maintains an independent state for each partition. + * + * @param key The key that will be updated. + * @param value The new value for the key. + * @return The {@link StateFuture} that will trigger the callback when update finishes. + */ + StateFuture<Void> asyncPut(UK key, UV value); + + /** + * Update all of the mappings from the given map into the state asynchronously. When the state + * is not partitioned the value is updated for all inputs in a given operator instance. If state + * partitioning is applied, the updated mapping depends on the current operator input, as the + * operator maintains an independent state for each partition. + * + * @param map The mappings to be stored in this state. + * @return The {@link StateFuture} that will trigger the callback when update finishes. + */ + StateFuture<Void> asyncPutAll(Map<UK, UV> map); Review Comment: How it behaves if map is null or empty ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
