hudi-bot opened a new issue, #15789:
URL: https://github.com/apache/hudi/issues/15789
Currently, there are many APIs that construct Hudi Spark configs in a
similar way (search for "spark.kryo.registrator", for example). We should
simplify them into a couple of reusable APIs instead of duplicating the
logic; one possible shape is sketched after the snippets below.
HoodieExampleSparkUtils
{code:java}
private static Map<String, String> defaultConf() {
  Map<String, String> additionalConfigs = new HashMap<>();
  additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  additionalConfigs.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
  return additionalConfigs;
} {code}
SparkUtil
{code:java}
public static SparkConf getDefaultConf(final String appName, final Option<String> sparkMaster) {
  final Properties properties = System.getProperties();
  SparkConf sparkConf = new SparkConf().setAppName(appName);

  // Configure the spark master
  String sparkMasterNode = DEFAULT_SPARK_MASTER;
  if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
    sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
  }
  if (sparkMaster.isPresent() && !sparkMaster.get().trim().isEmpty()) {
    sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
  }
  sparkConf.setMaster(sparkMasterNode);

  // Configure driver
  sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
  sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
  sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
  sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");

  // Configure hadoop conf
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
  // NOTE: the codec is set to "true" and then immediately overwritten with the Gzip codec
  // below; this apparent slip is copied verbatim across several of the call sites quoted here
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "true");
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
  return sparkConf;
} {code}
HoodieClientTestUtils
{code:java}
public static SparkConf getSparkConfForTest(String appName) {
  SparkConf sparkConf = new SparkConf().setAppName(appName)
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .set("spark.sql.shuffle.partitions", "4")
      .set("spark.default.parallelism", "4");
  // NOTE: This utility is used in modules where this class might not be present, therefore
  //       to avoid littering output w/ [[ClassNotFoundException]]s we will skip adding it
  //       in case this utility is used in the module not providing it
  if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) {
    sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  } {code}
SparkProvider
{code:java}
default SparkConf conf(Map<String, String> overwritingConfigs) {
  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.app.name", getClass().getName());
  sparkConf.set("spark.master", "local[*]");
  sparkConf.set("spark.default.parallelism", "4");
  sparkConf.set("spark.sql.shuffle.partitions", "4");
  sparkConf.set("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  overwritingConfigs.forEach(sparkConf::set);
  return sparkConf;
} {code}
TestHoodieSparkUtils
{code:scala}
@Test
def testCreateRddSchemaEvol(): Unit = {
  val spark = SparkSession.builder
    .appName("Hoodie Datasource test")
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate {code}
UtilHelpers
{code:java}
private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
  final SparkConf sparkConf = new SparkConf().setAppName(appName);
  String master = sparkConf.get("spark.master", defaultMaster);
  sparkConf.setMaster(master);
  if (master.startsWith("yarn")) {
    sparkConf.set("spark.eventLog.overwrite", "true");
    sparkConf.set("spark.eventLog.enabled", "true");
  }
  sparkConf.set("spark.ui.port", "8090");
  sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
  sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
  sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  additionalConfigs.forEach(sparkConf::set);
  return sparkConf;
} {code}
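One possible shape for the consolidation, as a minimal sketch: a single shared helper owns the serializer/Kryo/extension settings that every snippet above repeats, and each call site layers its own overrides on top. The class and method names below ({{HoodieSparkConfUtils}}, {{defaultSparkConfigs}}, {{buildSparkConf}}) are hypothetical, not existing Hudi APIs, and the exact set of shared defaults is open for discussion.
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;

// Hypothetical consolidated helper (names are illustrative, not existing Hudi APIs).
// The common settings live in one place; call sites only pass what actually differs.
public final class HoodieSparkConfUtils {

  private HoodieSparkConfUtils() {
  }

  /** The configs duplicated across all the snippets above, collected once. */
  public static Map<String, String> defaultSparkConfigs() {
    Map<String, String> configs = new HashMap<>();
    configs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    configs.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
    configs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
    return configs;
  }

  /** Builds a SparkConf from the shared defaults plus caller-specific overrides. */
  public static SparkConf buildSparkConf(String appName, String master, Map<String, String> overrides) {
    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    // setIfMissing lets user-supplied --conf values win over the shared defaults
    defaultSparkConfigs().forEach(sparkConf::setIfMissing);
    overrides.forEach(sparkConf::set);
    return sparkConf;
  }
} {code}
With something like this in place, each call site above reduces to a one-liner such as {{HoodieSparkConfUtils.buildSparkConf(appName, "local[4]", testOverrides)}}, and fixes like the overwritten compression codec noted above only need to be made once.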
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-5788
- Type: Improvement
- Fix version(s):
- 1.1.0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]