This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4534904769b Fix kafka performance test write failing due to server overwhelmed (#24964)
4534904769b is described below

commit 4534904769b9d3692f1e293cdc7f11433ce2ff44
Author: Yi Hu <[email protected]>
AuthorDate: Fri Jan 13 17:24:06 2023 -0500

    Fix kafka performance test write failing due to server overwhelmed (#24964)
    
    * Fix kafka performance test write failing due to server overwhelmed
    
    * Remove ReShuffle before write
    
    * Minor Optimizations on WriteCallBack
    
    * adjust tearDownTopic order in test
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaWriter.java    |  9 ++++++---
 .../test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 15 ++++++---------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 7c2d4245ee3..c0c9772959f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
  */
 class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
 
+  protected transient @Nullable Callback callback;
+
   @Setup
   public void setup() {
     if (spec.getProducerFactoryFn() != null) {
@@ -49,6 +51,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
     } else {
       producer = new KafkaProducer<>(producerConfig);
     }
+    callback = new SendCallback();
   }
 
   // Suppression since errors are tracked in SendCallback(), and checked in finishBundle()
@@ -82,7 +85,7 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
                 record.key(),
                 record.value(),
                 record.headers()),
-            new SendCallback());
+            callback);
 
     elementsWritten.inc();
   }
@@ -158,9 +161,9 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> {
           sendException = exception;
         }
         numSendFailures++;
+        // don't log exception stacktrace here, exception will be propagated up.
+        LOG.warn("send failed : '{}'", exception.getMessage());
       }
-      // don't log exception stacktrace here, exception will be propagated up.
-      LOG.warn("send failed : '{}'", exception.getMessage());
     }
   }
 }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 3799fe2966a..8f6dc10a95d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -75,7 +75,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
@@ -198,7 +197,6 @@ public class KafkaIOIT {
     // Use batch pipeline to write records.
     writePipeline
         .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
-        .apply("Avoid fusion", Reshuffle.viaRandomKey())
         .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
         .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
 
@@ -212,14 +210,16 @@ public class KafkaIOIT {
 
     PipelineResult writeResult = writePipeline.run();
     PipelineResult.State writeState = writeResult.waitUntilFinish();
+    // Fail the test if pipeline failed.
+    assertNotEquals(PipelineResult.State.FAILED, writeState);
 
     PipelineResult readResult = readPipeline.run();
     PipelineResult.State readState =
         readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
 
-    cancelIfTimeouted(readResult, readState);
-    // Delete the kafka topic after test pipeline run.
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
     tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
 
     long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME);
     assertTrue(
@@ -232,8 +232,6 @@ public class KafkaIOIT {
       Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
       IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
     }
-    // Fail the test if pipeline failed.
-    assertNotEquals(PipelineResult.State.FAILED, writeState);
     assertNotEquals(PipelineResult.State.FAILED, readState);
   }
 
@@ -247,7 +245,6 @@ public class KafkaIOIT {
     expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, expectedHashes);
     writePipeline
         .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
-        .apply("Avoid fusion", Reshuffle.viaRandomKey())
         .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
         .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
 
@@ -270,9 +267,9 @@ public class KafkaIOIT {
     PipelineResult.State readState =
         readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
 
-    cancelIfTimeouted(readResult, readState);
-    // Delete the kafka topic after test pipeline run.
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
     tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
 
     // Fail the test if pipeline failed.
     assertEquals(PipelineResult.State.DONE, readState);

Reply via email to