This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0bda958007fdd7a2b63de9907526de64301a2062
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 26 12:33:43 2024 -0700

    [MINOR] Streamer test setup performance (#10806)
---
 .../apache/hudi/common/testutils/RawTripTestPayload.java  |  7 +++----
 .../org/apache/hudi/common/testutils/SchemaTestUtil.java  |  4 ++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java        | 10 ++++------
 .../utilities/deltastreamer/TestHoodieDeltaStreamer.java  | 10 ++++++++--
 .../hudi/utilities/testutils/UtilitiesTestBase.java       | 15 ++++++++++++++-
 5 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index de262ce0d64..3ec4901823a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -63,6 +63,7 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
   public static final String JSON_DATA_SCHEMA_STR = 
"{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"default\":null},"
       + 
"{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"}]}";
   public static final Schema JSON_DATA_SCHEMA = new 
Schema.Parser().parse(JSON_DATA_SCHEMA_STR);
+  private static final MercifulJsonConverter JSON_CONVERTER = new 
MercifulJsonConverter();
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private String partitionPath;
@@ -206,8 +207,7 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
     if (isDeleted) {
       return Option.empty();
     } else {
-      MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
-      return Option.of(jsonConverter.convert(getJsonData(), schema));
+      return Option.of(JSON_CONVERTER.convert(getJsonData(), schema));
     }
   }
 
@@ -217,8 +217,7 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
   }
 
   public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
-    MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
-    return jsonConverter.convert(getJsonData(), schema);
+    return JSON_CONVERTER.convert(getJsonData(), schema);
   }
 
   @Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index adc8b6b9d95..37915c826c1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -68,6 +68,7 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudo
 public final class SchemaTestUtil {
 
   private static final String RESOURCE_SAMPLE_DATA = "/sample.data";
+  private static final MercifulJsonConverter CONVERTER = new 
MercifulJsonConverter();
 
   private final Random random = new Random(0xDEED);
 
@@ -268,8 +269,7 @@ public final class SchemaTestUtil {
   public static GenericRecord generateAvroRecordFromJson(Schema schema, int 
recordNumber, String instantTime,
       String fileId, boolean populateMetaFields) throws IOException {
     SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber, 
fileId, populateMetaFields);
-    MercifulJsonConverter converter = new MercifulJsonConverter();
-    return converter.convert(record.toJsonString(), schema);
+    return CONVERTER.convert(record.toJsonString(), schema);
   }
 
   public static Schema getSchemaFromResource(Class<?> clazz, String name, 
boolean withHoodieMetadata) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 81b5be2ed9e..0f2f1e65510 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -145,8 +145,6 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     testUtils.setup();
     topicName = "topic" + testNum;
     prepareInitialConfigs(storage, basePath, testUtils.brokerAddress());
-    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
-    prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
   }
 
   @AfterEach
@@ -164,9 +162,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices(false, true, false);
     // basePath is defined in UtilitiesTestBase.initTestServices
-    PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
-    ORC_SOURCE_ROOT = basePath + "/orcFiles";
-    JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
+    PARQUET_SOURCE_ROOT = basePath + "parquetFiles";
+    ORC_SOURCE_ROOT = basePath + "orcFiles";
+    JSON_KAFKA_SOURCE_ROOT = basePath + "jsonKafkaFiles";
   }
 
   @AfterAll
@@ -686,7 +684,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
             Thread.sleep(2000);
             ret = condition.apply(true);
           } catch (Throwable error) {
-            LOG.warn("Got error :", error);
+            LOG.debug("Got error waiting for condition", error);
             ret = false;
           }
         }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 23fd8bd9e78..f4dc792f2a6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -144,6 +144,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -735,7 +736,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, 
HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String 
jobId) throws Exception {
-    Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future dsFuture = executor.submit(() -> {
       try {
         ds.sync();
       } catch (Exception ex) {
@@ -750,6 +752,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       ds.shutdownGracefully();
       dsFuture.get();
     }
+    executor.shutdown();
   }
 
   static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws 
InterruptedException {
@@ -1440,7 +1443,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : 
"");
 
     // generate data asynchronously.
-    Future inputGenerationFuture = 
Executors.newSingleThreadExecutor().submit(() -> {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future inputGenerationFuture = executor.submit(() -> {
       try {
         int counter = 2;
         while (counter < 100) { // lets keep going. if the test times out, we 
will cancel the future within finally. So, safe to generate 100 batches.
@@ -1480,6 +1484,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       ds.shutdownGracefully();
       inputGenerationFuture.cancel(true);
       UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+      executor.shutdown();
     }
     testNum++;
   }
@@ -1826,6 +1831,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   private void testORCDFSSource(boolean useSchemaProvider, List<String> 
transformerClassNames) throws Exception {
     // prepare ORCDFSSource
+    prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
     TypedProperties orcProps = new TypedProperties();
 
     // Properties used for testing delta-streamer with orc source
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 5eec800a060..b75dca6b577 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -87,7 +87,9 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import scala.Tuple2;
@@ -164,7 +166,7 @@ public class UtilitiesTestBase {
       zookeeperTestService.start();
     }
 
-    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + 
"-hoodie", "local[8]");
+    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + 
"-hoodie", "local[4]", sparkConf());
     context = new HoodieSparkEngineContext(jsc);
     sqlContext = new SQLContext(jsc);
     sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
@@ -267,6 +269,17 @@ public class UtilitiesTestBase {
     TestDataSource.resetDataGen();
   }
 
+  private static Map<String, String> sparkConf() {
+    Map<String, String> conf = new HashMap<>(); // small-footprint settings passed to buildSparkContext for the shared test context
+    conf.put("spark.default.parallelism", "2");
+    conf.put("spark.sql.shuffle.partitions", "2");
+    conf.put("spark.executor.memory", "1G"); // NOTE(review): likely no effect with a local[N] master (no separate executor JVMs) — confirm
+    conf.put("spark.driver.memory", "1G"); // NOTE(review): ignored when set via SparkConf after the driver JVM has started — confirm
+    conf.put("spark.hadoop.mapred.output.compress", "true");
+    conf.put("spark.ui.enabled", "false"); // the key is "spark.ui.enabled"; the former "spark.ui.enable" is not a Spark setting and was ignored
+    return conf;
+  }
+
   /**
    * Helper to get hive sync config.
    * 

Reply via email to