This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6911abf1755 [MINOR] Streamer test setup performance (#10806)
6911abf1755 is described below
commit 6911abf1755c5752599e30a623cfff56f3e4ffed
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 26 12:33:43 2024 -0700
[MINOR] Streamer test setup performance (#10806)
---
.../apache/hudi/common/testutils/RawTripTestPayload.java | 7 +++----
.../org/apache/hudi/common/testutils/SchemaTestUtil.java | 4 ++--
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 10 ++++------
.../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 10 ++++++++--
.../hudi/utilities/testutils/UtilitiesTestBase.java | 15 ++++++++++++++-
5 files changed, 31 insertions(+), 15 deletions(-)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index de262ce0d64..3ec4901823a 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -63,6 +63,7 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
public static final String JSON_DATA_SCHEMA_STR =
"{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"number\",\"type\":[\"null\",\"int\"],\"default\":null},"
+
"{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"}]}";
public static final Schema JSON_DATA_SCHEMA = new
Schema.Parser().parse(JSON_DATA_SCHEMA_STR);
+ private static final MercifulJsonConverter JSON_CONVERTER = new
MercifulJsonConverter();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private String partitionPath;
@@ -206,8 +207,7 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
if (isDeleted) {
return Option.empty();
} else {
- MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
- return Option.of(jsonConverter.convert(getJsonData(), schema));
+ return Option.of(JSON_CONVERTER.convert(getJsonData(), schema));
}
}
@@ -217,8 +217,7 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
}
public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
- MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
- return jsonConverter.convert(getJsonData(), schema);
+ return JSON_CONVERTER.convert(getJsonData(), schema);
}
@Override
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 9ee16174973..3e16a18c282 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -67,6 +67,7 @@ import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudo
public final class SchemaTestUtil {
private static final String RESOURCE_SAMPLE_DATA = "/sample.data";
+ private static final MercifulJsonConverter CONVERTER = new
MercifulJsonConverter();
private final Random random = new Random(0xDEED);
@@ -267,8 +268,7 @@ public final class SchemaTestUtil {
public static GenericRecord generateAvroRecordFromJson(Schema schema, int
recordNumber, String instantTime,
String fileId, boolean populateMetaFields) throws IOException {
SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber,
fileId, populateMetaFields);
- MercifulJsonConverter converter = new MercifulJsonConverter();
- return converter.convert(record.toJsonString(), schema);
+ return CONVERTER.convert(record.toJsonString(), schema);
}
public static Schema getSchemaFromResource(Class<?> clazz, String name,
boolean withHoodieMetadata) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 05df4017305..433549f56b4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -137,8 +137,6 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
testUtils.setup();
topicName = "topic" + testNum;
prepareInitialConfigs(storage, basePath, testUtils.brokerAddress());
- prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
- prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
}
@AfterEach
@@ -156,9 +154,9 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, true, false);
// basePath is defined in UtilitiesTestBase.initTestServices
- PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
- ORC_SOURCE_ROOT = basePath + "/orcFiles";
- JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
+ PARQUET_SOURCE_ROOT = basePath + "parquetFiles";
+ ORC_SOURCE_ROOT = basePath + "orcFiles";
+ JSON_KAFKA_SOURCE_ROOT = basePath + "jsonKafkaFiles";
}
@AfterAll
@@ -677,7 +675,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
Thread.sleep(2000);
ret = condition.apply(true);
} catch (Throwable error) {
- LOG.warn("Got error :", error);
+ LOG.debug("Got error waiting for condition", error);
ret = false;
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index b0a9f14aa21..ff1e5ae2981 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -147,6 +147,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -717,7 +718,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds,
HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String
jobId) throws Exception {
- Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future dsFuture = executor.submit(() -> {
try {
ds.sync();
} catch (Exception ex) {
@@ -732,6 +734,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
ds.shutdownGracefully();
dsFuture.get();
}
+ executor.shutdown();
}
static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws
InterruptedException {
@@ -1421,7 +1424,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" :
"");
// generate data asynchronously.
- Future inputGenerationFuture =
Executors.newSingleThreadExecutor().submit(() -> {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future inputGenerationFuture = executor.submit(() -> {
try {
int counter = 2;
while (counter < 100) { // lets keep going. if the test times out, we
will cancel the future within finally. So, safe to generate 100 batches.
@@ -1461,6 +1465,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
ds.shutdownGracefully();
inputGenerationFuture.cancel(true);
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+ executor.shutdown();
}
testNum++;
}
@@ -1797,6 +1802,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
private void testORCDFSSource(boolean useSchemaProvider, List<String>
transformerClassNames) throws Exception {
// prepare ORCDFSSource
+ prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
TypedProperties orcProps = new TypedProperties();
// Properties used for testing delta-streamer with orc source
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 175191436ad..e166e418d43 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -87,7 +87,9 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import scala.Tuple2;
@@ -163,7 +165,7 @@ public class UtilitiesTestBase {
zookeeperTestService.start();
}
- jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() +
"-hoodie", "local[8]");
+ jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() +
"-hoodie", "local[4]", sparkConf());
context = new HoodieSparkEngineContext(jsc);
sqlContext = new SQLContext(jsc);
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
@@ -266,6 +268,17 @@ public class UtilitiesTestBase {
TestDataSource.resetDataGen();
}
+ private static Map<String, String> sparkConf() {
+ Map<String, String> conf = new HashMap<>();
+ conf.put("spark.default.parallelism", "2");
+ conf.put("spark.sql.shuffle.partitions", "2");
+ conf.put("spark.executor.memory", "1G");
+ conf.put("spark.driver.memory", "1G");
+ conf.put("spark.hadoop.mapred.output.compress", "true");
+ conf.put("spark.ui.enabled", "false");
+ return conf;
+ }
+
/**
* Helper to get hive sync config.
*