This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 562b182 [FLINK-23399][state] Add a benchmark for rescaling
562b182 is described below
commit 562b182464cc4da8ee0984527e6e16725b0cfe7c
Author: fredia <[email protected]>
AuthorDate: Fri Mar 18 11:34:07 2022 +0800
[FLINK-23399][state] Add a benchmark for rescaling
---
.../flink-statebackend-rocksdb/pom.xml | 2 +-
.../state/benchmark/RescalingBenchmark.java | 181 +++++++++++++++++++++
.../state/benchmark/RescalingBenchmarkBuilder.java | 94 +++++++++++
.../state/benchmark/RescalingBenchmarkTest.java | 140 ++++++++++++++++
.../util/AbstractStreamOperatorTestHarness.java | 7 +
5 files changed, 423 insertions(+), 1 deletion(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 66c061a..6cced59 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -100,7 +100,7 @@ under the License.
<include>**/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest*</include>
<!-- Exporting
RocksDBStateBackendConfigTest$TestOptionsFactory for pyflink tests -->
<include>**/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest*</include>
-<include>**/org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils*</include>
+<include>**/org/apache/flink/contrib/streaming/state/benchmark/*</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
</includes>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
new file mode 100644
index 0000000..62f8a63
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmark.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** The benchmark of rescaling from checkpoint. */
+public class RescalingBenchmark<KEY> {
+ private final int maxParallelism;
+
+ private final int parallelismBefore;
+ private final int parallelismAfter;
+
+ private final int managedMemorySize;
+
+ private final StateBackend stateBackend;
+ private final CheckpointStorageAccess checkpointStorageAccess;
+
+ private OperatorSubtaskState stateForRescaling;
+ private OperatorSubtaskState stateForSubtask;
+ private KeyedOneInputStreamOperatorTestHarness subtaskHarness;
+
+ private final StreamRecordGenerator<KEY> streamRecordGenerator;
+ private final Supplier<KeyedProcessFunction<KEY, KEY, Void>>
stateProcessFunctionSupplier;
+
+ public RescalingBenchmark(
+ final int parallelismBefore,
+ final int parallelismAfter,
+ final int maxParallelism,
+ final int managedMemorySize,
+ final StateBackend stateBackend,
+ final CheckpointStorageAccess checkpointStorageAccess,
+ final StreamRecordGenerator<KEY> streamRecordGenerator,
+ final Supplier<KeyedProcessFunction<KEY, KEY, Void>>
stateProcessFunctionSupplier) {
+ this.parallelismBefore = parallelismBefore;
+ this.parallelismAfter = parallelismAfter;
+ this.maxParallelism = maxParallelism;
+ this.managedMemorySize = managedMemorySize;
+ this.stateBackend = stateBackend;
+ this.checkpointStorageAccess = checkpointStorageAccess;
+ this.streamRecordGenerator = streamRecordGenerator;
+ this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
+ }
+
+ public void setUp() throws Exception {
+ stateForRescaling = prepareState();
+ }
+
+ public void tearDown() throws IOException {
+ stateForRescaling.discardState();
+ }
+
+ /** rescaling on one subtask, this is the benchmark entrance. */
+ public void rescale() throws Exception {
+ subtaskHarness.initializeState(stateForSubtask);
+ }
+
+ /** close operator of one subtask. */
+ public void closeOperator() throws Exception {
+ subtaskHarness.close();
+ }
+
+ /** prepare state for operator of one subtask. */
+ public void prepareStateForOperator(int subtaskIndex) throws Exception {
+ stateForSubtask =
+ AbstractStreamOperatorTestHarness.repartitionOperatorState(
+ stateForRescaling,
+ maxParallelism,
+ parallelismBefore,
+ parallelismAfter,
+ subtaskIndex);
+ subtaskHarness = getTestHarness(x -> x, maxParallelism,
parallelismAfter, subtaskIndex);
+ subtaskHarness.setStateBackend(stateBackend);
+ subtaskHarness.setup();
+ }
+
+ private OperatorSubtaskState prepareState() throws Exception {
+
+ KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void>[] harnessBefore
=
+ new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];
+ try {
+ for (int i = 0; i < parallelismBefore; i++) {
+
+ harnessBefore[i] = getTestHarness(x -> x, maxParallelism,
parallelismBefore, i);
+ harnessBefore[i].setStateBackend(stateBackend);
+ harnessBefore[i].setup();
+ harnessBefore[i].open();
+ }
+
+ Iterator<StreamRecord<KEY>> iterator =
streamRecordGenerator.generate();
+ while (iterator.hasNext()) {
+ StreamRecord<KEY> next = iterator.next();
+ int keyGroupIndex =
+
KeyGroupRangeAssignment.assignToKeyGroup(next.getValue(), maxParallelism);
+ int subtaskIndex =
+
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+ maxParallelism, parallelismBefore,
keyGroupIndex);
+ harnessBefore[subtaskIndex].processElement(next);
+ }
+
+ OperatorSubtaskState[] subtaskState = new
OperatorSubtaskState[parallelismBefore];
+
+ for (int i = 0; i < parallelismBefore; i++) {
+ subtaskState[i] = harnessBefore[i].snapshot(0, 1);
+ }
+ return
AbstractStreamOperatorTestHarness.repackageState(subtaskState);
+ } finally {
+ closeHarnessArray(harnessBefore);
+ }
+ }
+
+ private KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void>
getTestHarness(
+ KeySelector<KEY, KEY> keySelector,
+ int maxParallelism,
+ int taskParallelism,
+ int subtaskIdx)
+ throws Exception {
+ MockEnvironment env =
+ new MockEnvironmentBuilder()
+ .setTaskName("RescalingTask")
+ .setManagedMemorySize(managedMemorySize)
+ .setMaxParallelism(maxParallelism)
+ .setParallelism(taskParallelism)
+ .setSubtaskIndex(subtaskIdx)
+ .build();
+ env.setCheckpointStorageAccess(checkpointStorageAccess);
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedProcessOperator<>(stateProcessFunctionSupplier.get()),
+ keySelector,
+ streamRecordGenerator.getTypeInformation(),
+ env);
+ }
+
+ private void closeHarnessArray(KeyedOneInputStreamOperatorTestHarness<?,
?, ?>[] harnessArr)
+ throws Exception {
+ for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness :
harnessArr) {
+ if (harness != null) {
+ harness.close();
+ }
+ }
+ }
+
+ /** To use RescalingBenchmark, need to implement StreamRecordGenerator. */
+ public interface StreamRecordGenerator<T> {
+ Iterator<StreamRecord<T>> generate();
+
+ TypeInformation<T> getTypeInformation();
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
new file mode 100644
index 0000000..595123c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkBuilder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Supplier;
+
+/**
+ * Builder for {@link RescalingBenchmark}.
+ *
+ * <p>Defaults: max parallelism 128, parallelism 2 before and 1 after rescaling, 512 MiB managed
+ * memory and an {@link EmbeddedRocksDBStateBackend}. The checkpoint storage access, the record
+ * generator and the state process function supplier have no default and must be set.
+ */
+public class RescalingBenchmarkBuilder<KEY> {
+    private int maxParallelism = 128;
+    private int parallelismBefore = 2;
+    private int parallelismAfter = 1;
+    private int managedMemorySize = 512 * 1024 * 1024;
+    private StateBackend stateBackend = new EmbeddedRocksDBStateBackend();
+    private RescalingBenchmark.StreamRecordGenerator<KEY> streamRecordGenerator;
+    private Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;
+    private CheckpointStorageAccess checkpointStorageAccess;
+
+    /** Sets the max parallelism (number of key groups). */
+    public RescalingBenchmarkBuilder<KEY> setMaxParallelism(int maxParallelism) {
+        this.maxParallelism = maxParallelism;
+        return this;
+    }
+
+    /** Sets the parallelism at which state is created and snapshotted. */
+    public RescalingBenchmarkBuilder<KEY> setParallelismBefore(int parallelismBefore) {
+        this.parallelismBefore = parallelismBefore;
+        return this;
+    }
+
+    /** Sets the parallelism at which state is restored. */
+    public RescalingBenchmarkBuilder<KEY> setParallelismAfter(int parallelismAfter) {
+        this.parallelismAfter = parallelismAfter;
+        return this;
+    }
+
+    /** Sets the managed memory size, in bytes, of each mock task environment. */
+    public RescalingBenchmarkBuilder<KEY> setManagedMemorySize(int managedMemorySize) {
+        this.managedMemorySize = managedMemorySize;
+        return this;
+    }
+
+    /** Sets the state backend under test. */
+    public RescalingBenchmarkBuilder<KEY> setStateBackend(StateBackend stateBackend) {
+        this.stateBackend = stateBackend;
+        return this;
+    }
+
+    /** Sets the generator of the records used to populate state (required). */
+    public RescalingBenchmarkBuilder<KEY> setStreamRecordGenerator(
+            RescalingBenchmark.StreamRecordGenerator<KEY> generator) {
+        this.streamRecordGenerator = generator;
+        return this;
+    }
+
+    /** Sets the supplier of the keyed function that writes state (required). */
+    public RescalingBenchmarkBuilder<KEY> setStateProcessFunctionSupplier(
+            Supplier<KeyedProcessFunction<KEY, KEY, Void>> supplier) {
+        this.stateProcessFunctionSupplier = supplier;
+        return this;
+    }
+
+    /** Sets the checkpoint storage access installed on every mock environment (required). */
+    public RescalingBenchmarkBuilder<KEY> setCheckpointStorageAccess(
+            CheckpointStorageAccess checkpointStorageAccess) {
+        this.checkpointStorageAccess = checkpointStorageAccess;
+        return this;
+    }
+
+    /**
+     * Creates the benchmark.
+     *
+     * @throws IllegalArgumentException if a parallelism is not within {@code [1, maxParallelism]}
+     * @throws NullPointerException if a required component has not been set
+     */
+    public RescalingBenchmark<KEY> build() {
+        Preconditions.checkArgument(
+                maxParallelism > 0, "maxParallelism must be positive, was %s", maxParallelism);
+        Preconditions.checkArgument(
+                parallelismBefore > 0 && parallelismBefore <= maxParallelism,
+                "parallelismBefore must be in [1, %s], was %s",
+                maxParallelism,
+                parallelismBefore);
+        Preconditions.checkArgument(
+                parallelismAfter > 0 && parallelismAfter <= maxParallelism,
+                "parallelismAfter must be in [1, %s], was %s",
+                maxParallelism,
+                parallelismAfter);
+        return new RescalingBenchmark<>(
+                parallelismBefore,
+                parallelismAfter,
+                maxParallelism,
+                managedMemorySize,
+                Preconditions.checkNotNull(stateBackend, "stateBackend must be set"),
+                Preconditions.checkNotNull(
+                        checkpointStorageAccess, "checkpointStorageAccess must be set"),
+                Preconditions.checkNotNull(
+                        streamRecordGenerator, "streamRecordGenerator must be set"),
+                Preconditions.checkNotNull(
+                        stateProcessFunctionSupplier, "stateProcessFunctionSupplier must be set"));
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
new file mode 100644
index 0000000..239cccf
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RescalingBenchmarkTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Tests that {@link RescalingBenchmark} can scale state out and in. */
+public class RescalingBenchmarkTest extends TestLogger {
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testScalingOut() throws Exception {
+        runRescaling(1, 2);
+    }
+
+    @Test
+    public void testScalingIn() throws Exception {
+        runRescaling(2, 1);
+    }
+
+    /**
+     * Builds a benchmark for the given parallelism change and restores every subtask at the new
+     * parallelism, always releasing the operator and the snapshot afterwards.
+     */
+    private void runRescaling(int parallelismBefore, int parallelismAfter) throws Exception {
+        RescalingBenchmark<Integer> benchmark =
+                new RescalingBenchmarkBuilder<Integer>()
+                        .setMaxParallelism(128)
+                        .setParallelismBefore(parallelismBefore)
+                        .setParallelismAfter(parallelismAfter)
+                        .setManagedMemorySize(512 * 1024 * 1024)
+                        .setCheckpointStorageAccess(
+                                new FileSystemCheckpointStorage(
+                                                temporaryFolder.newFolder().toURI())
+                                        .createCheckpointStorage(new JobID()))
+                        .setStateBackend(new EmbeddedRocksDBStateBackend(true))
+                        .setStreamRecordGenerator(new IntegerRecordGenerator())
+                        .setStateProcessFunctionSupplier(TestKeyedFunction::new)
+                        .build();
+        benchmark.setUp();
+        try {
+            for (int subtask = 0; subtask < parallelismAfter; subtask++) {
+                benchmark.prepareStateForOperator(subtask);
+                try {
+                    benchmark.rescale();
+                } finally {
+                    benchmark.closeOperator();
+                }
+            }
+        } finally {
+            benchmark.tearDown();
+        }
+    }
+
+    /** Emits a fixed number of records with random integer values on every generate() call. */
+    private static class IntegerRecordGenerator
+            implements RescalingBenchmark.StreamRecordGenerator<Integer> {
+        /** Records per iterator; values are random, so distinct keys may be fewer. */
+        private final int numberOfRecords = 1000;
+
+        @Override
+        public Iterator<StreamRecord<Integer>> generate() {
+            return new Iterator<StreamRecord<Integer>>() {
+                // Per-iterator cursor so generate() can be called more than once.
+                private int count = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return count < numberOfRecords;
+                }
+
+                @Override
+                public StreamRecord<Integer> next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                    count += 1;
+                    return new StreamRecord<>(ThreadLocalRandom.current().nextInt(), 0);
+                }
+            };
+        }
+
+        @Override
+        public TypeInformation<Integer> getTypeInformation() {
+            return BasicTypeInfo.INT_TYPE_INFO;
+        }
+    }
+
+    /** Writes a random value into a per-key {@link ValueState} for every element. */
+    private static class TestKeyedFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+
+        private static final long serialVersionUID = 1L;
+        private ValueState<Integer> randomState;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            randomState =
+                    this.getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("RandomState", Integer.class));
+        }
+
+        @Override
+        public void processElement(
+                Integer value,
+                KeyedProcessFunction<Integer, Integer, Void>.Context ctx,
+                Collector<Void> out)
+                throws Exception {
+            randomState.update(ThreadLocalRandom.current().nextInt());
+        }
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2b262b1..86f5c0a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -305,6 +305,13 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
this.taskMailbox = new TaskMailboxImpl();
+ // TODO remove this once we introduce
AbstractStreamOperatorTestHarnessBuilder.
+ try {
+ this.checkpointStorageAccess =
environment.getCheckpointStorageAccess();
+ } catch (NullPointerException | UnsupportedOperationException e) {
+ // cannot get checkpoint storage from environment, use default one.
+ }
+
mockTask =
new MockStreamTaskBuilder(env)
.setCheckpointLock(checkpointLock)