This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be0068065d6 [MINOR] If sync mode is glue, fix the sync tool class 
(#11543)
be0068065d6 is described below

commit be0068065d6727e6354e601846a4cf4d5e6d4f53
Author: Prabodh Agarwal <[email protected]>
AuthorDate: Thu Jul 4 13:35:46 2024 +0530

    [MINOR] If sync mode is glue, fix the sync tool class (#11543)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 64 ++++++++++++----------
 1 file changed, 34 insertions(+), 30 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 50722abfbab..97a1cc29cc3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -17,6 +17,12 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
 import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, 
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
@@ -47,6 +53,7 @@ import 
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, 
HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -60,13 +67,6 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.util.SparkKeyGenUtils
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -78,7 +78,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.slf4j.LoggerFactory
 
 import java.util.function.BiConsumer
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -142,8 +141,8 @@ object HoodieSparkSqlWriter {
 
   def getBulkInsertRowConfig(writerSchema: 
org.apache.hudi.common.util.Option[Schema], hoodieConfig: HoodieConfig,
                              basePath: String, tblName: String): 
HoodieWriteConfig = {
-    var writerSchemaStr : String = null
-    if ( writerSchema.isPresent) {
+    var writerSchemaStr: String = null
+    if (writerSchema.isPresent) {
       writerSchemaStr = writerSchema.get().toString
     }
     // Make opts mutable since it could be modified by 
tryOverrideParquetWriteLegacyFormatProperty
@@ -232,7 +231,7 @@ class HoodieSparkSqlWriterInternal {
 
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
-    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, 
streamingWritesParamsOpt.map( 
_.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
+    var tableConfig = getHoodieTableConfig(sparkContext, path, mode, 
streamingWritesParamsOpt.map(_.hoodieTableConfigOpt).orElse(Option.apply(Option.empty)).get)
     // get params w/o injecting default and validate
     val paramsWithoutDefaults = 
HoodieWriterUtils.getParamsWithAlternatives(optParams)
     val originKeyGeneratorClassName = 
HoodieWriterUtils.getOriginKeyGenerator(paramsWithoutDefaults)
@@ -388,7 +387,7 @@ class HoodieSparkSqlWriterInternal {
             // Create a HoodieWriteClient & issue the delete.
             val internalSchemaOpt = 
HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
             val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              null, path, tblName,
+                null, path, tblName,
                 (addSchemaEvolutionParameters(parameters, internalSchemaOpt) - 
HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
               .asInstanceOf[SparkRDDWriteClient[_]]
 
@@ -427,7 +426,7 @@ class HoodieSparkSqlWriterInternal {
             // Issue the delete.
             val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
             val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              schemaStr, path, tblName,
+                schemaStr, path, tblName,
                 (parameters - 
HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
               .asInstanceOf[SparkRDDWriteClient[_]]
             // Issue delete partitions
@@ -538,7 +537,7 @@ class HoodieSparkSqlWriterInternal {
     }
   }
 
-  private def handleWriteClientClosure(writeClient: SparkRDDWriteClient[_], 
tableConfig : HoodieTableConfig, parameters: Map[String, String], 
configuration: Configuration): Unit =  {
+  private def handleWriteClientClosure(writeClient: SparkRDDWriteClient[_], 
tableConfig: HoodieTableConfig, parameters: Map[String, String], configuration: 
Configuration): Unit = {
     // close the write client in all cases
     val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, 
tableConfig, parameters, configuration)
     val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, 
parameters)
@@ -548,7 +547,7 @@ class HoodieSparkSqlWriterInternal {
     }
   }
 
-  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String], df: Dataset[Row]): WriteOperationType = {
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults: 
Map[String, String], df: Dataset[Row]): WriteOperationType = {
     var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
     // TODO clean up
     // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
@@ -577,11 +576,11 @@ class HoodieSparkSqlWriterInternal {
   /**
    * Resolve wildcards in partitions
    *
-   * @param partitions   list of partitions that may contain wildcards
-   * @param jsc          instance of java spark context
-   * @param storage      [[HoodieStorage]] instance
-   * @param cfg          hoodie config
-   * @param basePath     base path
+   * @param partitions list of partitions that may contain wildcards
+   * @param jsc        instance of java spark context
+   * @param storage    [[HoodieStorage]] instance
+   * @param cfg        hoodie config
+   * @param basePath   base path
    * @return Pair of(boolean, table schema), where first entry will be true 
only if schema conversion is required.
    */
   private def resolvePartitionWildcards(partitions: List[String], jsc: 
JavaSparkContext,
@@ -635,7 +634,7 @@ class HoodieSparkSqlWriterInternal {
     val schemaValidateEnable = if (schemaEvolutionEnable.toBoolean && 
parameters.getOrElse(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), 
"false").toBoolean) {
       // force disable schema validate, now we support schema evolution, no 
need to do validate
       "false"
-    } else  {
+    } else {
       
parameters.getOrElse(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), 
"true")
     }
     // correct internalSchema, internalSchema should contain hoodie metadata 
columns.
@@ -649,7 +648,7 @@ class HoodieSparkSqlWriterInternal {
     }
     parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> 
SerDeHelper.toJson(correctInternalSchema.getOrElse(null)),
       HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> 
schemaEvolutionEnable,
-      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key()  -> 
schemaValidateEnable)
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key() -> 
schemaValidateEnable)
   }
 
   private def registerAvroSchemasWithKryo(sparkContext: SparkContext, 
targetAvroSchemas: Schema*): Unit = {
@@ -834,7 +833,7 @@ class HoodieSparkSqlWriterInternal {
         case _ =>
           try {
             commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, 
writeResult, parameters, writeClient, tableConfig, jsc,
-                TableInstantInfo(basePath, instantTime, 
executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
+              TableInstantInfo(basePath, instantTime, 
executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
 
           }
       }
@@ -884,12 +883,16 @@ class HoodieSparkSqlWriterInternal {
                        schema: StructType): Boolean = {
     val hiveSyncEnabled = 
hoodieConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_ENABLED).toBoolean
     var metaSyncEnabled = 
hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_ENABLED).toBoolean
-    var syncClientToolClassSet = scala.collection.mutable.Set[String]()
+    val syncClientToolClassSet = scala.collection.mutable.Set[String]()
     
hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass
 => syncClientToolClassSet += syncClass)
 
     // for backward compatibility
     if (hiveSyncEnabled) metaSyncEnabled = true
 
+    // for AWS glue compatibility
+    if (hoodieConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE) == 
HiveSyncMode.GLUE.name()) {
+      syncClientToolClassSet += 
"org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool"
+    }
 
     if (metaSyncEnabled) {
       val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
@@ -899,7 +902,7 @@ class HoodieSparkSqlWriterInternal {
       properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, 
SPARK_VERSION)
       
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, 
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
       if ((fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname) == null || 
fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname).isEmpty) &&
-        (properties.get(HiveSyncConfigHolder.HIVE_PASS.key()) == null || 
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.isEmpty || 
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.equalsIgnoreCase(HiveSyncConfigHolder.HIVE_PASS.defaultValue()))){
+        (properties.get(HiveSyncConfigHolder.HIVE_PASS.key()) == null || 
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.isEmpty || 
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.equalsIgnoreCase(HiveSyncConfigHolder.HIVE_PASS.defaultValue())))
 {
         try {
           val passwd = 
ShimLoader.getHadoopShims.getPassword(spark.sparkContext.hadoopConfiguration, 
HiveConf.ConfVars.METASTOREPWD.varname)
           if (passwd != null && !passwd.isEmpty) {
@@ -912,7 +915,7 @@ class HoodieSparkSqlWriterInternal {
         }
       }
       // Collect exceptions in list because we want all sync to run. Then we 
can throw
-      val failedMetaSyncs = new mutable.HashMap[String,HoodieException]()
+      val failedMetaSyncs = new mutable.HashMap[String, HoodieException]()
       syncClientToolClassSet.foreach(impl => {
         try {
           SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, 
fs, basePath.toString, baseFileFormat)
@@ -1052,9 +1055,10 @@ class HoodieSparkSqlWriterInternal {
 
   /**
    * Fetch table config for an already existing table and if save mode is not 
Overwrite.
-   * @param sparkContext instance of {@link SparkContext} to use.
-   * @param tablePath table base path.
-   * @param mode save mode in use.
+   *
+   * @param sparkContext         instance of {@link SparkContext} to use.
+   * @param tablePath            table base path.
+   * @param mode                 save mode in use.
    * @param hoodieTableConfigOpt return table config from this Option if 
present. else poll from a new metaClient.
    * @return {@link HoodieTableConfig} is conditions match. if not, returns 
null.
    */
@@ -1086,7 +1090,7 @@ class HoodieSparkSqlWriterInternal {
       // over-ride only if not explicitly set by the user.
       tableConfig.getProps.asScala.filter(kv => !optParams.contains(kv._1))
         .foreach { case (key, value) =>
-          translatedOptsWithMappedTableConfig +=  (key -> value)
+          translatedOptsWithMappedTableConfig += (key -> value)
         }
     }
     val mergedParams = mutable.Map.empty ++ 
HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)

Reply via email to