[
https://issues.apache.org/jira/browse/HUDI-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-5788:
----------------------------
Description:
Currently, there are many APIs that construct Hudi Spark configs in a
similar way (search for "spark.kryo.registrator", for example). We should
consolidate them into a couple of reusable APIs instead of duplicating the
logic at each of the call sites quoted below (a sketch of one possible
consolidated shape follows the last snippet).
HoodieExampleSparkUtils
{code:java}
private static Map<String, String> defaultConf() {
  Map<String, String> additionalConfigs = new HashMap<>();
  additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  additionalConfigs.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
  return additionalConfigs;
} {code}
SparkUtil
{code:java}
public static SparkConf getDefaultConf(final String appName, final Option<String> sparkMaster) {
  final Properties properties = System.getProperties();
  SparkConf sparkConf = new SparkConf().setAppName(appName);
  // Configure the sparkMaster
  String sparkMasterNode = DEFAULT_SPARK_MASTER;
  if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
    sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
  }
  if (sparkMaster.isPresent() && !sparkMaster.get().trim().isEmpty()) {
    sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
  }
  sparkConf.setMaster(sparkMasterNode);
  // Configure driver
  sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
  sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
  sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
  sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  // Configure hadoop conf
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
  // Note: the codec key is set twice; the "true" value below is immediately overwritten
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "true");
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
  return sparkConf;
} {code}
HoodieClientTestUtils
{code:java}
public static SparkConf getSparkConfForTest(String appName) {
  SparkConf sparkConf = new SparkConf().setAppName(appName)
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .set("spark.sql.shuffle.partitions", "4")
      .set("spark.default.parallelism", "4");
  // NOTE: This utility is used in modules where this class might not be present, therefore
  //       to avoid littering output w/ [[ClassNotFoundException]]s we will skip adding it
  //       in case this utility is used in the module not providing it
  if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) {
    sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  }
  // ...
{code}
SparkProvider
{code:java}
default SparkConf conf(Map<String, String> overwritingConfigs) {
  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.app.name", getClass().getName());
  sparkConf.set("spark.master", "local[*]");
  sparkConf.set("spark.default.parallelism", "4");
  sparkConf.set("spark.sql.shuffle.partitions", "4");
  sparkConf.set("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  // Note: the codec key is set twice; the "true" value below is immediately overwritten
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  overwritingConfigs.forEach(sparkConf::set);
  return sparkConf;
} {code}
TestHoodieSparkUtils
{code:scala}
@Test
def testCreateRddSchemaEvol(): Unit = {
  val spark = SparkSession.builder
    .appName("Hoodie Datasource test")
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate {code}
UtilHelpers
{code:java}
private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
  final SparkConf sparkConf = new SparkConf().setAppName(appName);
  String master = sparkConf.get("spark.master", defaultMaster);
  sparkConf.setMaster(master);
  if (master.startsWith("yarn")) {
    sparkConf.set("spark.eventLog.overwrite", "true");
    sparkConf.set("spark.eventLog.enabled", "true");
  }
  sparkConf.set("spark.ui.port", "8090");
  sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  // Note: the codec key is set twice; the "true" value below is immediately overwritten
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  additionalConfigs.forEach(sparkConf::set);
  return sparkConf;
} {code}
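One possible shape for the consolidated helper (a minimal sketch only; the class name HoodieSparkConfUtils and method buildBaseSparkConf are hypothetical, not existing Hudi APIs):
{code:java}
import java.util.Map;

import org.apache.spark.SparkConf;

// Hypothetical sketch: centralizes the settings that every call site above repeats.
public final class HoodieSparkConfUtils {

  private HoodieSparkConfUtils() {
  }

  public static SparkConf buildBaseSparkConf(String appName, String master,
                                             Map<String, String> additionalConfigs) {
    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    // Settings common to all the call sites quoted above
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
    // The canLoadClass guard from HoodieClientTestUtils could be kept here if needed
    sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
    // Caller-specific settings (CLI event logging, test parallelism, compression, etc.)
    // are passed in once instead of being copy-pasted at each call site.
    additionalConfigs.forEach(sparkConf::set);
    return sparkConf;
  }
} {code}
Each call site would then pass only its own deltas (e.g. the CLI's event-log settings or the tests' shuffle-partition overrides), so a change to a shared setting only needs to happen in one place.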
was:Currently there are many APIs
> Simplify APIs of constructing Hudi Spark configs
> ------------------------------------------------
>
> Key: HUDI-5788
> URL: https://issues.apache.org/jira/browse/HUDI-5788
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: Ethan Guo
> Priority: Major
> Fix For: 0.14.0
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)