This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 172fefa709d [MINOR] Fix incorrect catch of ClassCastException using
HoodieSparkKeyGeneratorFactory (#11062)
172fefa709d is described below
commit 172fefa709df90a27380282b877dd422b185b9d5
Author: Марк Бухнер <[email protected]>
AuthorDate: Tue Apr 23 16:12:35 2024 +0700
[MINOR] Fix incorrect catch of ClassCastException using
HoodieSparkKeyGeneratorFactory (#11062)
---
.../run/strategy/ExecutionStrategyUtil.java | 17 +-----
.../MultipleSparkJobExecutionStrategy.java | 3 +-
.../strategy/SingleSparkJobExecutionStrategy.java | 3 +-
.../apache/hudi/index/SparkHoodieIndexFactory.java | 19 +-----
.../factory/HoodieSparkKeyGeneratorFactory.java | 45 +++++++++-----
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 12 +---
.../SparkBootstrapCommitActionExecutor.java | 9 +--
.../commit/BaseSparkCommitActionExecutor.java | 9 +--
.../SparkFullBootstrapDataProviderBase.java | 70 ++++++++++------------
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 2 +-
.../TestHoodieSparkKeyGeneratorFactory.java | 7 +--
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 2 +-
.../TestSparkSqlWithCustomKeyGenerator.scala | 4 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +-
15 files changed, 79 insertions(+), 129 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
index b70eed70090..5fd2cb65d69 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ExecutionStrategyUtil.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -28,13 +27,10 @@ import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import java.io.IOException;
-
public class ExecutionStrategyUtil {
/**
@@ -49,18 +45,7 @@ public class ExecutionStrategyUtil {
HoodieWriteConfig writeConfig) {
GenericRecord record = (GenericRecord) indexedRecord;
- Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-
- if (!writeConfig.populateMetaFields()) {
- try {
- TypedProperties typedProperties = new
TypedProperties(writeConfig.getProps());
- keyGeneratorOpt = Option.of((BaseKeyGenerator)
-
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
- } catch (IOException e) {
- throw new HoodieIOException(
- "Only BaseKeyGenerators are supported when meta columns are
disabled ", e);
- }
- }
+ Option<BaseKeyGenerator> keyGeneratorOpt =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
String key = KeyGenUtils.getRecordKeyFromGenericRecord(record,
keyGeneratorOpt);
String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record,
keyGeneratorOpt);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index a7dbd1bb4d0..8169e5fae46 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -364,8 +364,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(writeConfig.getSchema()));
HoodieFileReader baseFileReader =
getBaseOrBootstrapFileReader(hadoopConf, bootstrapBasePath, partitionFields,
clusteringOp);
- Option<BaseKeyGenerator> keyGeneratorOp =
- writeConfig.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
+ Option<BaseKeyGenerator> keyGeneratorOp =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying)
buffer we get a clean copy of
// it since these records will be shuffled later.
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index fa2af5d5b90..6353646a07d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -148,8 +148,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
try {
HoodieFileReader baseFileReader =
HoodieFileReaderFactory.getReaderFactory(recordType)
.getFileReader(writeConfig, getHoodieTable().getHadoopConf(),
new StoragePath(clusteringOp.getDataFilePath()));
- Option<BaseKeyGenerator> keyGeneratorOp =
- writeConfig.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
+ Option<BaseKeyGenerator> keyGeneratorOp =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer
we get a clean copy of
// it since these records will be shuffled later.
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index eebaf0f05ba..661152c2d16 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -18,11 +18,8 @@
package org.apache.hudi.index;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
@@ -33,11 +30,8 @@ import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
import org.apache.hudi.index.simple.HoodieSimpleIndex;
-import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import java.io.IOException;
-
/**
* A factory to generate Spark {@link HoodieIndex}.
*/
@@ -62,9 +56,9 @@ public final class SparkHoodieIndexFactory {
case GLOBAL_BLOOM:
return new HoodieGlobalBloomIndex(config,
SparkHoodieBloomIndexHelper.getInstance());
case SIMPLE:
- return new HoodieSimpleIndex(config,
getKeyGeneratorForSimpleIndex(config));
+ return new HoodieSimpleIndex(config,
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config));
case GLOBAL_SIMPLE:
- return new HoodieGlobalSimpleIndex(config,
getKeyGeneratorForSimpleIndex(config));
+ return new HoodieGlobalSimpleIndex(config,
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config));
case BUCKET:
switch (config.getBucketIndexEngineType()) {
case SIMPLE:
@@ -108,13 +102,4 @@ public final class SparkHoodieIndexFactory {
return createIndex(config).isGlobal();
}
}
-
- private static Option<BaseKeyGenerator>
getKeyGeneratorForSimpleIndex(HoodieWriteConfig config) {
- try {
- return config.populateMetaFields() ? Option.empty()
- : Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(config.getProps())));
- } catch (IOException e) {
- throw new HoodieIOException("KeyGenerator instantiation failed ", e);
- }
- }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index dcc2eaec9eb..c655bf62543 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -23,13 +23,15 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
+import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
-import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
@@ -41,14 +43,13 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
+import static
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;
/**
@@ -77,26 +78,40 @@ public class HoodieSparkKeyGeneratorFactory {
"org.apache.hudi.keygen.TimestampBasedKeyGenerator");
}
- public static KeyGenerator createKeyGenerator(TypedProperties props) throws
IOException {
+ public static KeyGenerator createKeyGenerator(TypedProperties props) {
String keyGeneratorClass = getKeyGeneratorClassName(props);
return createKeyGenerator(keyGeneratorClass, props);
}
- public static KeyGenerator createKeyGenerator(String keyGeneratorClass,
TypedProperties props) throws IOException {
+ public static KeyGenerator createKeyGenerator(String keyGeneratorClass,
TypedProperties props) {
boolean autoRecordKeyGen =
KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props)
//Need to prevent overwriting the keygen for spark sql merge into
because we need to extract
//the recordkey from the meta cols if it exists. Sql keygen will use
pkless keygen if needed.
&& !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
- try {
- KeyGenerator keyGenerator = (KeyGenerator)
ReflectionUtils.loadClass(keyGeneratorClass, props);
- if (autoRecordKeyGen) {
- return new AutoRecordGenWrapperKeyGenerator(props,
(BuiltinKeyGenerator) keyGenerator);
- } else {
- // if user comes with their own key generator.
- return keyGenerator;
+ KeyGenerator keyGenerator = (KeyGenerator)
ReflectionUtils.loadClass(keyGeneratorClass, props);
+ if (autoRecordKeyGen) {
+ return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator)
keyGenerator);
+ } else {
+ // if user comes with their own key generator.
+ return keyGenerator;
+ }
+ }
+
+ /**
+ * Creates BaseKeyGenerator if meta columns are disabled.
+ *
+   * @throws HoodieException if unable to instantiate or cast the class to {@link
BaseKeyGenerator}.
+ */
+ public static Option<BaseKeyGenerator>
createBaseKeyGenerator(HoodieWriteConfig writeConfig) {
+ if (!writeConfig.populateMetaFields()) {
+ try {
+ TypedProperties typedProperties = new
TypedProperties(writeConfig.getProps());
+ return Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
+ } catch (ClassCastException cce) {
+ throw new HoodieException("Only BaseKeyGenerators are supported when
meta columns are disabled ", cce);
}
- } catch (Throwable e) {
- throw new IOException("Could not load key generator class " +
keyGeneratorClass, e);
+ } else {
+ return Option.empty();
}
}
@@ -140,8 +155,6 @@ public class HoodieSparkKeyGeneratorFactory {
return Option.of((BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
} catch (ClassCastException cce) {
throw new HoodieIOException("Only those key generators implementing
BuiltInKeyGenerator interface is supported with virtual keys");
- } catch (IOException e) {
- throw new HoodieIOException("Key generator instantiation failed ", e);
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 0a533e65912..8848a2bb3c7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -31,7 +31,6 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -43,7 +42,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.HoodieCreateHandle;
@@ -251,15 +249,7 @@ public class HoodieSparkCopyOnWriteTable<T>
protected HoodieMergeHandle getUpdateHandle(String instantTime, String
partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile
dataFileToBeMerged) {
- Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
- if (!config.populateMetaFields()) {
- try {
- keyGeneratorOpt = Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(config.getProps())));
- } catch (IOException e) {
- throw new HoodieIOException("Only BaseKeyGenerator (or any key
generator that extends from BaseKeyGenerator) are supported when meta "
- + "columns are disabled. Please choose the right key generator if
you wish to disable meta fields.", e);
- }
- }
+ Option<BaseKeyGenerator> keyGeneratorOpt =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
return HoodieMergeHandleFactory.create(config, instantTime, this,
keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 7d9b0e123e0..38fa15542b0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -48,7 +48,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
@@ -298,13 +297,7 @@ public class SparkBootstrapCommitActionExecutor<T>
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
- KeyGeneratorInterface keyGenerator;
- try {
- keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
- } catch (IOException e) {
- throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
- }
-
+ KeyGeneratorInterface keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
BootstrapPartitionPathTranslator translator =
ReflectionUtils.loadClass(config.getBootstrapPartitionPathTranslatorClass());
List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths =
partitions.stream()
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 0fcb2359cdf..c47ba72949c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -38,7 +38,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
@@ -99,13 +98,7 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
WriteOperationType operationType,
Option<Map<String, String>>
extraMetadata) {
super(context, config, table, instantTime, operationType, extraMetadata);
- try {
- keyGeneratorOpt = config.populateMetaFields()
- ? Option.empty()
- : Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(this.config.getProps()));
- } catch (IOException e) {
- throw new HoodieIOException("Only BaseKeyGenerators are supported when
meta columns are disabled ", e);
- }
+ keyGeneratorOpt =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
}
private HoodieData<HoodieRecord<T>>
clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index 6117cdcae1e..c857b61e0a4 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -18,19 +18,18 @@
package org.apache.hudi.bootstrap;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -39,6 +38,8 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
@@ -69,41 +70,36 @@ public abstract class SparkFullBootstrapDataProviderBase
extends FullRecordBoots
// More details at
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
HoodieRecordType recordType = config.getRecordMerger().getRecordType();
Dataset inputDataset =
sparkSession.read().format(getFormat()).option("basePath",
sourceBasePath).load(filePaths);
- try {
- KeyGenerator keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- String precombineKey =
props.getString("hoodie.datasource.write.precombine.field");
- String structName = tableName + "_record";
- String namespace = "hoodie." + tableName;
- if (recordType == HoodieRecordType.AVRO) {
- RDD<GenericRecord> genericRecords =
HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
- Option.empty());
- return genericRecords.toJavaRDD().map(gr -> {
- String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
- gr, precombineKey, false, props.getBoolean(
-
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
- try {
- return DataSourceUtils.createHoodieRecord(gr, orderingVal,
keyGenerator.getKey(gr),
- props.getString("hoodie.datasource.write.payload.class"),
scala.Option.apply(null));
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
- }
- });
- } else if (recordType == HoodieRecordType.SPARK) {
- SparkKeyGeneratorInterface sparkKeyGenerator =
(SparkKeyGeneratorInterface) keyGenerator;
- StructType structType = inputDataset.schema();
- return
inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> {
- String recordKey = sparkKeyGenerator.getRecordKey(internalRow,
structType).toString();
- String partitionPath =
sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
- HoodieKey key = new HoodieKey(recordKey, partitionPath);
- return new HoodieSparkRecord(key, internalRow, structType, false);
- });
- } else {
- throw new UnsupportedOperationException(recordType.name());
- }
-
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
+ KeyGenerator keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+ String precombineKey =
props.getString("hoodie.datasource.write.precombine.field");
+ String structName = tableName + "_record";
+ String namespace = "hoodie." + tableName;
+ if (recordType == HoodieRecordType.AVRO) {
+ RDD<GenericRecord> genericRecords =
HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
+ Option.empty());
+ return genericRecords.toJavaRDD().map(gr -> {
+ String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
+ gr, precombineKey, false, props.getBoolean(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
+ try {
+ return DataSourceUtils.createHoodieRecord(gr, orderingVal,
keyGenerator.getKey(gr),
+ props.getString("hoodie.datasource.write.payload.class"),
scala.Option.apply(null));
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ });
+ } else if (recordType == HoodieRecordType.SPARK) {
+ SparkKeyGeneratorInterface sparkKeyGenerator =
(SparkKeyGeneratorInterface) keyGenerator;
+ StructType structType = inputDataset.schema();
+ return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow
-> {
+ String recordKey = sparkKeyGenerator.getRecordKey(internalRow,
structType).toString();
+ String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow,
structType).toString();
+ HoodieKey key = new HoodieKey(recordKey, partitionPath);
+ return new HoodieSparkRecord(key, internalRow, structType, false);
+ });
+ } else {
+ throw new UnsupportedOperationException(recordType.name());
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 0ba8d1425e7..46e8b9f441d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -377,6 +377,6 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
private static Throwable getNestedConstructorErrorCause(Exception e) {
// custom key generator will fail in the constructor, and we must unwrap
the cause for asserting error messages
- return e.getCause().getCause().getCause();
+ return e.getCause().getCause();
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
index 3cc30e86399..e7c9c723721 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen.factory;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -32,8 +33,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
-
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -67,7 +66,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
}
@Test
- public void testKeyGeneratorFactory() throws IOException {
+ public void testKeyGeneratorFactory() {
TypedProperties props = getCommonProps();
// set KeyGenerator type only
@@ -91,7 +90,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
// set wrong class name
final TypedProperties props2 = getCommonProps();
props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
TestHoodieSparkKeyGeneratorFactory.class.getName());
- assertThrows(IOException.class, () ->
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2));
+ assertThrows(HoodieException.class, () ->
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2));
// set wrong keyGenerator type
final TypedProperties props3 = getCommonProps();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 5603c35e03c..398b48ce135 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -412,7 +412,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// try write to Hudi
- assertThrows[IOException] {
+ assertThrows[HoodieException] {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bf039f565df..9bdb57a854b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1102,7 +1102,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
writer.save(basePath)
fail("should fail when invalid PartitionKeyType is provided!")
} catch {
- case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable
to instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
+ case e: Exception => assertTrue(e.getMessage.contains("Unable to
instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
index c85eb40bca7..0fc2976c7fe 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
@@ -33,8 +33,6 @@ import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.slf4j.LoggerFactory
-import java.io.IOException
-
/**
* Tests Spark SQL DML with custom key generator and write configs.
*/
@@ -288,7 +286,7 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
// INSERT INTO should fail for tableNameCustom1
val sourceTableName = tableNameCustom1 + "_source"
prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0,
1706800227, 'cat1')"))
- assertThrows[IOException] {
+ assertThrows[HoodieException] {
spark.sql(
s"""
| INSERT INTO $tableNameCustom1
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5cef5191a7c..0edb3c6cb34 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -372,7 +372,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testPropsWithInvalidKeyGenerator() {
- Exception e = assertThrows(IOException.class, () -> {
+ Exception e = assertThrows(HoodieException.class, () -> {
String tableBasePath = basePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT,
@@ -381,7 +381,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}, "Should error out when setting the key generator class property to an
invalid value");
// expected
LOG.warn("Expected error during getting the key generator", e);
- assertTrue(e.getMessage().contains("Could not load key generator class
invalid"));
+ assertTrue(e.getMessage().contains("Unable to load class"));
}
private static Stream<Arguments> provideInferKeyGenArgs() {