fredia commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1554800173
########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.core.state.StateFutureImpl; + +/** + * A state future that holds the {@link RecordContext} and maintains the reference count of it. The + * reason why we maintain the reference here is that the ContextStateFutureImpl can be created + * multiple times since user may chain their code wildly, some of which are only for internal usage + * (See {@link StateFutureImpl}). So maintaining reference counting by the lifecycle of state future + * is relatively simple and less error-prone. + * + * <p>Reference counting added on {@link RecordContext} follows: + * <li>1. +1 when this future created. + * <li>2. -1 when future completed. + * <li>3. +1 when callback registered. + * <li>4. -1 when callback finished. + */ +public class ContextStateFutureImpl<T> extends StateFutureImpl<T> { + + private final RecordContext<?, ?> recordContext; + + ContextStateFutureImpl(CallbackRunner callbackRunner, RecordContext<?, ?> recordContext) { + super(callbackRunner); + this.recordContext = recordContext; + // When state request submitted, ref count +1, as described in FLIP-425: + // To cover the statements without a callback, in addition to the reference count marked + // in Fig.5, each state request itself is also protected by a paired reference count. + recordContext.retain(); + } + + @Override + public <A> StateFutureImpl<A> makeNewStateFuture() { + return new ContextStateFutureImpl<>(callbackRunner, recordContext); + } + + @Override + public void callbackRegistered() { + // When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the + // ref count -1. Review Comment: +1? ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param <K> Type of partitioned key. + * @param <IN> Type of input of this request. + * @param <OUT> Type of value that request will return. + */ +public class StateRequest<K, IN, OUT> { + + /** The type of processing request. */ + public enum RequestType { + /** Process one record without state access. */ + SYNC, + /** Get from one {@link State}. */ + GET, + /** Put to one {@link State}. */ + PUT, + /** Merge value to an exist key in {@link State}. Mainly used for listState. */ + MERGE, + /** Delete from one {@link State}. */ + DELETE + } + + /** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */ + @Nullable private final State state; Review Comment: Should the type of `state` be restricted to `org.apache.flink.api.common.state.v2.State`? IIUC, this underlying state is a lower-level state that directly interacts with the KV engine(e.g. ForSt/RocksDB) , and `org.apache.flink.api.common.state.v2.State` is a user-level interface. WDYT? ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.core.state.StateFutureImpl; + +/** + * A state future that holds the {@link RecordContext} and maintains the reference count of it. The + * reason why we maintain the reference here is that the ContextStateFutureImpl can be created + * multiple times since user may chain their code wildly, some of which are only for internal usage + * (See {@link StateFutureImpl}). So maintaining reference counting by the lifecycle of state future + * is relatively simple and less error-prone. + * + * <p>Reference counting added on {@link RecordContext} follows: + * <li>1. +1 when this future created. + * <li>2. -1 when future completed. + * <li>3. +1 when callback registered. + * <li>4. -1 when callback finished. + */ +public class ContextStateFutureImpl<T> extends StateFutureImpl<T> { + + private final RecordContext<?, ?> recordContext; + + ContextStateFutureImpl(CallbackRunner callbackRunner, RecordContext<?, ?> recordContext) { + super(callbackRunner); + this.recordContext = recordContext; + // When state request submitted, ref count +1, as described in FLIP-425: + // To cover the statements without a callback, in addition to the reference count marked + // in Fig.5, each state request itself is also protected by a paired reference count. + recordContext.retain(); + } + + @Override + public <A> StateFutureImpl<A> makeNewStateFuture() { + return new ContextStateFutureImpl<>(callbackRunner, recordContext); + } + + @Override + public void callbackRegistered() { + // When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the + // ref count -1. + recordContext.retain(); + } + + @Override + public void postComplete() { Review Comment: -1? ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.state.InternalStateFuture; + +/** + * An internal factory for {@link InternalStateFuture} that build future with necessary context + * switch and wired with mailbox executor. + */ +public class StateFutureFactory<R, K> { Review Comment: Mark `@Internal`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
