This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4534904769b Fix kafka performance test write failing due to server
overwhelmed (#24964)
4534904769b is described below
commit 4534904769b9d3692f1e293cdc7f11433ce2ff44
Author: Yi Hu <[email protected]>
AuthorDate: Fri Jan 13 17:24:06 2023 -0500
Fix kafka performance test write failing due to server overwhelmed (#24964)
* Fix kafka performance test write failing due to server overwhelmed
* Remove Reshuffle before write
* Minor optimizations to the write callback (reuse a single SendCallback instance)
* adjust tearDownTopic order in test
---
.../java/org/apache/beam/sdk/io/kafka/KafkaWriter.java | 9 ++++++---
.../test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 15 ++++++---------
2 files changed, 12 insertions(+), 12 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 7c2d4245ee3..c0c9772959f 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
*/
class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
+ protected transient @Nullable Callback callback;
+
@Setup
public void setup() {
if (spec.getProducerFactoryFn() != null) {
@@ -49,6 +51,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>,
Void> {
} else {
producer = new KafkaProducer<>(producerConfig);
}
+ callback = new SendCallback();
}
// Suppression since errors are tracked in SendCallback(), and checked in
finishBundle()
@@ -82,7 +85,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>,
Void> {
record.key(),
record.value(),
record.headers()),
- new SendCallback());
+ callback);
elementsWritten.inc();
}
@@ -158,9 +161,9 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>,
Void> {
sendException = exception;
}
numSendFailures++;
+ // don't log exception stacktrace here, exception will be propagated
up.
+ LOG.warn("send failed : '{}'", exception.getMessage());
}
- // don't log exception stacktrace here, exception will be propagated up.
- LOG.warn("send failed : '{}'", exception.getMessage());
}
}
}
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 3799fe2966a..8f6dc10a95d 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -75,7 +75,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
@@ -198,7 +197,6 @@ public class KafkaIOIT {
// Use batch pipeline to write records.
writePipeline
.apply("Generate records", Read.from(new
SyntheticBoundedSource(sourceOptions)))
- .apply("Avoid fusion", Reshuffle.viaRandomKey())
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE,
WRITE_TIME_METRIC_NAME)))
.apply("Write to Kafka",
writeToKafka().withTopic(options.getKafkaTopic()));
@@ -212,14 +210,16 @@ public class KafkaIOIT {
PipelineResult writeResult = writePipeline.run();
PipelineResult.State writeState = writeResult.waitUntilFinish();
+ // Fail the test if pipeline failed.
+ assertNotEquals(PipelineResult.State.FAILED, writeState);
PipelineResult readResult = readPipeline.run();
PipelineResult.State readState =
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
- cancelIfTimeouted(readResult, readState);
- // Delete the kafka topic after test pipeline run.
+ // call asynchronous deleteTopics first since cancelIfTimeouted is
blocking.
tearDownTopic(options.getKafkaTopic());
+ cancelIfTimeouted(readResult, readState);
long actualRecords = readElementMetric(readResult, NAMESPACE,
READ_ELEMENT_METRIC_NAME);
assertTrue(
@@ -232,8 +232,6 @@ public class KafkaIOIT {
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
}
- // Fail the test if pipeline failed.
- assertNotEquals(PipelineResult.State.FAILED, writeState);
assertNotEquals(PipelineResult.State.FAILED, readState);
}
@@ -247,7 +245,6 @@ public class KafkaIOIT {
expectedHashcode = getHashForRecordCount(sourceOptions.numRecords,
expectedHashes);
writePipeline
.apply("Generate records", Read.from(new
SyntheticBoundedSource(sourceOptions)))
- .apply("Avoid fusion", Reshuffle.viaRandomKey())
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE,
WRITE_TIME_METRIC_NAME)))
.apply("Write to Kafka",
writeToKafka().withTopic(options.getKafkaTopic()));
@@ -270,9 +267,9 @@ public class KafkaIOIT {
PipelineResult.State readState =
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
- cancelIfTimeouted(readResult, readState);
- // Delete the kafka topic after test pipeline run.
+ // call asynchronous deleteTopics first since cancelIfTimeouted is
blocking.
tearDownTopic(options.getKafkaTopic());
+ cancelIfTimeouted(readResult, readState);
// Fail the test if pipeline failed.
assertEquals(PipelineResult.State.DONE, readState);