This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be0068065d6 [MINOR] If sync mode is glue, fix the sync tool class (#11543)
be0068065d6 is described below
commit be0068065d6727e6354e601846a4cf4d5e6d4f53
Author: Prabodh Agarwal <[email protected]>
AuthorDate: Thu Jul 4 13:35:46 2024 +0530
[MINOR] If sync mode is glue, fix the sync tool class (#11543)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 64 ++++++++++++----------
1 file changed, 34 insertions(+), 30 deletions(-)
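The substantive change is in HoodieSparkSqlWriterInternal.metaSync: when hoodie.datasource.hive_sync.mode equals HiveSyncMode.GLUE.name(), the writer now adds org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool to the set of meta sync tool classes, so Glue users no longer have to list it via META_SYNC_CLIENT_TOOL_CLASS_NAME themselves; the remaining hunks are import reordering and whitespace cleanup. Below is a minimal usage sketch (not part of the commit), assuming the hudi-spark and hudi-aws bundles are on the classpath; the table name, record key field, and S3 paths are illustrative placeholders.

    // Minimal sketch: with this fix, setting the hive sync mode to GLUE is enough
    // for the writer to pick up org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
    // no sync tool class has to be configured explicitly.
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("hudi-glue-sync-sketch").getOrCreate()
    val df = spark.read.json("s3://my-bucket/input/")   // hypothetical input path

    df.write.format("hudi").
      option("hoodie.table.name", "my_table").                  // placeholder table name
      option("hoodie.datasource.write.recordkey.field", "id").  // placeholder record key
      option("hoodie.datasource.meta.sync.enable", "true").
      option("hoodie.datasource.hive_sync.mode", "GLUE").       // value compared against HiveSyncMode.GLUE.name()
      mode(SaveMode.Append).
      save("s3://my-bucket/hudi/my_table")                      // placeholder base path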
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 50722abfbab..97a1cc29cc3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -17,6 +17,12 @@
package org.apache.hudi
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
@@ -47,6 +53,7 @@ import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -60,13 +67,6 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
import org.apache.hudi.util.SparkKeyGenUtils
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -78,7 +78,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.slf4j.LoggerFactory
import java.util.function.BiConsumer
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
@@ -142,8 +141,8 @@ object HoodieSparkSqlWriter {
def getBulkInsertRowConfig(writerSchema: org.apache.hudi.common.util.Option[Schema], hoodieConfig: HoodieConfig, basePath: String, tblName: String): HoodieWriteConfig = {
- var writerSchemaStr : String = null
- if ( writerSchema.isPresent) {
+ var writerSchemaStr: String = null
+ if (writerSchema.isPresent) {
writerSchemaStr = writerSchema.get().toString
}
// Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty
@@ -232,7 +231,7 @@ class HoodieSparkSqlWriterInternal {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- var tableConfig = getHoodieTableConfig(sparkContext, path, mode, streamingWritesParamsOpt.map( _.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
+ var tableConfig = getHoodieTableConfig(sparkContext, path, mode, streamingWritesParamsOpt.map(_.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
// get params w/o injecting default and validate
val paramsWithoutDefaults = HoodieWriterUtils.getParamsWithAlternatives(optParams)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
@@ -388,7 +387,7 @@ class HoodieSparkSqlWriterInternal {
// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- null, path, tblName,
+ null, path, tblName,
(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
.asInstanceOf[SparkRDDWriteClient[_]]
@@ -427,7 +426,7 @@ class HoodieSparkSqlWriterInternal {
// Issue the delete.
val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- schemaStr, path, tblName,
+ schemaStr, path, tblName,
(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
.asInstanceOf[SparkRDDWriteClient[_]]
// Issue delete partitions
@@ -538,7 +537,7 @@ class HoodieSparkSqlWriterInternal {
}
}
- private def handleWriteClientClosure(writeClient: SparkRDDWriteClient[_], tableConfig : HoodieTableConfig, parameters: Map[String, String], configuration: Configuration): Unit = {
+ private def handleWriteClientClosure(writeClient: SparkRDDWriteClient[_], tableConfig: HoodieTableConfig, parameters: Map[String, String], configuration: Configuration): Unit = {
// close the write client in all cases
val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, configuration)
val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
@@ -548,7 +547,7 @@ class HoodieSparkSqlWriterInternal {
}
}
- def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String], df: Dataset[Row]): WriteOperationType = {
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults: Map[String, String], df: Dataset[Row]): WriteOperationType = {
var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
// TODO clean up
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
@@ -577,11 +576,11 @@ class HoodieSparkSqlWriterInternal {
/**
* Resolve wildcards in partitions
*
- * @param partitions list of partitions that may contain wildcards
- * @param jsc instance of java spark context
- * @param storage [[HoodieStorage]] instance
- * @param cfg hoodie config
- * @param basePath base path
+ * @param partitions list of partitions that may contain wildcards
+ * @param jsc instance of java spark context
+ * @param storage [[HoodieStorage]] instance
+ * @param cfg hoodie config
+ * @param basePath base path
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
private def resolvePartitionWildcards(partitions: List[String], jsc: JavaSparkContext,
@@ -635,7 +634,7 @@ class HoodieSparkSqlWriterInternal {
val schemaValidateEnable = if (schemaEvolutionEnable.toBoolean && parameters.getOrElse(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), "false").toBoolean) {
// force disable schema validate, now we support schema evolution, no need to do validate
"false"
- } else {
+ } else {
parameters.getOrElse(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), "true")
}
// correct internalSchema, internalSchema should contain hoodie metadata columns.
@@ -649,7 +648,7 @@ class HoodieSparkSqlWriterInternal {
}
parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(correctInternalSchema.getOrElse(null)),
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable,
- HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key() -> schemaValidateEnable)
+ HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key() -> schemaValidateEnable)
}
private def registerAvroSchemasWithKryo(sparkContext: SparkContext, targetAvroSchemas: Schema*): Unit = {
@@ -834,7 +833,7 @@ class HoodieSparkSqlWriterInternal {
case _ =>
try {
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
- TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
+ TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
}
}
@@ -884,12 +883,16 @@ class HoodieSparkSqlWriterInternal {
schema: StructType): Boolean = {
val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_ENABLED).toBoolean
var metaSyncEnabled = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_ENABLED).toBoolean
- var syncClientToolClassSet = scala.collection.mutable.Set[String]()
+ val syncClientToolClassSet = scala.collection.mutable.Set[String]()
hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
// for backward compatibility
if (hiveSyncEnabled) metaSyncEnabled = true
+ // for AWS glue compatibility
+ if (hoodieConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE) == HiveSyncMode.GLUE.name()) {
+ syncClientToolClassSet += "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool"
+ }
if (metaSyncEnabled) {
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
@@ -899,7 +902,7 @@ class HoodieSparkSqlWriterInternal {
properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, SPARK_VERSION)
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
if ((fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname) == null || fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname).isEmpty) &&
- (properties.get(HiveSyncConfigHolder.HIVE_PASS.key()) == null || properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.isEmpty || properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.equalsIgnoreCase(HiveSyncConfigHolder.HIVE_PASS.defaultValue()))){
+ (properties.get(HiveSyncConfigHolder.HIVE_PASS.key()) == null || properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.isEmpty || properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.equalsIgnoreCase(HiveSyncConfigHolder.HIVE_PASS.defaultValue()))) {
try {
val passwd = ShimLoader.getHadoopShims.getPassword(spark.sparkContext.hadoopConfiguration, HiveConf.ConfVars.METASTOREPWD.varname)
if (passwd != null && !passwd.isEmpty) {
@@ -912,7 +915,7 @@ class HoodieSparkSqlWriterInternal {
}
}
// Collect exceptions in list because we want all sync to run. Then we can throw
- val failedMetaSyncs = new mutable.HashMap[String,HoodieException]()
+ val failedMetaSyncs = new mutable.HashMap[String, HoodieException]()
syncClientToolClassSet.foreach(impl => {
try {
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat)
@@ -1052,9 +1055,10 @@ class HoodieSparkSqlWriterInternal {
/**
* Fetch table config for an already existing table and if save mode is not Overwrite.
- * @param sparkContext instance of {@link SparkContext} to use.
- * @param tablePath table base path.
- * @param mode save mode in use.
+ *
+ * @param sparkContext instance of {@link SparkContext} to use.
+ * @param tablePath table base path.
+ * @param mode save mode in use.
* @param hoodieTableConfigOpt return table config from this Option if present. else poll from a new metaClient.
* @return {@link HoodieTableConfig} is conditions match. if not, returns null.
*/
@@ -1086,7 +1090,7 @@ class HoodieSparkSqlWriterInternal {
// over-ride only if not explicitly set by the user.
tableConfig.getProps.asScala.filter(kv => !optParams.contains(kv._1))
.foreach { case (key, value) =>
- translatedOptsWithMappedTableConfig += (key -> value)
+ translatedOptsWithMappedTableConfig += (key -> value)
}
}
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)