This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 363731ebcb8 Add KafkaIO Stress test (#30467)
363731ebcb8 is described below
commit 363731ebcb8e6b8932d97c353348106c5619ae2e
Author: akashorabek <[email protected]>
AuthorDate: Thu Mar 7 04:13:12 2024 +0600
Add KafkaIO Stress test (#30467)
* Add KafkaIO Stress test
* run precommit tasks
* add empty line to build.gradle
---
it/google-cloud-platform/build.gradle | 3 +-
it/kafka/build.gradle | 6 +
.../java/org/apache/beam/it/kafka/KafkaIOST.java | 480 +++++++++++++++++++++
3 files changed, 488 insertions(+), 1 deletion(-)
diff --git a/it/google-cloud-platform/build.gradle
b/it/google-cloud-platform/build.gradle
index 681b034adbf..9717b5f8c84 100644
--- a/it/google-cloud-platform/build.gradle
+++ b/it/google-cloud-platform/build.gradle
@@ -84,7 +84,8 @@ dependencies {
tasks.register("GCSPerformanceTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'FileBasedIOLT', ['configuration':'large','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
tasks.register("BigTablePerformanceTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'BigTableIOLT', ['configuration':'large','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryPerformanceTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'BigQueryIOLT', ['configuration':'medium','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
-tasks.register("BigQueryStressTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
+tasks.register("BigQueryStressTestMedium",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
+tasks.register("BigQueryStressTestLarge",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'BigQueryIOST', ['configuration':'large','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryStorageApiStreamingPerformanceTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'BigQueryStreamingLT', ['configuration':'large',
'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("PubSubPerformanceTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'PubSubIOLT', ['configuration':'large','project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
tasks.register("WordCountIntegrationTest",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform',
'WordCountIT', ['project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
diff --git a/it/kafka/build.gradle b/it/kafka/build.gradle
index faae88bb9f5..b1b8147e72a 100644
--- a/it/kafka/build.gradle
+++ b/it/kafka/build.gradle
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import org.apache.beam.gradle.IoPerformanceTestUtilities
plugins { id 'org.apache.beam.module' }
applyJavaNature(
@@ -40,5 +41,10 @@ dependencies {
testImplementation project(path: ":sdks:java:io:kafka")
testImplementation project(path: ":sdks:java:io:synthetic")
testImplementation project(path: ":runners:direct-java")
+ testImplementation project(path: ":runners:google-cloud-dataflow-java")
testImplementation project(path: ":sdks:java:testing:test-utils")
+ testImplementation project(path:
":sdks:java:extensions:google-cloud-platform-core", configuration:
"testRuntimeMigration")
}
+
+tasks.register("KafkaStressTestMedium",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST',
['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
+tasks.register("KafkaStressTestLarge",
IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST',
['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing',
'artifactBucket':'io-performance-temp'])
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
new file mode 100644
index 00000000000..2a830400a0a
--- /dev/null
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.Timestamp;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.gcp.IOLoadTestBase;
+import
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * KafkaIO stress tests. The test is designed to assess the performance of
KafkaIO under various
+ * conditions. To run the test, a live remote Kafka broker is required. You
can deploy Kafka within
+ * a Kubernetes cluster following the example described here: {@code
+ * .github/workflows/beam_PerformanceTests_Kafka_IO.yml}. If you choose to use
Kubernetes, it's
+ * important to remember that each pod should have a minimum of 10GB memory
allocated. Additionally,
+ * when running the test, it's necessary to pass the addresses of Kafka
bootstrap servers as an
+ * argument.
+ *
+ * <p>Usage: <br>
+ * - To run medium-scale stress tests: {@code gradle
:it:kafka:KafkaStressTestMedium
+ * -DbootstrapServers="0.0.0.0:32400,1.1.1.1:32400"} <br>
+ * - To run large-scale stress tests: {@code gradle
:it:kafka:KafkaStressTestLarge
+ * -DbootstrapServers="0.0.0.0:32400,1.1.1.1:32400"}
+ */
+public final class KafkaIOST extends IOLoadTestBase {
  /**
   * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
   * eventually return to 1x.
   */
  private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};

  // Lazily initialized in testWriteAndRead() only when exportMetricsToInfluxDB is set.
  private static InfluxDBSettings influxDBSettings;
  // Beam counter names used to compare records written vs. records read.
  private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
  // Baseline generation rate; configured rowsPerSecond is scaled against this floor.
  private static final int DEFAULT_ROWS_PER_SECOND = 1000;
  private Configuration configuration; // resolved from preset name or inline JSON in setup()
  private AdminClient adminClient; // used to create/delete the per-run Kafka topic
  private String testConfigName;
  private String tempLocation; // GCS temp path, set only when tempBucketName is available
  private String kafkaTopic; // unique per run: timestamp + random suffix

  @Rule public TestPipeline writePipeline = TestPipeline.create();
  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ @Before
+ public void setup() {
+ // parse configuration
+ testConfigName =
+ TestProperties.getProperty("configuration", "medium",
TestProperties.Type.PROPERTY);
+ configuration = TEST_CONFIGS_PRESET.get(testConfigName);
+ if (configuration == null) {
+ try {
+ configuration = Configuration.fromJsonString(testConfigName,
Configuration.class);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown test configuration: [%s]. Pass to a valid
configuration json, or use"
+ + " config presets: %s",
+ testConfigName, TEST_CONFIGS_PRESET.keySet()));
+ }
+ }
+ configuration.bootstrapServers =
+ TestProperties.getProperty("bootstrapServers", null,
TestProperties.Type.PROPERTY);
+
+ adminClient =
+ AdminClient.create(ImmutableMap.of("bootstrap.servers",
configuration.bootstrapServers));
+ kafkaTopic =
+ "io-kafka-"
+ + DateTimeFormatter.ofPattern("MMddHHmmssSSS")
+ .withZone(ZoneId.of("UTC"))
+ .format(java.time.Instant.now())
+ + UUID.randomUUID().toString().substring(0, 10);
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(kafkaTopic, 1, (short) 3)));
+
+ // tempLocation needs to be set for DataflowRunner
+ if (!Strings.isNullOrEmpty(tempBucketName)) {
+ tempLocation = String.format("gs://%s/temp/", tempBucketName);
+
writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+ writePipeline.getOptions().setTempLocation(tempLocation);
+
readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+ readPipeline.getOptions().setTempLocation(tempLocation);
+ }
+ // Use streaming pipeline to write and read records
+ writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+ readPipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+ }
+
+ private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+
+ static {
+ try {
+ TEST_CONFIGS_PRESET =
+ ImmutableMap.of(
+ "medium",
+ Configuration.fromJsonString(
+
"{\"rowsPerSecond\":25000,\"minutes\":30,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
+ Configuration.class),
+ "large",
+ Configuration.fromJsonString(
+
"{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":300,\"runner\":\"DataflowRunner\"}",
+ Configuration.class));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
  /**
   * Run stress test with configurations specified by TestProperties.
   *
   * <p>Launches the write pipeline and the streaming read pipeline, waits for the configured
   * timeout, verifies the read job is still RUNNING (streaming reads never finish on their own),
   * asserts records written equals records read, cancels any still-running jobs, and finally
   * exports pipeline metrics to InfluxDB or BigQuery.
   */
  @Test
  public void testWriteAndRead() throws IOException, ParseException, InterruptedException {
    if (configuration.exportMetricsToInfluxDB) {
      // Measurement is suffixed with the config name so runs of different sizes stay separate.
      influxDBSettings =
          InfluxDBSettings.builder()
              .withHost(configuration.influxHost)
              .withDatabase(configuration.influxDatabase)
              .withMeasurement(configuration.influxMeasurement + "_" + testConfigName)
              .get();
    }

    PipelineLauncher.LaunchInfo writeInfo = generateDataAndWrite();
    PipelineLauncher.LaunchInfo readInfo = readData();

    try {
      // Blocks for up to pipelineTimeout minutes while the streaming read job runs.
      PipelineOperator.Result readResult =
          pipelineOperator.waitUntilDone(
              createConfig(readInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
      assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, readResult);
      // streaming read pipeline does not end itself
      assertEquals(
          PipelineLauncher.JobState.RUNNING,
          pipelineLauncher.getJobStatus(project, region, readInfo.jobId()));

      // Delete topic after test run
      adminClient.deleteTopics(Collections.singleton(kafkaTopic));

      double writeNumRecords =
          pipelineLauncher.getMetric(
              project,
              region,
              writeInfo.jobId(),
              getBeamMetricsName(PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME));
      double readNumRecords =
          pipelineLauncher.getMetric(
              project,
              region,
              readInfo.jobId(),
              getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
      // Exact-count comparison (delta 0): everything written must have been read.
      assertEquals(writeNumRecords, readNumRecords, 0);
    } finally {
      // clean up pipelines
      if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
          == PipelineLauncher.JobState.RUNNING) {
        pipelineLauncher.cancelJob(project, region, writeInfo.jobId());
      }
      if (pipelineLauncher.getJobStatus(project, region, readInfo.jobId())
          == PipelineLauncher.JobState.RUNNING) {
        pipelineLauncher.cancelJob(project, region, readInfo.jobId());
      }
    }

    // export metrics
    // PCollection names below must match the transform names used in generateDataAndWrite()
    // and readData(); V2 variants are the runner-v2 naming of the same collections.
    MetricsConfiguration writeMetricsConfig =
        MetricsConfiguration.builder()
            .setInputPCollection("Reshuffle fanout/Values/Values/Map.out0")
            .setInputPCollectionV2("Reshuffle fanout/Values/Values/Map/ParMultiDo(Anonymous).out0")
            .setOutputPCollection("Counting element.out0")
            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
            .build();

    MetricsConfiguration readMetricsConfig =
        MetricsConfiguration.builder()
            .setOutputPCollection("Counting element.out0")
            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
            .build();

    exportMetrics(writeInfo, writeMetricsConfig);
    exportMetrics(readInfo, readMetricsConfig);
  }
+
+ /**
+ * The method creates a pipeline to simulate data generation and write
operations to Kafka topic,
+ * based on the specified configuration parameters. The stress test involves
varying the load
+ * dynamically over time, with options to use configurable parameters.
+ */
+ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws
IOException {
+ // The PeriodicImpulse source will generate an element every this many
millis:
+ int fireInterval = 1;
+ // Each element from PeriodicImpulse will fan out to this many elements:
+ int startMultiplier =
+ Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) /
DEFAULT_ROWS_PER_SECOND;
+ long stopAfterMillis =
+
org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
+ long totalRows = startMultiplier * stopAfterMillis / fireInterval;
+ List<LoadPeriod> loadPeriods =
+ getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
+
+ PCollection<byte[]> source =
+ writePipeline
+ .apply(
+ PeriodicImpulse.create()
+ .stopAfter(org.joda.time.Duration.millis(stopAfterMillis -
1))
+ .withInterval(org.joda.time.Duration.millis(fireInterval)))
+ .apply(
+ "Extract values",
+ MapElements.into(TypeDescriptor.of(byte[].class))
+ .via(instant -> Longs.toByteArray(instant.getMillis() %
totalRows)));
+ if (startMultiplier > 1) {
+ source =
+ source
+ .apply(
+ "One input to multiple outputs",
+ ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods)))
+ .apply("Reshuffle fanout", Reshuffle.viaRandomKey())
+ .apply("Counting element", ParDo.of(new
CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
+ }
+ source.apply(
+ "Write to Kafka",
+ KafkaIO.<byte[], byte[]>write()
+ .withBootstrapServers(configuration.bootstrapServers)
+ .withTopic(kafkaTopic)
+ .withValueSerializer(ByteArraySerializer.class)
+ .withProducerConfigUpdates(
+ ImmutableMap.of(
+ ProducerConfig.RETRIES_CONFIG, 10,
+ ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 600000,
+ ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5000))
+ .values());
+
+ PipelineLauncher.LaunchConfig options =
+ PipelineLauncher.LaunchConfig.builder("write-kafka")
+ .setSdk(PipelineLauncher.Sdk.JAVA)
+ .setPipeline(writePipeline)
+ .addParameter("runner", configuration.runner)
+ .addParameter(
+ "autoscalingAlgorithm",
+
DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED
+ .toString())
+ .addParameter("numWorkers",
String.valueOf(configuration.numWorkers))
+ .addParameter("maxNumWorkers",
String.valueOf(configuration.maxNumWorkers))
+ .addParameter("experiments", "use_runner_v2")
+ .build();
+
+ return pipelineLauncher.launch(project, region, options);
+ }
+
+ /** The method reads data from Kafka topic in streaming mode. */
+ private PipelineLauncher.LaunchInfo readData() throws IOException {
+ KafkaIO.Read<byte[], byte[]> readFromKafka =
+ KafkaIO.readBytes()
+ .withBootstrapServers(configuration.bootstrapServers)
+ .withTopic(kafkaTopic)
+ .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset",
"earliest"));
+
+ readPipeline
+ .apply("Read from unbounded Kafka", readFromKafka)
+ .apply("Counting element", ParDo.of(new
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
+
+ PipelineLauncher.LaunchConfig options =
+ PipelineLauncher.LaunchConfig.builder("read-kafka")
+ .setSdk(PipelineLauncher.Sdk.JAVA)
+ .setPipeline(readPipeline)
+ .addParameter("numWorkers",
String.valueOf(configuration.numWorkers))
+ .addParameter("runner", configuration.runner)
+ .addParameter("experiments", "use_runner_v2")
+ .build();
+
+ return pipelineLauncher.launch(project, region, options);
+ }
+
+ private void exportMetrics(
+ PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration
metricsConfig)
+ throws IOException, ParseException, InterruptedException {
+
+ Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
+ String testId = UUID.randomUUID().toString();
+ String testTimestamp = Timestamp.now().toString();
+
+ if (configuration.exportMetricsToInfluxDB) {
+ Collection<NamedTestResult> namedTestResults = new ArrayList<>();
+ for (Map.Entry<String, Double> entry : metrics.entrySet()) {
+ NamedTestResult metricResult =
+ NamedTestResult.create(testId, testTimestamp, entry.getKey(),
entry.getValue());
+ namedTestResults.add(metricResult);
+ }
+ IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults,
influxDBSettings);
+ } else {
+ exportMetricsToBigQuery(launchInfo, metrics);
+ }
+ }
+
+ /**
+ * Custom Apache Beam DoFn designed for use in stress testing scenarios. It
introduces a dynamic
+ * load increase over time, multiplying the input elements based on the
elapsed time since the
+ * start of processing. This class aims to simulate various load levels
during stress testing.
+ */
+ private static class MultiplierDoFn extends DoFn<byte[], byte[]> {
+ private final int startMultiplier;
+ private final long startTimesMillis;
+ private final List<LoadPeriod> loadPeriods;
+
+ MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
+ this.startMultiplier = startMultiplier;
+ this.startTimesMillis = Instant.now().getMillis();
+ this.loadPeriods = loadPeriods;
+ }
+
+ @DoFn.ProcessElement
+ public void processElement(
+ @Element byte[] element,
+ OutputReceiver<byte[]> outputReceiver,
+ @DoFn.Timestamp Instant timestamp) {
+
+ int multiplier = this.startMultiplier;
+ long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
+
+ for (LoadPeriod loadPeriod : loadPeriods) {
+ if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
+ && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
+ multiplier *= loadPeriod.getLoadIncreaseMultiplier();
+ break;
+ }
+ }
+ for (int i = 0; i < multiplier; i++) {
+ outputReceiver.output(element);
+ }
+ }
+ }
+
+ /**
+ * Generates and returns a list of LoadPeriod instances representing periods
of load increase
+ * based on the specified load increase array and total duration in minutes.
+ *
+ * @param minutesTotal The total duration in minutes for which the load
periods are generated.
+ * @return A list of LoadPeriod instances defining periods of load increase.
+ */
+ private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[]
loadIncreaseArray) {
+
+ List<LoadPeriod> loadPeriods = new ArrayList<>();
+ long periodDurationMillis =
+ Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
+ long startTimeMillis = 0;
+
+ for (int loadIncreaseMultiplier : loadIncreaseArray) {
+ long endTimeMillis = startTimeMillis + periodDurationMillis;
+ loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis,
endTimeMillis));
+
+ startTimeMillis = endTimeMillis;
+ }
+ return loadPeriods;
+ }
+
  /** Options for Kafka IO stress test. Deserialized from a preset or inline JSON string. */
  static class Configuration extends SyntheticSourceOptions {
    /** Pipeline timeout in minutes. Must be a positive value. */
    @JsonProperty public int pipelineTimeout = 20;

    /** Runner specified to run the pipeline. */
    @JsonProperty public String runner = "DirectRunner";

    /** Number of workers for the pipeline. */
    @JsonProperty public int numWorkers = 20;

    /** Maximum number of workers for the pipeline. */
    @JsonProperty public int maxNumWorkers = 100;

    /**
     * Rate of generated elements sent to the source table. Will run with a minimum of 1k rows per
     * second.
     */
    @JsonProperty public int rowsPerSecond = DEFAULT_ROWS_PER_SECOND;

    /** Rows will be generated for this many minutes. */
    @JsonProperty public int minutes = 15;

    /** Kafka bootstrap server addresses. Required; supplied via -DbootstrapServers. */
    @JsonProperty public String bootstrapServers;

    /**
     * Determines the destination for exporting metrics. If set to true, metrics will be exported to
     * InfluxDB and displayed using Grafana. If set to false, metrics will be exported to BigQuery
     * and displayed with Looker Studio.
     */
    @JsonProperty public boolean exportMetricsToInfluxDB = false;

    /** InfluxDB measurement to publish results to. */
    @JsonProperty public String influxMeasurement = KafkaIOST.class.getName();

    /** InfluxDB host to publish metrics. */
    @JsonProperty public String influxHost;

    /** InfluxDB database to publish metrics. */
    @JsonProperty public String influxDatabase;
  }
+
+ /**
+ * Represents a period of time with associated load increase properties for
stress testing
+ * scenarios.
+ */
+ private static class LoadPeriod implements Serializable {
+ private final int loadIncreaseMultiplier;
+ private final long periodStartMillis;
+ private final long periodEndMillis;
+
+ public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long
periodEndMin) {
+ this.loadIncreaseMultiplier = loadIncreaseMultiplier;
+ this.periodStartMillis = periodStartMillis;
+ this.periodEndMillis = periodEndMin;
+ }
+
+ public int getLoadIncreaseMultiplier() {
+ return loadIncreaseMultiplier;
+ }
+
+ public long getPeriodStartMillis() {
+ return periodStartMillis;
+ }
+
+ public long getPeriodEndMillis() {
+ return periodEndMillis;
+ }
+ }
+}