[ https://issues.apache.org/jira/browse/HUDI-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ethan Guo updated HUDI-5788:
----------------------------
    Description: 
Currently, there are many APIs for constructing Hudi Spark configs in a
similar way (search for "spark.kryo.registrator", for example). We should
consolidate them into a couple of reusable APIs instead of duplicating the
logic; a rough sketch of such a helper follows the examples below.

HoodieExampleSparkUtils
{code:java}
private static Map<String, String> defaultConf() {
  Map<String, String> additionalConfigs = new HashMap<>();
  additionalConfigs.put("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
  additionalConfigs.put("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
  additionalConfigs.put("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
  return additionalConfigs;
} {code}
SparkUtil
{code:java}
public static SparkConf getDefaultConf(final String appName, final Option<String> sparkMaster) {
  final Properties properties = System.getProperties();
  SparkConf sparkConf = new SparkConf().setAppName(appName);

  // Configure the sparkMaster
  String sparkMasterNode = DEFAULT_SPARK_MASTER;
  if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
    sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
  }
  if (sparkMaster.isPresent() && !sparkMaster.get().trim().isEmpty()) {
    sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
  }
  sparkConf.setMaster(sparkMasterNode);

  // Configure driver
  sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
  sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
  sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
  sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");

  // Configure hadoop conf
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
  // NOTE: the same codec key is set twice; the "true" value is immediately overwritten below
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "true");
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");

  return sparkConf;
} {code}
HoodieClientTestUtils
{code:java}
public static SparkConf getSparkConfForTest(String appName) {
  SparkConf sparkConf = new SparkConf().setAppName(appName)
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .set("spark.sql.shuffle.partitions", "4")
      .set("spark.default.parallelism", "4");

  // NOTE: This utility is used in modules where this class might not be present, therefore
  //       to avoid littering output w/ [[ClassNotFoundException]]s we will skip adding it
  //       in case this utility is used in the module not providing it
  if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) {
    sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  } {code}
SparkProvider
{code:java}
default SparkConf conf(Map<String, String> overwritingConfigs) {
  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.app.name", getClass().getName());
  sparkConf.set("spark.master", "local[*]");
  sparkConf.set("spark.default.parallelism", "4");
  sparkConf.set("spark.sql.shuffle.partitions", "4");
  sparkConf.set("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", 
"org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
  overwritingConfigs.forEach(sparkConf::set);
  return sparkConf;
} {code}
TestHoodieSparkUtils
{code:java}
@Test
def testCreateRddSchemaEvol(): Unit = {
  val spark = SparkSession.builder
    .appName("Hoodie Datasource test")
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate {code}
UtilHelpers
{code:java}
private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
  final SparkConf sparkConf = new SparkConf().setAppName(appName);
  String master = sparkConf.get("spark.master", defaultMaster);
  sparkConf.setMaster(master);
  if (master.startsWith("yarn")) {
    sparkConf.set("spark.eventLog.overwrite", "true");
    sparkConf.set("spark.eventLog.enabled", "true");
  }
  sparkConf.set("spark.ui.port", "8090");
  sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  // NOTE: the same codec key is set twice; the "true" value is immediately overwritten below
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf.set("spark.driver.allowMultipleContexts", "true");

  additionalConfigs.forEach(sparkConf::set);
  return sparkConf;
} {code}
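
One way to consolidate the above would be a single utility that owns the
shared settings, with each call site supplying only what differs. A minimal
sketch (the class name HoodieSparkConfUtils and both method names are
hypothetical, not existing Hudi APIs):
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;

// Hypothetical utility; names are illustrative only
public class HoodieSparkConfUtils {

  /** Settings shared by every Hudi Spark application. */
  public static Map<String, String> commonConfigs() {
    Map<String, String> configs = new HashMap<>();
    configs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    configs.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
    configs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
    return configs;
  }

  /** Builds a SparkConf from the shared defaults plus caller-specific settings. */
  public static SparkConf buildSparkConf(String appName, String master,
                                         Map<String, String> additionalConfigs) {
    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    // Shared defaults do not clobber anything already set on the conf
    commonConfigs().forEach(sparkConf::setIfMissing);
    // Caller-specific settings win over the shared defaults
    additionalConfigs.forEach(sparkConf::set);
    return sparkConf;
  }
} {code}
Call sites such as HoodieClientTestUtils and UtilHelpers would then reduce to
one buildSparkConf call plus their module-specific overrides (test
parallelism, CLI compression settings, etc.); the class-loading guard from
HoodieClientTestUtils could still wrap the "spark.sql.extensions" entry.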
 

 

  was:Currently there are many APIs 


> Simplify APIs of constructing Hudi Spark configs
> ------------------------------------------------
>
>                 Key: HUDI-5788
>                 URL: https://issues.apache.org/jira/browse/HUDI-5788
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Ethan Guo
>            Priority: Major
>             Fix For: 0.14.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
