This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 562b182  [FLINK-23399][state] Add a benchmark for rescaling
562b182 is described below

commit 562b182464cc4da8ee0984527e6e16725b0cfe7c
Author: fredia <[email protected]>
AuthorDate: Fri Mar 18 11:34:07 2022 +0800

    [FLINK-23399][state] Add a benchmark for rescaling
---
 .../flink-statebackend-rocksdb/pom.xml             |   2 +-
 .../state/benchmark/RescalingBenchmark.java        | 181 +++++++++++++++++++++
 .../state/benchmark/RescalingBenchmarkBuilder.java |  94 +++++++++++
 .../state/benchmark/RescalingBenchmarkTest.java    | 140 ++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java    |   7 +
 5 files changed, 423 insertions(+), 1 deletion(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml 
b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 66c061a..6cced59 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -100,7 +100,7 @@ under the License.
                                                                
<include>**/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest*</include>
                                                                <!-- Exporting 
RocksDBStateBackendConfigTest$TestOptionsFactory for pyflink tests -->
                                                                
<include>**/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest*</include>
-                                                               
<include>**/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*</include>
+                                                               
<include>**/org/apache/flink/contrib/streaming/state/benchmark/*</include>
                                                                
<include>META-INF/LICENSE</include>
                                                                
<include>META-INF/NOTICE</include>
                                                        </includes>
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
new file mode 100644
index 0000000..62f8a63
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** Benchmark for rescaling from a checkpoint. */
+public class RescalingBenchmark<KEY> {
+    private final int maxParallelism;
+
+    private final int parallelismBefore;
+    private final int parallelismAfter;
+
+    private final int managedMemorySize;
+
+    private final StateBackend stateBackend;
+    private final CheckpointStorageAccess checkpointStorageAccess;
+
+    private OperatorSubtaskState stateForRescaling;
+    private OperatorSubtaskState stateForSubtask;
+    private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+    private final StreamRecordGenerator<KEY> streamRecordGenerator;
+    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> 
stateProcessFunctionSupplier;
+
+    public RescalingBenchmark(
+            final int parallelismBefore,
+            final int parallelismAfter,
+            final int maxParallelism,
+            final int managedMemorySize,
+            final StateBackend stateBackend,
+            final CheckpointStorageAccess checkpointStorageAccess,
+            final StreamRecordGenerator<KEY> streamRecordGenerator,
+            final Supplier<KeyedProcessFunction<KEY, KEY, Void>> 
stateProcessFunctionSupplier) {
+        this.parallelismBefore = parallelismBefore;
+        this.parallelismAfter = parallelismAfter;
+        this.maxParallelism = maxParallelism;
+        this.managedMemorySize = managedMemorySize;
+        this.stateBackend = stateBackend;
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        this.streamRecordGenerator = streamRecordGenerator;
+        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+    }
+
+    public void setUp() throws Exception {
+        stateForRescaling = prepareState();
+    }
+
+    public void tearDown() throws IOException {
+        stateForRescaling.discardState();
+    }
+
+    /** Rescales a single subtask; this is the benchmark entry point. */
+    public void rescale() throws Exception {
+        subtaskHarness.initializeState(stateForSubtask);
+    }
+
+    /** Closes the operator of one subtask. */
+    public void closeOperator() throws Exception {
+        subtaskHarness.close();
+    }
+
+    /** Prepares the state for the operator of one subtask. */
+    public void prepareStateForOperator(int subtaskIndex) throws Exception {
+        stateForSubtask =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        stateForRescaling,
+                        maxParallelism,
+                        parallelismBefore,
+                        parallelismAfter,
+                        subtaskIndex);
+        subtaskHarness = getTestHarness(x -> x, maxParallelism, 
parallelismAfter, subtaskIndex);
+        subtaskHarness.setStateBackend(stateBackend);
+        subtaskHarness.setup();
+    }
+
+    private OperatorSubtaskState prepareState() throws Exception {
+
+        KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void>[] harnessBefore 
=
+                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+        try {
+            for (int i = 0; i < parallelismBefore; i++) {
+
+                harnessBefore[i] = getTestHarness(x -> x, maxParallelism, 
parallelismBefore, i);
+                harnessBefore[i].setStateBackend(stateBackend);
+                harnessBefore[i].setup();
+                harnessBefore[i].open();
+            }
+
+            Iterator<StreamRecord<KEY>> iterator = 
streamRecordGenerator.generate();
+            while (iterator.hasNext()) {
+                StreamRecord<KEY> next = iterator.next();
+                int keyGroupIndex =
+                        
KeyGroupRangeAssignment.assignToKeyGroup(next.getValue(), maxParallelism);
+                int subtaskIndex =
+                        
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                maxParallelism, parallelismBefore, 
keyGroupIndex);
+                harnessBefore[subtaskIndex].processElement(next);
+            }
+
+            OperatorSubtaskState[] subtaskState = new 
OperatorSubtaskState[parallelismBefore];
+
+            for (int i = 0; i < parallelismBefore; i++) {
+                subtaskState[i] = harnessBefore[i].snapshot(0, 1);
+            }
+            return 
AbstractStreamOperatorTestHarness.repackageState(subtaskState);
+        } finally {
+            closeHarnessArray(harnessBefore);
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void> 
getTestHarness(
+            KeySelector<KEY, KEY> keySelector,
+            int maxParallelism,
+            int taskParallelism,
+            int subtaskIdx)
+            throws Exception {
+        MockEnvironment env =
+                new MockEnvironmentBuilder()
+                        .setTaskName("RescalingTask")
+                        .setManagedMemorySize(managedMemorySize)
+                        .setMaxParallelism(maxParallelism)
+                        .setParallelism(taskParallelism)
+                        .setSubtaskIndex(subtaskIdx)
+                        .build();
+        env.setCheckpointStorageAccess(checkpointStorageAccess);
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(stateProcessFunctionSupplier.get()),
+                keySelector,
+                streamRecordGenerator.getTypeInformation(),
+                env);
+    }
+
+    private void closeHarnessArray(KeyedOneInputStreamOperatorTestHarness<?, 
?, ?>[] harnessArr)
+            throws Exception {
+        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : 
harnessArr) {
+            if (harness != null) {
+                harness.close();
+            }
+        }
+    }
+
+    /** Record source that users of RescalingBenchmark must implement. */
+    public interface StreamRecordGenerator<T> {
+        Iterator<StreamRecord<T>> generate();
+
+        TypeInformation<T> getTypeInformation();
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
new file mode 100644
index 0000000..595123c
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Supplier;
+
+/** Builder for {@link RescalingBenchmark}. */
+public class RescalingBenchmarkBuilder<KEY> {
+    private int maxParallelism = 128;
+    private int parallelismBefore = 2;
+    private int parallelismAfter = 1;
+    private int managedMemorySize = 512 * 1024 * 1024;
+    private StateBackend stateBackend = new EmbeddedRocksDBStateBackend();
+    private RescalingBenchmark.StreamRecordGenerator<KEY> 
streamRecordGenerator;
+    private Supplier<KeyedProcessFunction<KEY, KEY, Void>> 
stateProcessFunctionSupplier;
+    private CheckpointStorageAccess checkpointStorageAccess;
+
+    public RescalingBenchmarkBuilder<KEY> setMaxParallelism(int 
maxParallelism) {
+        this.maxParallelism = maxParallelism;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setParallelismBefore(int 
parallelismBefore) {
+        this.parallelismBefore = parallelismBefore;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setParallelismAfter(int 
parallelismAfter) {
+        this.parallelismAfter = parallelismAfter;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setManagedMemorySize(int 
managedMemorySize) {
+        this.managedMemorySize = managedMemorySize;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setStateBackend(StateBackend 
stateBackend) {
+        this.stateBackend = stateBackend;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setStreamRecordGenerator(
+            RescalingBenchmark.StreamRecordGenerator<KEY> generator) {
+        this.streamRecordGenerator = generator;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setStateProcessFunctionSupplier(
+            Supplier<KeyedProcessFunction<KEY, KEY, Void>> supplier) {
+        this.stateProcessFunctionSupplier = supplier;
+        return this;
+    }
+
+    public RescalingBenchmarkBuilder<KEY> setCheckpointStorageAccess(
+            CheckpointStorageAccess checkpointStorageAccess) {
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        return this;
+    }
+
+    public RescalingBenchmark<KEY> build() {
+        return new RescalingBenchmark<KEY>(
+                parallelismBefore,
+                parallelismAfter,
+                maxParallelism,
+                managedMemorySize,
+                stateBackend,
+                Preconditions.checkNotNull(checkpointStorageAccess),
+                Preconditions.checkNotNull(streamRecordGenerator),
+                Preconditions.checkNotNull(stateProcessFunctionSupplier));
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
new file mode 100644
index 0000000..239cccf
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Iterator;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Tests for {@link RescalingBenchmark}. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+
+        RescalingBenchmark<Integer> benchmark =
+                new RescalingBenchmarkBuilder<Integer>()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(1)
+                        .setParallelismAfter(2)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new 
FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        
.setStateProcessFunctionSupplier(TestKeyedFunction::new)
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateForOperator(0);
+        benchmark.rescale();
+        benchmark.closeOperator();
+        benchmark.tearDown();
+    }
+
+    @Test
+    public void testScalingIn() throws Exception {
+        RescalingBenchmark<Integer> benchmark =
+                new RescalingBenchmarkBuilder<Integer>()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(2)
+                        .setParallelismAfter(1)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new 
FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        
.setStateProcessFunctionSupplier(TestKeyedFunction::new)
+                        .build();
+        benchmark.setUp();
+        benchmark.prepareStateForOperator(0);
+        benchmark.rescale();
+        benchmark.closeOperator();
+        benchmark.tearDown();
+    }
+
+    private static class IntegerRecordGenerator
+            implements RescalingBenchmark.StreamRecordGenerator<Integer> {
+        private final int numberOfKeys = 1000;
+        private int count = 0;
+
+        @Override
+        public Iterator<StreamRecord<Integer>> generate() {
+            return new Iterator<StreamRecord<Integer>>() {
+                @Override
+                public boolean hasNext() {
+                    return count < numberOfKeys;
+                }
+
+                @Override
+                public StreamRecord<Integer> next() {
+                    count += 1;
+                    return new 
StreamRecord<>(ThreadLocalRandom.current().nextInt(), 0);
+                }
+            };
+        }
+
+        @Override
+        public TypeInformation getTypeInformation() {
+            return BasicTypeInfo.INT_TYPE_INFO;
+        }
+    }
+
+    private static class TestKeyedFunction extends 
KeyedProcessFunction<Integer, Integer, Void> {
+
+        private static final long serialVersionUID = 1L;
+        private ValueState<Integer> randomState;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            randomState =
+                    this.getRuntimeContext()
+                            .getState(new 
ValueStateDescriptor<>("RandomState", Integer.class));
+        }
+
+        @Override
+        public void processElement(
+                Integer value,
+                KeyedProcessFunction<Integer, Integer, Void>.Context ctx,
+                Collector<Void> out)
+                throws Exception {
+            randomState.update(ThreadLocalRandom.current().nextInt());
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2b262b1..86f5c0a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -305,6 +305,13 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
 
         this.taskMailbox = new TaskMailboxImpl();
 
+        // TODO remove this once we introduce 
AbstractStreamOperatorTestHarnessBuilder.
+        try {
+            this.checkpointStorageAccess = 
environment.getCheckpointStorageAccess();
+        } catch (NullPointerException | UnsupportedOperationException e) {
+            // Cannot get checkpoint storage from the environment; use the default one.
+        }
+
         mockTask =
                 new MockStreamTaskBuilder(env)
                         .setCheckpointLock(checkpointLock)

Reply via email to