This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9a2a61c64f2d8cb6dc096c70283bccf40acd7326 Author: Марк Бухнер <[email protected]> AuthorDate: Tue Apr 23 16:12:35 2024 +0700 [MINOR] Fix incorrect catch of ClassCastException using HoodieSparkKeyGeneratorFactory (#11062) --- .../run/strategy/ExecutionStrategyUtil.java | 17 +----- .../MultipleSparkJobExecutionStrategy.java | 3 +- .../strategy/SingleSparkJobExecutionStrategy.java | 3 +- .../apache/hudi/index/SparkHoodieIndexFactory.java | 19 +----- .../factory/HoodieSparkKeyGeneratorFactory.java | 45 +++++++++----- .../hudi/table/HoodieSparkCopyOnWriteTable.java | 12 +--- .../SparkBootstrapCommitActionExecutor.java | 9 +-- .../commit/BaseSparkCommitActionExecutor.java | 9 +-- .../SparkFullBootstrapDataProviderBase.java | 70 ++++++++++------------ .../apache/hudi/keygen/TestCustomKeyGenerator.java | 2 +- .../TestHoodieSparkKeyGeneratorFactory.java | 7 +-- .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +- .../apache/hudi/functional/TestCOWDataSource.scala | 2 +- .../TestSparkSqlWithCustomKeyGenerator.scala | 4 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 4 +- 15 files changed, 79 insertions(+), 129 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java index b70eed70090..5fd2cb65d69 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java @@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -28,13 +27,10 @@ import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import java.io.IOException; - public class ExecutionStrategyUtil { /** @@ -49,18 +45,7 @@ public class ExecutionStrategyUtil { HoodieWriteConfig writeConfig) { GenericRecord record = (GenericRecord) indexedRecord; - Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty(); - - if (!writeConfig.populateMetaFields()) { - try { - TypedProperties typedProperties = new TypedProperties(writeConfig.getProps()); - keyGeneratorOpt = Option.of((BaseKeyGenerator) - HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)); - } catch (IOException e) { - throw new HoodieIOException( - "Only BaseKeyGenerators are supported when meta columns are disabled ", e); - } - } + Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig); String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 9d8c9318dd2..97edc237b40 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -359,8 +359,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T> Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema())); HoodieFileReader baseFileReader = getBaseOrBootstrapFileReader(hadoopConf, bootstrapBasePath, partitionFields, clusteringOp); - Option<BaseKeyGenerator> keyGeneratorOp = - writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())); + Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig); // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be shuffled later. diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java index fa2af5d5b90..6353646a07d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java @@ -148,8 +148,7 @@ public abstract class SingleSparkJobExecutionStrategy<T> try { HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType) .getFileReader(writeConfig, getHoodieTable().getHadoopConf(), new StoragePath(clusteringOp.getDataFilePath())); - Option<BaseKeyGenerator> keyGeneratorOp = - writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())); + Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig); // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be shuffled later. diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java index eebaf0f05ba..661152c2d16 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java @@ -18,11 +18,8 @@ package org.apache.hudi.index; -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; @@ -33,11 +30,8 @@ import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; import org.apache.hudi.index.simple.HoodieSimpleIndex; -import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import java.io.IOException; - /** * A factory to generate Spark {@link HoodieIndex}. */ @@ -62,9 +56,9 @@ public final class SparkHoodieIndexFactory { case GLOBAL_BLOOM: return new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); case SIMPLE: - return new HoodieSimpleIndex(config, getKeyGeneratorForSimpleIndex(config)); + return new HoodieSimpleIndex(config, HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config)); case GLOBAL_SIMPLE: - return new HoodieGlobalSimpleIndex(config, getKeyGeneratorForSimpleIndex(config)); + return new HoodieGlobalSimpleIndex(config, HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config)); case BUCKET: switch (config.getBucketIndexEngineType()) { case SIMPLE: @@ -108,13 +102,4 @@ public final class SparkHoodieIndexFactory { return createIndex(config).isGlobal(); } } - - private static Option<BaseKeyGenerator> getKeyGeneratorForSimpleIndex(HoodieWriteConfig config) { - try { - return config.populateMetaFields() ? Option.empty() - : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))); - } catch (IOException e) { - throw new HoodieIOException("KeyGenerator instantiation failed ", e); - } - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java index dcc2eaec9eb..c655bf62543 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java @@ -23,13 +23,15 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator; +import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.ComplexKeyGenerator; import org.apache.hudi.keygen.CustomKeyGenerator; import org.apache.hudi.keygen.GlobalDeleteKeyGenerator; -import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; @@ -41,14 +43,13 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Properties; -import static org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY; import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE; +import static org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY; import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType; /** @@ -77,26 +78,40 @@ public class HoodieSparkKeyGeneratorFactory { "org.apache.hudi.keygen.TimestampBasedKeyGenerator"); } - public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { + public static KeyGenerator createKeyGenerator(TypedProperties props) { String keyGeneratorClass = getKeyGeneratorClassName(props); return createKeyGenerator(keyGeneratorClass, props); } - public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedProperties props) throws IOException { + public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedProperties props) { boolean autoRecordKeyGen = KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props) //Need to prevent overwriting the keygen for spark sql merge into because we need to extract //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed. && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false); - try { - KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); - if (autoRecordKeyGen) { - return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) keyGenerator); - } else { - // if user comes with their own key generator. - return keyGenerator; + KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); + if (autoRecordKeyGen) { + return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) keyGenerator); + } else { + // if user comes with their own key generator. + return keyGenerator; + } + } + + /** + * Creates BaseKeyGenerator if meta columns are disabled. + * + * @throws HoodieException if unable instantiate or cast class to {@link BaseKeyGenerator}. + */ + public static Option<BaseKeyGenerator> createBaseKeyGenerator(HoodieWriteConfig writeConfig) { + if (!writeConfig.populateMetaFields()) { + try { + TypedProperties typedProperties = new TypedProperties(writeConfig.getProps()); + return Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)); + } catch (ClassCastException cce) { + throw new HoodieException("Only BaseKeyGenerators are supported when meta columns are disabled ", cce); } - } catch (Throwable e) { - throw new IOException("Could not load key generator class " + keyGeneratorClass, e); + } else { + return Option.empty(); } } @@ -140,8 +155,6 @@ public class HoodieSparkKeyGeneratorFactory { return Option.of((BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties)); } catch (ClassCastException cce) { throw new HoodieIOException("Only those key generators implementing BuiltInKeyGenerator interface is supported with virtual keys"); - } catch (IOException e) { - throw new HoodieIOException("Key generator instantiation failed ", e); } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index eeadd40d99e..441ac9eb1ec 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -31,7 +31,6 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieBaseFile; @@ -43,7 +42,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.io.HoodieCreateHandle; @@ -240,15 +238,7 @@ public class HoodieSparkCopyOnWriteTable<T> protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) { - Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty(); - if (!config.populateMetaFields()) { - try { - keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))); - } catch (IOException e) { - throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta " - + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e); - } - } + Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config); return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index 6f94139b4b7..994d66e3324 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -50,7 +50,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieKeyGeneratorException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.keygen.KeyGeneratorInterface; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; @@ -336,13 +335,7 @@ public class SparkBootstrapCommitActionExecutor<T> TypedProperties properties = new TypedProperties(); properties.putAll(config.getProps()); - KeyGeneratorInterface keyGenerator; - try { - keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties); - } catch (IOException e) { - throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e); - } - + KeyGeneratorInterface keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties); BootstrapPartitionPathTranslator translator = ReflectionUtils.loadClass(config.getBootstrapPartitionPathTranslatorClass()); List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream() diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 264e00c53f9..30e3cb533b1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -41,7 +41,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieCommitException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.index.HoodieIndex; @@ -103,13 +102,7 @@ public abstract class BaseSparkCommitActionExecutor<T> extends WriteOperationType operationType, Option<Map<String, String>> extraMetadata) { super(context, config, table, instantTime, operationType, extraMetadata); - try { - keyGeneratorOpt = config.populateMetaFields() - ? Option.empty() - : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(this.config.getProps())); - } catch (IOException e) { - throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e); - } + keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config); } private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index 6117cdcae1e..c857b61e0a4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -18,19 +18,18 @@ package org.apache.hudi.bootstrap; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -39,6 +38,8 @@ import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.SparkKeyGeneratorInterface; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; @@ -69,41 +70,36 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots // More details at https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery HoodieRecordType recordType = config.getRecordMerger().getRecordType(); Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths); - try { - KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - String precombineKey = props.getString("hoodie.datasource.write.precombine.field"); - String structName = tableName + "_record"; - String namespace = "hoodie." + tableName; - if (recordType == HoodieRecordType.AVRO) { - RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, - Option.empty()); - return genericRecords.toJavaRDD().map(gr -> { - String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( - gr, precombineKey, false, props.getBoolean( - KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); - try { - return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - props.getString("hoodie.datasource.write.payload.class"), scala.Option.apply(null)); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - }); - } else if (recordType == HoodieRecordType.SPARK) { - SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator; - StructType structType = inputDataset.schema(); - return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> { - String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString(); - String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString(); - HoodieKey key = new HoodieKey(recordKey, partitionPath); - return new HoodieSparkRecord(key, internalRow, structType, false); - }); - } else { - throw new UnsupportedOperationException(recordType.name()); - } - - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); + KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + String precombineKey = props.getString("hoodie.datasource.write.precombine.field"); + String structName = tableName + "_record"; + String namespace = "hoodie." + tableName; + if (recordType == HoodieRecordType.AVRO) { + RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, + Option.empty()); + return genericRecords.toJavaRDD().map(gr -> { + String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( + gr, precombineKey, false, props.getBoolean( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); + try { + return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), + props.getString("hoodie.datasource.write.payload.class"), scala.Option.apply(null)); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }); + } else if (recordType == HoodieRecordType.SPARK) { + SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator; + StructType structType = inputDataset.schema(); + return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> { + String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString(); + String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString(); + HoodieKey key = new HoodieKey(recordKey, partitionPath); + return new HoodieSparkRecord(key, internalRow, structType, false); + }); + } else { + throw new UnsupportedOperationException(recordType.name()); } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java index 0ba8d1425e7..46e8b9f441d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java @@ -377,6 +377,6 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { private static Throwable getNestedConstructorErrorCause(Exception e) { // custom key generator will fail in the constructor, and we must unwrap the cause for asserting error messages - return e.getCause().getCause().getCause(); + return e.getCause().getCause(); } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java index 3cc30e86399..e7c9c723721 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java @@ -20,6 +20,7 @@ package org.apache.hudi.keygen.factory; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieKeyGeneratorException; import org.apache.hudi.keygen.ComplexKeyGenerator; import org.apache.hudi.keygen.CustomKeyGenerator; @@ -32,8 +33,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.junit.jupiter.api.Test; -import java.io.IOException; - import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -67,7 +66,7 @@ public class TestHoodieSparkKeyGeneratorFactory { } @Test - public void testKeyGeneratorFactory() throws IOException { + public void testKeyGeneratorFactory() { TypedProperties props = getCommonProps(); // set KeyGenerator type only @@ -91,7 +90,7 @@ public class TestHoodieSparkKeyGeneratorFactory { // set wrong class name final TypedProperties props2 = getCommonProps(); props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), TestHoodieSparkKeyGeneratorFactory.class.getName()); - assertThrows(IOException.class, () -> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2)); + assertThrows(HoodieException.class, () -> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2)); // set wrong keyGenerator type final TypedProperties props3 = getCommonProps(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 0767d055915..120304c1219 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -412,7 +412,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = { val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) // try write to Hudi - assertThrows[IOException] { + assertThrows[HoodieException] { HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index dd613ce1153..f710786e41f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -1104,7 +1104,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup writer.save(basePath) fail("should fail when invalid PartitionKeyType is provided!") } catch { - case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable to instantiate class org.apache.hudi.keygen.CustomKeyGenerator")) + case e: Exception => assertTrue(e.getMessage.contains("Unable to instantiate class org.apache.hudi.keygen.CustomKeyGenerator")) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala index c85eb40bca7..0fc2976c7fe 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala @@ -33,8 +33,6 @@ import org.joda.time.format.DateTimeFormat import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.slf4j.LoggerFactory -import java.io.IOException - /** * Tests Spark SQL DML with custom key generator and write configs. */ @@ -288,7 +286,7 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { // INSERT INTO should fail for tableNameCustom1 val sourceTableName = tableNameCustom1 + "_source" prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) - assertThrows[IOException] { + assertThrows[HoodieException] { spark.sql( s""" | INSERT INTO $tableNameCustom1 diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index bc6332c842d..14aa3b5d2e9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -390,7 +390,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { @Test public void testPropsWithInvalidKeyGenerator() { - Exception e = assertThrows(IOException.class, () -> { + Exception e = assertThrows(HoodieException.class, () -> { String tableBasePath = basePath + "/test_table_invalid_key_gen"; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, @@ -399,7 +399,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { }, "Should error out when setting the key generator class property to an invalid value"); // expected LOG.warn("Expected error during getting the key generator", e); - assertTrue(e.getMessage().contains("Could not load key generator class invalid")); + assertTrue(e.getMessage().contains("Unable to load class")); } private static Stream<Arguments> provideInferKeyGenArgs() {
