hudi-bot opened a new issue, #15789:
URL: https://github.com/apache/hudi/issues/15789

   Currently, there are many APIs that construct Hudi Spark configs in a 
similar way (search for "spark.kryo.registrator", for example).  We should 
consolidate them into a couple of reusable APIs instead of duplicating the logic.
   
   HoodieExampleSparkUtils
   {code:java}
   private static Map<String, String> defaultConf() {
     Map<String, String> additionalConfigs = new HashMap<>();
     additionalConfigs.put("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     additionalConfigs.put("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
     additionalConfigs.put("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
     additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
     return additionalConfigs;
   } {code}
   SparkUtil
   {code:java}
   public static SparkConf getDefaultConf(final String appName, final 
Option<String> sparkMaster) {
     final Properties properties = System.getProperties();
     SparkConf sparkConf = new SparkConf().setAppName(appName);
   
     // Configure the sparkMaster
     String sparkMasterNode = DEFAULT_SPARK_MASTER;
     if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) 
{
       sparkMasterNode = 
properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
     }
     if (sparkMaster.isPresent() && !sparkMaster.get().trim().isEmpty()) {
       sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
     }
     sparkConf.setMaster(sparkMasterNode);
   
     // Configure driver
     sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
     sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
     sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
     sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, 
"org.apache.spark.serializer.KryoSerializer");
     sparkConf.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
   
     // Configure hadoop conf
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, 
"true");
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, 
"org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, 
"BLOCK");
   
     return sparkConf;
   } {code}
   HoodieClientTestUtils
   {code:java}
   public static SparkConf getSparkConfForTest(String appName) {
     SparkConf sparkConf = new SparkConf().setAppName(appName)
         .setMaster("local[4]")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
         .set("spark.sql.shuffle.partitions", "4")
         .set("spark.default.parallelism", "4");
   
     // NOTE: This utility is used in modules where this class might not be 
present, therefore
     //       to avoid littering output w/ [[ClassNotFoundException]]s we will 
skip adding it
     //       in case this utility is used in the module not providing it
     if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) 
{
       sparkConf.set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
     } {code}
   SparkProvider
   {code:java}
   default SparkConf conf(Map<String, String> overwritingConfigs) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.app.name", getClass().getName());
     sparkConf.set("spark.master", "local[*]");
     sparkConf.set("spark.default.parallelism", "4");
     sparkConf.set("spark.sql.shuffle.partitions", "4");
     sparkConf.set("spark.driver.maxResultSize", "2g");
     sparkConf.set("spark.hadoop.mapred.output.compress", "true");
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", 
"org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
     sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     sparkConf.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
     overwritingConfigs.forEach(sparkConf::set);
     return sparkConf;
   } {code}
   TestHoodieSparkUtils
   {code:java}
   @Test
   def testCreateRddSchemaEvol(): Unit = {
     val spark = SparkSession.builder
       .appName("Hoodie Datasource test")
       .master("local[2]")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
       .config("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
       .getOrCreate {code}
   UtilHelpers
   {code:java}
   private static SparkConf buildSparkConf(String appName, String 
defaultMaster, Map<String, String> additionalConfigs) {
     final SparkConf sparkConf = new SparkConf().setAppName(appName);
     String master = sparkConf.get("spark.master", defaultMaster);
     sparkConf.setMaster(master);
     if (master.startsWith("yarn")) {
       sparkConf.set("spark.eventLog.overwrite", "true");
       sparkConf.set("spark.eventLog.enabled", "true");
     }
     sparkConf.set("spark.ui.port", "8090");
     sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
     sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
     sparkConf.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
     sparkConf.set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
     sparkConf.set("spark.hadoop.mapred.output.compress", "true");
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", 
"org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
     sparkConf.set("spark.driver.allowMultipleContexts", "true");
   
     additionalConfigs.forEach(sparkConf::set);
     return sparkConf;
   } {code}
    
   
    
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-5788
   - Type: Improvement
   - Fix version(s):
     - 1.1.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to