This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60c771b987af73382f8a41ccf7fecfdbe6dc8b1c
Author: fredia <fredia...@gmail.com>
AuthorDate: Thu Mar 21 17:51:23 2024 +0800

    [FLINK-34986][Runtime/State] Introduce element-control component
---
 .../apache/flink/core/state/StateFutureImpl.java   |   2 +-
 .../taskprocessing/AsyncExecutionController.java   |  80 ++++++
 .../runtime/taskprocessing/KeyAccountingUnit.java  |  82 ++++++
 .../runtime/taskprocessing/OrderPreserveMode.java  |  38 +++
 .../runtime/taskprocessing/ProcessingRequest.java  |  69 +++++
 .../runtime/taskprocessing/RecordContext.java      |  82 ++++++
 .../runtime/taskprocessing/ReferenceCounted.java   |  98 ++++++++
 .../runtime/taskprocessing/StateExecutor.java      |  19 ++
 .../AsyncExecutionControllerTest.java              | 280 +++++++++++++++++++++
 .../taskprocessing/ReferenceCountedTest.java       |  71 ++++++
 .../taskprocessing/keyAccountingUnitTest.java      |  43 ++++
 11 files changed, 863 insertions(+), 1 deletion(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index 3cdc873e789..dfae92c845f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -46,7 +46,7 @@ public class StateFutureImpl<T> implements 
InternalStateFuture<T> {
     /** The callback runner. */
     CallbackRunner callbackRunner;
 
-    StateFutureImpl(CallbackRunner callbackRunner) {
+    public StateFutureImpl(CallbackRunner callbackRunner) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
new file mode 100644
index 00000000000..133e6dcd27a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * The Async Execution Controller (AEC) receives processing requests from 
operators, and put them
+ * into execution according to some strategies.
+ *
+ * <p>It is responsible for:
+ * <li>Preserving the sequence of elements bearing the same key by delaying 
subsequent requests
+ *     until the processing of preceding ones is finalized.
+ * <li>Tracking the in-flight data(records) and blocking the input if too much 
data in flight
+ *     (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause 
current operations,
+ *     allowing for the execution of callbacks (mails in Mailbox).
+ *
+ * @param <R> the type of the record
+ * @param <K> the type of the key
+ */
+@Internal
+public class AsyncExecutionController<R, K> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
+
+    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+
+    /** The max allow number of in-flight records. */
+    private final int maxInFlightRecordNum;
+
+    /** The key accounting unit which is used to detect the key conflict. */
+    private final KeyAccountingUnit<R, K> keyAccountingUnit;
+
+    /** The mailbox executor, borrowed from {@code StreamTask}. */
+    private final MailboxExecutor mailboxExecutor;
+
+    /** The state executor where the {@link ProcessingRequest} is actually 
executed. */
+    private final StateExecutor stateExecutor;
+
+    public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
+        this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+    }
+
+    public AsyncExecutionController(
+            MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int 
maxInFlightRecords) {
+        this.mailboxExecutor = mailboxExecutor;
+        this.stateExecutor = stateExecutor;
+        this.maxInFlightRecordNum = maxInFlightRecords;
+        this.keyAccountingUnit = new KeyAccountingUnit<>();
+        LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", 
maxInFlightRecords);
+    }
+
+    public <OUT> void handleProcessingRequest(
+            ProcessingRequest<OUT> request, RecordContext<K, R> recordContext) 
{
+        // TODO(implement): preserve key order
+        stateExecutor.executeBatchRequests(Collections.singleton(request));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java
new file mode 100644
index 00000000000..984c53aacb1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.annotation.Internal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Key accounting unit holds the current in-flight key and tracks the 
corresponding ongoing records,
+ * which is used to preserve the ordering of independent chained {@link
+ * org.apache.flink.api.common.state.v2.StateFuture}.
+ *
+ * @param <R> the type of record
+ * @param <K> the type of key
+ */
+@Internal
+public class KeyAccountingUnit<R, K> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KeyAccountingUnit.class);
+
+    /** The in-flight records that are being processed, their keys are 
different from each other. */
+    private final Map<K, R> noConflictInFlightRecords;
+
+    public KeyAccountingUnit() {
+        this.noConflictInFlightRecords = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Check if the record is available for processing. This method should be 
called in main task
+     * thread. For the same record, this method can be reentered.
+     *
+     * @param record the record
+     * @param key the key inside the record
+     * @return true if the key is available
+     */
+    public boolean available(R record, K key) {
+        if (noConflictInFlightRecords.containsKey(key)) {
+            return noConflictInFlightRecords.get(key) == record;
+        }
+        return true;
+    }
+
+    /**
+     * Occupy a key for processing, the subsequent records with the same key 
would be blocked until
+     * the previous key release.
+     */
+    public void occupy(R record, K key) {
+        if (!available(record, key)) {
+            throw new IllegalStateException(
+                    String.format("The record %s(%s) is already occupied.", 
record, key));
+        }
+        noConflictInFlightRecords.put(key, record);
+        LOG.trace("occupy key {} for record {}", key, record);
+    }
+
+    /** Release a key, which is invoked when a {@link RecordContext} is 
released. */
+    public void release(R record, K key) {
+        R existingRecord = noConflictInFlightRecords.remove(key);
+        LOG.trace("release key {} for record {}, existing record {}", key, 
record, existingRecord);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java
new file mode 100644
index 00000000000..48ed767b561
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Enumeration for the execution order under asynchronous state APIs. For 
synchronous state APIs,
+ * the execution order is always {@link #ELEMENT_ORDER}. {@link #STATE_ORDER} 
generally has higher
+ * performance than {@link #ELEMENT_ORDER}. Note: {@link #STATE_ORDER} is an 
advance option, please
+ * make sure you are aware of possible out-of-order situations under 
asynchronous state APIs.
+ */
+@Internal
+public enum OrderPreserveMode {
+    /** The records with same keys are strictly processed in order of arrival. 
*/
+    ELEMENT_ORDER,
+    /**
+     * For same-key records, state requests and subsequent callbacks are 
processed in the order in
+     * which each record makes its first state request. But the code before 
the first state request
+     * for each record can be processed out-of-order with requests from other 
records.
+     */
+    STATE_ORDER
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java
new file mode 100644
index 00000000000..b74ab5e7d73
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+
+import java.util.Optional;
+
+/**
+ * A processing request encapsulates the parameters, the {@link State} to 
access, and the type of
+ * access.
+ *
+ * @param <OUT> Type of value that request will return.
+ */
+public interface ProcessingRequest<OUT> {
+    /** The underlying state to be accessed, can be empty. */
+    Optional<?> getUnderlyingState();
+
+    /** The parameter of the request. */
+    Parameter getParameter();
+
+    /** The future to collect the result of the request. */
+    StateFuture<OUT> getFuture();
+
+    RequestType getRequestType();
+
+    /** The type of processing request. */
+    enum RequestType {
+        /** Process one record without state access. */
+        SYNC,
+        /** Get from one {@link State}. */
+        /** Delete from one {@link State}. */
+        DELETE,
+        GET,
+        /** Put to one {@link State}. */
+        PUT,
+        /** Merge value to an exist key in {@link State}. Mainly used for 
listState. */
+        MERGE
+    }
+
+    /** The parameter of the request. */
+    interface Parameter<K> {
+        /**
+         * The key of one request. Except for requests of {@link 
RequestType#SYNC}, all other
+         * requests should provide a key.
+         */
+        Optional<K> getKey();
+
+        /** The value of one request. */
+        Optional<?> getValue();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java
new file mode 100644
index 00000000000..0375930db7b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/**
+ * A context that preserves the necessary variables required by each 
operation, all operations for
+ * one record will share the same element context.
+ *
+ * @param <R> The type of the record that extends {@link
+ *     org.apache.flink.streaming.runtime.streamrecord.StreamElement}. 
TODO(FLIP-409): move
+ *     StreamElement to flink-core or flink-runtime module.
+ * @param <K> The type of the key inside the record.
+ */
+@Internal
+public class RecordContext<R, K> extends ReferenceCounted {
+
+    /** The record to be processed. */
+    private final R record;
+
+    /** The key inside the record. */
+    private final K key;
+
+    public RecordContext(R record, K key) {
+        super(0);
+        this.record = record;
+        this.key = key;
+    }
+
+    public K getKey() {
+        return this.key;
+    }
+
+    @Override
+    protected void referenceCountReachedZero() {
+        // TODO: release internal resources that this record context holds.
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(record, key);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RecordContext<?, ?> that = (RecordContext<?, ?>) o;
+        if (!Objects.equals(record, that.record)) {
+            return false;
+        }
+        return Objects.equals(key, that.key);
+    }
+
+    @Override
+    public String toString() {
+        return "RecordContext{" + "record=" + record + ", key=" + key + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java
new file mode 100644
index 00000000000..059b826c55d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemoryUtils;
+
+import sun.misc.Unsafe;
+
+/**
+ * An object that can be reference counted, the internal resource would be 
released when the
+ * reference count reaches zero.
+ */
+@Internal
+public abstract class ReferenceCounted {
+
+    /** The "unsafe", which can be used to perform native memory accesses. */
+    @SuppressWarnings({"restriction", "UseOfSunClasses"})
+    private static final Unsafe unsafe = MemoryUtils.UNSAFE;
+
+    private static final long referenceOffset;
+
+    static {
+        try {
+            referenceOffset =
+                    unsafe.objectFieldOffset(
+                            
ReferenceCounted.class.getDeclaredField("referenceCount"));
+        } catch (SecurityException e) {
+            throw new Error(
+                    "Could not get field 'referenceCount' offset in class 
'ReferenceCounted' for unsafe operations, "
+                            + "permission denied by security manager.",
+                    e);
+        } catch (NoSuchFieldException e) {
+            throw new Error(
+                    "Could not get field 'referenceCount' offset in class 
'ReferenceCounted' for unsafe operations",
+                    e);
+        } catch (Throwable t) {
+            throw new Error(
+                    "Could not get field 'referenceCount' offset in class 
'ReferenceCounted' for unsafe operations,"
+                            + " unclassified error",
+                    t);
+        }
+    }
+
+    private volatile int referenceCount;
+
+    public ReferenceCounted(int initReference) {
+        this.referenceCount = initReference;
+    }
+
+    public int retain() {
+        return unsafe.getAndAddInt(this, referenceOffset, 1) + 1;
+    }
+
+    /**
+     * Try to retain this object. Fail if reference count is already zero.
+     *
+     * @return zero if failed, otherwise current reference count.
+     */
+    public int tryRetain() {
+        int v;
+        do {
+            v = unsafe.getIntVolatile(this, referenceOffset);
+        } while (v != 0 && !unsafe.compareAndSwapInt(this, referenceOffset, v, 
v + 1));
+        return v == 0 ? 0 : v + 1;
+    }
+
+    public int release() {
+        int r = unsafe.getAndAddInt(this, referenceOffset, -1) - 1;
+        if (r == 0) {
+            referenceCountReachedZero();
+        }
+        return r;
+    }
+
+    public int getReferenceCount() {
+        return referenceCount;
+    }
+
+    /** A method called when the reference count reaches zero. */
+    protected abstract void referenceCountReachedZero();
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java
new file mode 100644
index 00000000000..21ee33aaf25
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java
@@ -0,0 +1,19 @@
+package org.apache.flink.runtime.taskprocessing;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Executor for executing batch {@link ProcessingRequest}s.
+ *
+ * @param <K> the type of key.
+ */
+public interface StateExecutor {
+    /**
+     * Execute a batch of state requests.
+     *
+     * @param processingRequests the given batch of processing requests
+     * @return A future can determine whether execution has completed.
+     */
+    CompletableFuture<Boolean> executeBatchRequests(
+            Iterable<ProcessingRequest<?>> processingRequests);
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java
new file mode 100644
index 00000000000..f64120e7276
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.core.state.StateFutureImpl;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
+import org.apache.flink.runtime.taskprocessing.ProcessingRequest.RequestType;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link AsyncExecutionController}. */
+class AsyncExecutionControllerTest {
+
+    // todo(20240330): this test is not completed, cause the order 
preservation is not implemented
+    // yet, just for illustrating the interaction between AEC and Async state 
API.
+    @Test
+    void testStateOrder() {
+        AsyncExecutionController aec =
+                new AsyncExecutionController<>(
+                        new SyncMailboxExecutor(), new TestStateExecutor(), 3);
+        TestUnderlyingState underlyingState = new TestUnderlyingState();
+        TestValueState valueState = new TestValueState(aec, underlyingState);
+        AtomicInteger output = new AtomicInteger();
+        Consumer<Void> userCode =
+                empty ->
+                        valueState
+                                .asyncValue()
+                                .thenAccept(
+                                        val -> {
+                                            if (val == null) {
+                                                valueState
+                                                        .asyncUpdate(1)
+                                                        .thenAccept(o -> 
output.set(1));
+                                            } else {
+                                                valueState
+                                                        .asyncUpdate(val + 1)
+                                                        .thenAccept(o -> 
output.set(val + 1));
+                                            }
+                                        });
+
+        // ============================ element1 ============================
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        // Simulate the wrapping in {@link 
RecordProcessorUtils#getRecordProcessor()}, wrapping the
+        // record and key with RecordContext.
+        RecordContext<String, String> recordContext1 = new 
RecordContext<>(record1, key1);
+        valueState.setCurrentRecordCtx(recordContext1);
+        // execute user code
+        userCode.accept(null);
+        recordContext1.release();
+        assertThat(output.get()).isEqualTo(1);
+
+        // ============================ element2 ============================
+        String record2 = "key1-r2";
+        String key2 = "key1";
+        RecordContext<String, String> recordContext2 = new 
RecordContext<>(record2, key2);
+        valueState.setCurrentRecordCtx(recordContext2);
+        // execute user code
+        userCode.accept(null);
+        recordContext2.release();
+        assertThat(output.get()).isEqualTo(2);
+
+        // ============================ element3 ============================
+        String record3 = "key3-r3";
+        String key3 = "key3";
+        RecordContext<String, String> recordContext3 = new 
RecordContext<>(record3, key3);
+        valueState.setCurrentRecordCtx(recordContext3);
+        // execute user code
+        userCode.accept(null);
+        recordContext3.release();
+        assertThat(output.get()).isEqualTo(1);
+    }
+
+    class TestRequestParameter implements ProcessingRequest.Parameter<String> {
+        private String key;
+        private Integer value;
+
+        public TestRequestParameter(String key) {
+            this(key, null);
+        }
+
+        public TestRequestParameter(String key, Integer value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public Optional<String> getKey() {
+            return key == null ? Optional.empty() : Optional.of(key);
+        }
+
+        @Override
+        public Optional<?> getValue() {
+            return value == null ? Optional.empty() : Optional.of(value);
+        }
+    }
+
+    /** Simulate the underlying state that is actually used to execute the 
request. */
+    class TestUnderlyingState {
+
+        private HashMap<String, Integer> hashMap;
+
+        public TestUnderlyingState() {
+            this.hashMap = new HashMap<>();
+        }
+
+        public Integer get(String key) {
+            return hashMap.get(key);
+        }
+
+        public void update(String key, Integer val) {
+            hashMap.put(key, val);
+        }
+    }
+
+    class TestProcessingRequest<OUT> implements ProcessingRequest<OUT> {
+
+        private TestUnderlyingState underlyingState;
+
+        private TestRequestParameter requestParameter;
+
+        private RequestType requestType;
+
+        private InternalStateFuture<OUT> stateFuture;
+
+        public TestProcessingRequest(
+                TestUnderlyingState underlyingState,
+                TestRequestParameter parameter,
+                RequestType requestType,
+                InternalStateFuture<OUT> stateFuture) {
+            this.underlyingState = underlyingState;
+            this.requestParameter = parameter;
+            this.requestType = requestType;
+            this.stateFuture = stateFuture;
+        }
+
+        @Override
+        public Optional<TestUnderlyingState> getUnderlyingState() {
+            if (requestType == RequestType.SYNC) {
+                return Optional.empty();
+            }
+            return Optional.of(underlyingState);
+        }
+
+        @Override
+        public Parameter getParameter() {
+            return requestParameter;
+        }
+
+        @Override
+        public StateFuture<OUT> getFuture() {
+            return stateFuture;
+        }
+
+        @Override
+        public RequestType getRequestType() {
+            return requestType;
+        }
+    }
+
+    class TestValueState implements ValueState<Integer> {
+
+        private AsyncExecutionController asyncExecutionController;
+
+        private TestUnderlyingState underlyingState;
+
+        private StateFutureImpl.CallbackRunner runner = Runnable::run;
+
+        private RecordContext<String, String> currentRecordCtx;
+
+        public TestValueState(AsyncExecutionController aec, 
TestUnderlyingState underlyingState) {
+            this.asyncExecutionController = aec;
+            this.underlyingState = underlyingState;
+        }
+
+        @Override
+        public StateFuture<Void> asyncClear() {
+            StateFutureImpl<Void> stateFuture = new StateFutureImpl<>(runner);
+            TestRequestParameter parameter = new 
TestRequestParameter(currentRecordCtx.getKey());
+            ProcessingRequest<Void> request =
+                    new TestProcessingRequest<Void>(
+                            underlyingState, parameter, RequestType.DELETE, 
stateFuture);
+            asyncExecutionController.handleProcessingRequest(request, 
currentRecordCtx);
+            return stateFuture;
+        }
+
+        @Override
+        public StateFuture<Integer> asyncValue() {
+            StateFutureImpl<Integer> stateFuture = new 
StateFutureImpl<>(runner);
+            TestRequestParameter parameter = new 
TestRequestParameter(currentRecordCtx.getKey());
+            ProcessingRequest<Integer> request =
+                    new TestProcessingRequest<>(
+                            underlyingState, parameter, RequestType.GET, 
stateFuture);
+            asyncExecutionController.handleProcessingRequest(request, 
currentRecordCtx);
+            return stateFuture;
+        }
+
+        @Override
+        public StateFuture<Void> asyncUpdate(Integer value) {
+            StateFutureImpl<Void> stateFuture = new StateFutureImpl<>(runner);
+            TestRequestParameter parameter =
+                    new TestRequestParameter(currentRecordCtx.getKey(), value);
+            ProcessingRequest<Void> request =
+                    new TestProcessingRequest<Void>(
+                            underlyingState, parameter, RequestType.PUT, 
stateFuture);
+            asyncExecutionController.handleProcessingRequest(request, 
currentRecordCtx);
+            return stateFuture;
+        }
+
+        public void setCurrentRecordCtx(RecordContext<String, String> 
recordCtx) {
+            this.currentRecordCtx = recordCtx;
+        }
+    }
+
+    /**
+     * A brief implementation of {@link StateExecutor}, to illustrate the 
interaction between AEC
+     * and StateExecutor.
+     */
+    class TestStateExecutor implements StateExecutor {
+
+        public TestStateExecutor() {}
+
+        @Override
+        public CompletableFuture<Boolean> executeBatchRequests(
+                Iterable<ProcessingRequest<?>> processingRequests) {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            for (ProcessingRequest request : processingRequests) {
+                if (request.getRequestType() == RequestType.GET) {
+                    
Preconditions.checkState(request.getUnderlyingState().isPresent());
+                    TestUnderlyingState underlyingState =
+                            (TestUnderlyingState) 
request.getUnderlyingState().get();
+                    Integer val =
+                            underlyingState.get(
+                                    ((TestRequestParameter) 
request.getParameter()).getKey().get());
+                    ((StateFutureImpl<Integer>) 
request.getFuture()).complete(val);
+                } else if (request.getRequestType() == RequestType.PUT) {
+                    
Preconditions.checkState(request.getUnderlyingState().isPresent());
+                    TestUnderlyingState underlyingState =
+                            (TestUnderlyingState) 
request.getUnderlyingState().get();
+                    TestRequestParameter parameter = (TestRequestParameter) 
request.getParameter();
+                    underlyingState.update(
+                            parameter.getKey().get(), (Integer) 
parameter.getValue().get());
+                    ((StateFutureImpl<Void>) 
request.getFuture()).complete(null);
+                } else {
+                    throw new UnsupportedOperationException("Unsupported 
request type");
+                }
+            }
+            future.complete(true);
+            return future;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java
new file mode 100644
index 00000000000..03075759468
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskprocessing;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link org.apache.flink.runtime.taskprocessing.ReferenceCounted} 
*/
+class ReferenceCountedTest {
+    @Test
+    void testRefCountReachedZero() {
+        TestReferenceCounted referenceCounted = new TestReferenceCounted();
+        referenceCounted.retain();
+        assertThat(referenceCounted.getReferenceCount()).isEqualTo(1);
+        referenceCounted.release();
+        assertThat(referenceCounted.getReferenceCount()).isEqualTo(0);
+        assertThat(referenceCounted.reachedZero).isTrue();
+    }
+
+    @Test
+    void testConcurrency() throws InterruptedException {
+        TestReferenceCounted referenceCounted = new TestReferenceCounted();
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            Thread thread = new Thread(() -> referenceCounted.retain());
+            thread.start();
+            threads.add(thread);
+        }
+        for (int i = 0; i < 5; i++) {
+            Thread thread = new Thread(() -> referenceCounted.release());
+            thread.start();
+            threads.add(thread);
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        assertThat(referenceCounted.getReferenceCount()).isEqualTo(0);
+    }
+
+    private class TestReferenceCounted extends ReferenceCounted {
+        private boolean reachedZero = false;
+
+        public TestReferenceCounted() {
+            super(0);
+        }
+
+        @Override
+        protected void referenceCountReachedZero() {
+            reachedZero = true;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java
new file mode 100644
index 00000000000..87693631dde
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskprocessing;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link KeyAccountingUnit}. */
+class keyAccountingUnitTest {
+
+    @Test
+    void testBasic() {
+        KeyAccountingUnit<String, Integer> keyAccountingUnit = new 
KeyAccountingUnit<>();
+        assertThat(keyAccountingUnit.available("record1", 1)).isTrue();
+        keyAccountingUnit.occupy("record1", 1);
+        assertThat(keyAccountingUnit.available("record1", 1)).isTrue();
+        assertThat(keyAccountingUnit.available("record2", 2)).isTrue();
+        assertThat(keyAccountingUnit.available("record3", 1)).isFalse();
+        assertThatThrownBy(() -> keyAccountingUnit.occupy("record3", 1))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("The record record3(1) is already occupied.");
+        keyAccountingUnit.release("record1", 1);
+        assertThat(keyAccountingUnit.available("record2", 1)).isTrue();
+    }
+}


Reply via email to