This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9a2a61c64f2d8cb6dc096c70283bccf40acd7326
Author: Марк Бухнер <[email protected]>
AuthorDate: Tue Apr 23 16:12:35 2024 +0700

    [MINOR] Fix incorrect catch of ClassCastException using 
HoodieSparkKeyGeneratorFactory (#11062)
---
 .../run/strategy/ExecutionStrategyUtil.java        | 17 +-----
 .../MultipleSparkJobExecutionStrategy.java         |  3 +-
 .../strategy/SingleSparkJobExecutionStrategy.java  |  3 +-
 .../apache/hudi/index/SparkHoodieIndexFactory.java | 19 +-----
 .../factory/HoodieSparkKeyGeneratorFactory.java    | 45 +++++++++-----
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    | 12 +---
 .../SparkBootstrapCommitActionExecutor.java        |  9 +--
 .../commit/BaseSparkCommitActionExecutor.java      |  9 +--
 .../SparkFullBootstrapDataProviderBase.java        | 70 ++++++++++------------
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |  2 +-
 .../TestHoodieSparkKeyGeneratorFactory.java        |  7 +--
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  2 +-
 .../TestSparkSqlWithCustomKeyGenerator.scala       |  4 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  4 +-
 15 files changed, 79 insertions(+), 129 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
index b70eed70090..5fd2cb65d69 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -28,13 +27,10 @@ import org.apache.hudi.common.model.RewriteAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 
-import java.io.IOException;
-
 public class ExecutionStrategyUtil {
 
   /**
@@ -49,18 +45,7 @@ public class ExecutionStrategyUtil {
       HoodieWriteConfig writeConfig) {
 
     GenericRecord record = (GenericRecord) indexedRecord;
-    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-
-    if (!writeConfig.populateMetaFields()) {
-      try {
-        TypedProperties typedProperties = new 
TypedProperties(writeConfig.getProps());
-        keyGeneratorOpt = Option.of((BaseKeyGenerator)
-            
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
-      } catch (IOException e) {
-        throw new HoodieIOException(
-            "Only BaseKeyGenerators are supported when meta columns are 
disabled ", e);
-      }
-    }
+    Option<BaseKeyGenerator> keyGeneratorOpt = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
 
     String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, 
keyGeneratorOpt);
     String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, 
keyGeneratorOpt);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 9d8c9318dd2..97edc237b40 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -359,8 +359,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
               Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getSchema()));
               HoodieFileReader baseFileReader = 
getBaseOrBootstrapFileReader(hadoopConf, bootstrapBasePath, partitionFields, 
clusteringOp);
 
-              Option<BaseKeyGenerator> keyGeneratorOp =
-                  writeConfig.populateMetaFields() ? Option.empty() : 
Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
+              Option<BaseKeyGenerator> keyGeneratorOp = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
               // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
               //       payload pointing into a shared, mutable (underlying) 
buffer we get a clean copy of
               //       it since these records will be shuffled later.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index fa2af5d5b90..6353646a07d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -148,8 +148,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
         try {
           HoodieFileReader baseFileReader = 
HoodieFileReaderFactory.getReaderFactory(recordType)
               .getFileReader(writeConfig, getHoodieTable().getHadoopConf(), 
new StoragePath(clusteringOp.getDataFilePath()));
-          Option<BaseKeyGenerator> keyGeneratorOp =
-              writeConfig.populateMetaFields() ? Option.empty() : 
Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
+          Option<BaseKeyGenerator> keyGeneratorOp = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
           // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
           //       payload pointing into a shared, mutable (underlying) buffer 
we get a clean copy of
           //       it since these records will be shuffled later.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index eebaf0f05ba..661152c2d16 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -18,11 +18,8 @@
 
 package org.apache.hudi.index;
 
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
@@ -33,11 +30,8 @@ import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
-import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 
-import java.io.IOException;
-
 /**
  * A factory to generate Spark {@link HoodieIndex}.
  */
@@ -62,9 +56,9 @@ public final class SparkHoodieIndexFactory {
       case GLOBAL_BLOOM:
         return new HoodieGlobalBloomIndex(config, 
SparkHoodieBloomIndexHelper.getInstance());
       case SIMPLE:
-        return new HoodieSimpleIndex(config, 
getKeyGeneratorForSimpleIndex(config));
+        return new HoodieSimpleIndex(config, 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config));
       case GLOBAL_SIMPLE:
-        return new HoodieGlobalSimpleIndex(config, 
getKeyGeneratorForSimpleIndex(config));
+        return new HoodieGlobalSimpleIndex(config, 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config));
       case BUCKET:
         switch (config.getBucketIndexEngineType()) {
           case SIMPLE:
@@ -108,13 +102,4 @@ public final class SparkHoodieIndexFactory {
         return createIndex(config).isGlobal();
     }
   }
-
-  private static Option<BaseKeyGenerator> 
getKeyGeneratorForSimpleIndex(HoodieWriteConfig config) {
-    try {
-      return config.populateMetaFields() ? Option.empty()
-          : Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(config.getProps())));
-    } catch (IOException e) {
-      throw new HoodieIOException("KeyGenerator instantiation failed ", e);
-    }
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index dcc2eaec9eb..c655bf62543 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -23,13 +23,15 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
+import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
-import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
@@ -41,14 +43,13 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
 import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
+import static 
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
 import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;
 
 /**
@@ -77,26 +78,40 @@ public class HoodieSparkKeyGeneratorFactory {
         "org.apache.hudi.keygen.TimestampBasedKeyGenerator");
   }
 
-  public static KeyGenerator createKeyGenerator(TypedProperties props) throws 
IOException {
+  public static KeyGenerator createKeyGenerator(TypedProperties props) {
     String keyGeneratorClass = getKeyGeneratorClassName(props);
     return createKeyGenerator(keyGeneratorClass, props);
   }
 
-  public static KeyGenerator createKeyGenerator(String keyGeneratorClass, 
TypedProperties props) throws IOException {
+  public static KeyGenerator createKeyGenerator(String keyGeneratorClass, 
TypedProperties props) {
     boolean autoRecordKeyGen = 
KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props)
         //Need to prevent overwriting the keygen for spark sql merge into 
because we need to extract
         //the recordkey from the meta cols if it exists. Sql keygen will use 
pkless keygen if needed.
         && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
-    try {
-      KeyGenerator keyGenerator = (KeyGenerator) 
ReflectionUtils.loadClass(keyGeneratorClass, props);
-      if (autoRecordKeyGen) {
-        return new AutoRecordGenWrapperKeyGenerator(props, 
(BuiltinKeyGenerator) keyGenerator);
-      } else {
-        // if user comes with their own key generator.
-        return keyGenerator;
+    KeyGenerator keyGenerator = (KeyGenerator) 
ReflectionUtils.loadClass(keyGeneratorClass, props);
+    if (autoRecordKeyGen) {
+      return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) 
keyGenerator);
+    } else {
+      // if user comes with their own key generator.
+      return keyGenerator;
+    }
+  }
+
+  /**
+   * Creates a {@link BaseKeyGenerator} wrapped in an Option if meta columns are disabled;
+   * returns an empty Option otherwise.
+   *
+   * @throws HoodieException if unable to instantiate the key generator or cast it to {@link 
BaseKeyGenerator}.
+  public static Option<BaseKeyGenerator> 
createBaseKeyGenerator(HoodieWriteConfig writeConfig) {
+    if (!writeConfig.populateMetaFields()) {
+      try {
+        TypedProperties typedProperties = new 
TypedProperties(writeConfig.getProps());
+        return Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
+      } catch (ClassCastException cce) {
+        throw new HoodieException("Only BaseKeyGenerators are supported when 
meta columns are disabled ", cce);
       }
-    } catch (Throwable e) {
-      throw new IOException("Could not load key generator class " + 
keyGeneratorClass, e);
+    } else {
+      return Option.empty();
     }
   }
 
@@ -140,8 +155,6 @@ public class HoodieSparkKeyGeneratorFactory {
         return Option.of((BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
       } catch (ClassCastException cce) {
         throw new HoodieIOException("Only those key generators implementing 
BuiltInKeyGenerator interface is supported with virtual keys");
-      } catch (IOException e) {
-        throw new HoodieIOException("Key generator instantiation failed ", e);
       }
     }
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index eeadd40d99e..441ac9eb1ec 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -31,7 +31,6 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -43,7 +42,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.io.HoodieCreateHandle;
@@ -240,15 +238,7 @@ public class HoodieSparkCopyOnWriteTable<T>
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String 
partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile 
dataFileToBeMerged) {
-    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-    if (!config.populateMetaFields()) {
-      try {
-        keyGeneratorOpt = Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(config.getProps())));
-      } catch (IOException e) {
-        throw new HoodieIOException("Only BaseKeyGenerator (or any key 
generator that extends from BaseKeyGenerator) are supported when meta "
-            + "columns are disabled. Please choose the right key generator if 
you wish to disable meta fields.", e);
-      }
-    }
+    Option<BaseKeyGenerator> keyGeneratorOpt = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
     return HoodieMergeHandleFactory.create(config, instantTime, this, 
keyToNewRecords, partitionPath, fileId,
         dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 6f94139b4b7..994d66e3324 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -50,7 +50,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
@@ -336,13 +335,7 @@ public class SparkBootstrapCommitActionExecutor<T>
     TypedProperties properties = new TypedProperties();
     properties.putAll(config.getProps());
 
-    KeyGeneratorInterface keyGenerator;
-    try {
-      keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
-    } catch (IOException e) {
-      throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
-    }
-
+    KeyGeneratorInterface keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
     BootstrapPartitionPathTranslator translator = 
ReflectionUtils.loadClass(config.getBootstrapPartitionPathTranslatorClass());
 
     List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = 
partitions.stream()
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 264e00c53f9..30e3cb533b1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -41,7 +41,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.index.HoodieIndex;
@@ -103,13 +102,7 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
                                        WriteOperationType operationType,
                                        Option<Map<String, String>> 
extraMetadata) {
     super(context, config, table, instantTime, operationType, extraMetadata);
-    try {
-      keyGeneratorOpt = config.populateMetaFields()
-          ? Option.empty()
-          : Option.of((BaseKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(this.config.getProps()));
-    } catch (IOException e) {
-      throw new HoodieIOException("Only BaseKeyGenerators are supported when 
meta columns are disabled ", e);
-    }
+    keyGeneratorOpt = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
   }
 
   private HoodieData<HoodieRecord<T>> 
clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index 6117cdcae1e..c857b61e0a4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -18,19 +18,18 @@
 
 package org.apache.hudi.bootstrap;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -39,6 +38,8 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
@@ -69,41 +70,36 @@ public abstract class SparkFullBootstrapDataProviderBase 
extends FullRecordBoots
     // More details at 
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
     HoodieRecordType recordType =  config.getRecordMerger().getRecordType();
     Dataset inputDataset = 
sparkSession.read().format(getFormat()).option("basePath", 
sourceBasePath).load(filePaths);
-    try {
-      KeyGenerator keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-      String precombineKey = 
props.getString("hoodie.datasource.write.precombine.field");
-      String structName = tableName + "_record";
-      String namespace = "hoodie." + tableName;
-      if (recordType == HoodieRecordType.AVRO) {
-        RDD<GenericRecord> genericRecords = 
HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
-            Option.empty());
-        return genericRecords.toJavaRDD().map(gr -> {
-          String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
-              gr, precombineKey, false, props.getBoolean(
-                  
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
-          try {
-            return DataSourceUtils.createHoodieRecord(gr, orderingVal, 
keyGenerator.getKey(gr),
-                props.getString("hoodie.datasource.write.payload.class"), 
scala.Option.apply(null));
-          } catch (IOException ioe) {
-            throw new HoodieIOException(ioe.getMessage(), ioe);
-          }
-        });
-      } else if (recordType == HoodieRecordType.SPARK) {
-        SparkKeyGeneratorInterface sparkKeyGenerator = 
(SparkKeyGeneratorInterface) keyGenerator;
-        StructType structType = inputDataset.schema();
-        return 
inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> {
-          String recordKey = sparkKeyGenerator.getRecordKey(internalRow, 
structType).toString();
-          String partitionPath = 
sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
-          HoodieKey key = new HoodieKey(recordKey, partitionPath);
-          return new HoodieSparkRecord(key, internalRow, structType, false);
-        });
-      } else {
-        throw new UnsupportedOperationException(recordType.name());
-      }
-
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+    KeyGenerator keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+    String precombineKey = 
props.getString("hoodie.datasource.write.precombine.field");
+    String structName = tableName + "_record";
+    String namespace = "hoodie." + tableName;
+    if (recordType == HoodieRecordType.AVRO) {
+      RDD<GenericRecord> genericRecords = 
HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
+          Option.empty());
+      return genericRecords.toJavaRDD().map(gr -> {
+        String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
+            gr, precombineKey, false, props.getBoolean(
+                
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
+        try {
+          return DataSourceUtils.createHoodieRecord(gr, orderingVal, 
keyGenerator.getKey(gr),
+              props.getString("hoodie.datasource.write.payload.class"), 
scala.Option.apply(null));
+        } catch (IOException ioe) {
+          throw new HoodieIOException(ioe.getMessage(), ioe);
+        }
+      });
+    } else if (recordType == HoodieRecordType.SPARK) {
+      SparkKeyGeneratorInterface sparkKeyGenerator = 
(SparkKeyGeneratorInterface) keyGenerator;
+      StructType structType = inputDataset.schema();
+      return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow 
-> {
+        String recordKey = sparkKeyGenerator.getRecordKey(internalRow, 
structType).toString();
+        String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, 
structType).toString();
+        HoodieKey key = new HoodieKey(recordKey, partitionPath);
+        return new HoodieSparkRecord(key, internalRow, structType, false);
+      });
+    } else {
+      throw new UnsupportedOperationException(recordType.name());
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 0ba8d1425e7..46e8b9f441d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -377,6 +377,6 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
 
   private static Throwable getNestedConstructorErrorCause(Exception e) {
     // custom key generator will fail in the constructor, and we must unwrap 
the cause for asserting error messages
-    return e.getCause().getCause().getCause();
+    return e.getCause().getCause();
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
index 3cc30e86399..e7c9c723721 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen.factory;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -32,8 +33,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
-
 import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -67,7 +66,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
   }
 
   @Test
-  public void testKeyGeneratorFactory() throws IOException {
+  public void testKeyGeneratorFactory() {
     TypedProperties props = getCommonProps();
 
     // set KeyGenerator type only
@@ -91,7 +90,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
     // set wrong class name
     final TypedProperties props2 = getCommonProps();
     props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
TestHoodieSparkKeyGeneratorFactory.class.getName());
-    assertThrows(IOException.class, () -> 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2));
+    assertThrows(HoodieException.class, () -> 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2));
 
     // set wrong keyGenerator type
     final TypedProperties props3 = getCommonProps();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 0767d055915..120304c1219 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -412,7 +412,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
 
     // try write to Hudi
-    assertThrows[IOException] {
+    assertThrows[HoodieException] {
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - 
DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index dd613ce1153..f710786e41f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1104,7 +1104,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       writer.save(basePath)
       fail("should fail when invalid PartitionKeyType is provided!")
     } catch {
-      case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable 
to instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
+      case e: Exception => assertTrue(e.getMessage.contains("Unable to 
instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
index c85eb40bca7..0fc2976c7fe 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
@@ -33,8 +33,6 @@ import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.slf4j.LoggerFactory
 
-import java.io.IOException
-
 /**
  * Tests Spark SQL DML with custom key generator and write configs.
  */
@@ -288,7 +286,7 @@ class TestSparkSqlWithCustomKeyGenerator extends 
HoodieSparkSqlTestBase {
         // INSERT INTO should fail for tableNameCustom1
         val sourceTableName = tableNameCustom1 + "_source"
         prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 
1706800227, 'cat1')"))
-        assertThrows[IOException] {
+        assertThrows[HoodieException] {
           spark.sql(
             s"""
                | INSERT INTO $tableNameCustom1
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index bc6332c842d..14aa3b5d2e9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -390,7 +390,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPropsWithInvalidKeyGenerator() {
-    Exception e = assertThrows(IOException.class, () -> {
+    Exception e = assertThrows(HoodieException.class, () -> {
       String tableBasePath = basePath + "/test_table_invalid_key_gen";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
@@ -399,7 +399,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }, "Should error out when setting the key generator class property to an 
invalid value");
     // expected
     LOG.warn("Expected error during getting the key generator", e);
-    assertTrue(e.getMessage().contains("Could not load key generator class 
invalid"));
+    assertTrue(e.getMessage().contains("Unable to load class"));
   }
 
   private static Stream<Arguments> provideInferKeyGenArgs() {

Reply via email to