This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch 32506/wm-agg-benchmark in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 1c031915d5b35ef65a07e89db47e3a60132ea775 Author: fanrui <[email protected]> AuthorDate: Sat Jul 1 01:34:12 2023 +0800 [FLINK-32506][connectors/common] Add the benchmark for watermark aggregation --- .../benchmark/WatermarkAggregationBenchmark.java | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java new file mode 100644 index 0000000..6707d95 --- /dev/null +++ b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.runtime.source.coordinator.SourceCoordinatorAlignmentBenchmark;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+/**
+ * The watermark aggregation benchmark for the source coordinator when watermark alignment is
+ * enabled.
+ *
+ * <p>This class is a thin JMH wrapper: the measured work is delegated to
+ * {@link SourceCoordinatorAlignmentBenchmark}, which is imported from
+ * {@code org.apache.flink.runtime.source.coordinator} and is not defined in this file. JMH runs
+ * the benchmark once for every combination of {@link #numSubtasks} and {@link #timestampType}
+ * and reports the average time per invocation ({@link Mode#AverageTime}).
+ */
+@BenchmarkMode(Mode.AverageTime)
+public class WatermarkAggregationBenchmark extends BenchmarkBase {
+    /**
+     * Value passed as the first argument to the helper's {@code setup}; presumably the number of
+     * source subtasks that report watermarks (confirm against the helper). JMH injects one of
+     * the {@code @Param} values before {@link #setup()} runs.
+     */
+    @Param({"5000", "10000", "20000"})
+    public int numSubtasks;
+    /**
+     * Timestamp mode passed as the second argument to the helper's {@code setup}. JMH converts
+     * each {@code @Param} string to the enum constant of the same name; what each mode actually
+     * generates is defined by the helper, not here.
+     */
+    @Param({"MONOTONE_INCREASING", "RANDOMIZE_MILLISECONDS"})
+    public SourceCoordinatorAlignmentBenchmark.TimestampType timestampType;
+    /** Delegate that does the actual work; created in {@link #setup()}, used until {@link #teardown()}. */
+    private SourceCoordinatorAlignmentBenchmark benchmark;
+    /**
+     * Runs this benchmark standalone through the JMH {@link Runner}.
+     *
+     * <p>The include filter is a regular expression built from this class's canonical name
+     * wrapped in {@code .*}, so it selects the benchmark methods declared here. The dots in the
+     * name are not escaped and therefore match any character.
+     *
+     * @param args command-line arguments; not used
+     * @throws RunnerException if the JMH run fails
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(".*" + WatermarkAggregationBenchmark.class.getCanonicalName() + ".*")
+                        .build();
+
+        new Runner(options).run();
+    }
+    /**
+     * Creates a fresh helper and initializes it with the current parameter values. With
+     * {@link Level#Trial} this runs once per trial, outside the measured invocations.
+     *
+     * @throws Exception if the helper fails to initialize
+     */
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+        benchmark = new SourceCoordinatorAlignmentBenchmark();
+        benchmark.setup(numSubtasks, timestampType);
+    }
+    /**
+     * The measured operation: a single call to
+     * {@link SourceCoordinatorAlignmentBenchmark#sendReportedWatermarkToAllSubtasks()}. Judging by
+     * its name, the helper reports a watermark for every subtask so that the coordinator
+     * aggregates them; confirm against the helper's implementation.
+     */
+    @Benchmark
+    public void aggregateWatermark() {
+        benchmark.sendReportedWatermarkToAllSubtasks();
+    }
+    /**
+     * Delegates cleanup to the helper. With {@link Level#Trial} this runs once per trial, after
+     * the measured invocations have finished.
+     *
+     * @throws Exception if the helper's teardown fails
+     */
+    @TearDown(Level.Trial)
+    public void teardown() throws Exception {
+        benchmark.teardown();
+    }
+
+}
