vinothchandar commented on a change in pull request #3306:
URL: https://github.com/apache/hudi/pull/3306#discussion_r675205989
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
##########
@@ -39,18 +41,27 @@
public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
+ private final Option<BaseKeyGenerator> keyGeneratorOpt;
public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
- Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
+ Pair<String, HoodieBaseFile> partitionPathBaseFilePair, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
this.partitionPathBaseFilePair = partitionPathBaseFilePair;
+ this.keyGeneratorOpt = keyGeneratorOpt;
}
public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
- return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
- hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
- .map(entry -> Pair.of(entry,
- new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
+ if (config.populateMetaFields()) {
Review comment:
you already decide in an upper layer to pass in `Option.empty()` if `config.populateMetaFields() == true`, right? In cases like this, it is advisable to just use `keyGeneratorOpt.map(keyGen -> /* else block call */).orElse(/* if block */)` and not rely on checking the config again and again.
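A minimal sketch of that suggestion, using JDK stand-ins rather than Hudi's types: `java.util.Optional` replaces Hudi's `Option`, a `Function` replaces `BaseKeyGenerator`, and each row is a `Map` that may carry a `_hoodie_record_key` entry. `KeyFetchSketch` and `recordKeys` are hypothetical names. The presence of the key generator alone picks the branch:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class KeyFetchSketch {
  // Assumed name of the record-key meta column.
  static final String RECORD_KEY_META_FIELD = "_hoodie_record_key";

  // An empty keyGeneratorOpt means meta fields are populated, so the key is read from the
  // meta column; otherwise the generator computes it. No populateMetaFields() re-check.
  static List<String> recordKeys(List<Map<String, String>> rows,
                                 Optional<Function<Map<String, String>, String>> keyGeneratorOpt) {
    return keyGeneratorOpt
        .map(keyGen -> rows.stream().map(keyGen).collect(Collectors.toList()))
        .orElseGet(() -> rows.stream()
            .map(row -> row.get(RECORD_KEY_META_FIELD))
            .collect(Collectors.toList()));
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows = List.of(Map.of(RECORD_KEY_META_FIELD, "k1", "id", "1"));
    System.out.println(recordKeys(rows, Optional.empty()));                          // prints [k1]
    System.out.println(recordKeys(rows, Optional.of(row -> "id:" + row.get("id")))); // prints [id:1]
  }
}
```

The sketch uses `orElseGet` rather than `orElse`: `orElse` evaluates its argument even when the generator is present, which matters if the fallback branch reads a file.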
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -101,33 +103,44 @@
protected long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
protected boolean useWriterSchema;
+ protected boolean populateMetaFields;
+ protected Option<BaseKeyGenerator> keyGeneratorOpt;
private HoodieBaseFile baseFileToMerge;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
- TaskContextSupplier taskContextSupplier) {
+ TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
- hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+ hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get(), keyGeneratorOpt);
}
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
- TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
+ TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath, baseFile);
+ this.populateMetaFields = config.populateMetaFields();
Review comment:
same question: can we just work off `keyGeneratorOpt.isEmpty()`?
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -244,6 +244,22 @@ public static Schema getRecordKeyPartitionPathSchema() {
return recordSchema;
}
+ /**
+ * Fetch schema for record key and partition path.
+ */
+ public static Schema getRecordKeyPartitionPathSchema(Schema fileSchema, List<String> recordKeyFields, List<String> partitionPathFields) {
Review comment:
any reason why we can't just merge the lists outside and keep this method simpler, i.e. take a list of fields and get a subschema? In fact, we may already have a method like that which we can reuse.
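A minimal sketch of that split, assuming the schema can be modeled as an ordered map from field name to type (the real code would build an Avro `Schema`). `SubschemaSketch`, `mergeFields` and `subschema` are hypothetical names, not existing Hudi helpers:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SubschemaSketch {
  // Merge at the call site; distinct() keeps first-seen order and drops a field that is
  // both a record key field and a partition path field, so it is not projected twice.
  static List<String> mergeFields(List<String> recordKeyFields, List<String> partitionPathFields) {
    return Stream.concat(recordKeyFields.stream(), partitionPathFields.stream())
        .distinct()
        .collect(Collectors.toList());
  }

  // Generic "list of fields -> subschema" helper over a simplified schema (field name -> type).
  // Fails fast if a requested field is missing from the file schema.
  static Map<String, String> subschema(Map<String, String> fileSchema, List<String> fields) {
    Map<String, String> projected = new LinkedHashMap<>();
    for (String field : fields) {
      String type = fileSchema.get(field);
      if (type == null) {
        throw new IllegalArgumentException("Field not found in file schema: " + field);
      }
      projected.put(field, type);
    }
    return projected;
  }

  public static void main(String[] args) {
    Map<String, String> fileSchema = new LinkedHashMap<>();
    fileSchema.put("id", "string");
    fileSchema.put("ts", "long");
    fileSchema.put("region", "string");
    List<String> fields = mergeFields(List.of("id"), List.of("region", "id"));
    System.out.println(subschema(fileSchema, fields).keySet()); // prints [id, region]
  }
}
```

With the merge done by the caller, the schema helper needs only a single `List<String>` parameter and can serve other projections as well.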
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -101,33 +103,44 @@
protected long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
protected boolean useWriterSchema;
+ protected boolean populateMetaFields;
+ protected Option<BaseKeyGenerator> keyGeneratorOpt;
private HoodieBaseFile baseFileToMerge;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
- TaskContextSupplier taskContextSupplier) {
+ TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
- hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+ hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get(), keyGeneratorOpt);
}
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
- TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
+ TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath, baseFile);
+ this.populateMetaFields = config.populateMetaFields();
+ setAndValidateKeyGenProps(keyGeneratorOpt);
}
/**
* Called by compactor code path.
*/
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
- HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+ HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
+ this.populateMetaFields = config.populateMetaFields();
+ setAndValidateKeyGenProps(keyGeneratorOpt);
+ }
+
+ private void setAndValidateKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt) {
Review comment:
validate and then set?
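A minimal sketch of validate-then-set, assuming the invariant is "a key generator is present exactly when meta fields are disabled" (the PR's actual check is not shown in the hunk). `MergeHandleSketch` and its methods are hypothetical, and the key generator is reduced to a `String`:

```java
import java.util.Optional;

final class MergeHandleSketch {
  private final boolean populateMetaFields;
  // Stand-in for Option<BaseKeyGenerator>; the String names the key generator.
  private final Optional<String> keyGeneratorOpt;

  MergeHandleSketch(boolean populateMetaFields, Optional<String> keyGeneratorOpt) {
    // Validate first, then assign: the fields can stay final and are never
    // set to an inconsistent combination.
    validateKeyGenProps(populateMetaFields, keyGeneratorOpt);
    this.populateMetaFields = populateMetaFields;
    this.keyGeneratorOpt = keyGeneratorOpt;
  }

  // Assumed invariant: a key generator is supplied if and only if meta fields are disabled.
  private static void validateKeyGenProps(boolean populateMetaFields, Optional<String> keyGeneratorOpt) {
    if (populateMetaFields == keyGeneratorOpt.isPresent()) {
      throw new IllegalArgumentException(
          "A key generator must be supplied if and only if meta fields are disabled (populateMetaFields="
              + populateMetaFields + ", keyGeneratorPresent=" + keyGeneratorOpt.isPresent() + ")");
    }
  }

  boolean usesKeyGenerator() {
    return !populateMetaFields && keyGeneratorOpt.isPresent();
  }

  public static void main(String[] args) {
    System.out.println(new MergeHandleSketch(false, Optional.of("SimpleKeyGenerator")).usesKeyGenerator()); // prints true
  }
}
```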
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -78,13 +82,16 @@
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
+ protected boolean populateMetaFields;
Review comment:
do we need both?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -45,7 +45,8 @@
TaskContextSupplier taskContextSupplier) throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
if (PARQUET.getFileExtension().equals(extension)) {
- return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+ return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields(),
Review comment:
why would we need to pass the same value twice?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
##########
@@ -39,18 +41,27 @@
public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
+ private final Option<BaseKeyGenerator> keyGeneratorOpt;
public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
- Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
+ Pair<String, HoodieBaseFile> partitionPathBaseFilePair, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
this.partitionPathBaseFilePair = partitionPathBaseFilePair;
+ this.keyGeneratorOpt = keyGeneratorOpt;
}
public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
- return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
- hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
- .map(entry -> Pair.of(entry,
- new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
+ if (config.populateMetaFields()) {
+ return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
+ hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
+ .map(entry -> Pair.of(entry,
+ new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
+ } else {
+ return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
+ hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt.get()).stream()
+ .map(entry -> Pair.of(entry,
Review comment:
can we avoid repeating lines 58 and 63?
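One way to avoid the duplicated `.map(entry -> Pair.of(entry, new HoodieRecordLocation(...)))` tail is to branch only on how the keys are fetched and map to locations once. A minimal sketch with JDK stand-ins: keys are strings, a location is modeled as `"commitTime/fileId"`, and `LocationsSketch`, `fetchKeys` and `locations` are hypothetical names:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LocationsSketch {
  // Only key fetching differs between the two branches: either the keys come straight
  // from the meta column, or a key generator derives them from each raw row.
  static List<String> fetchKeys(List<String> rows, Optional<Function<String, String>> keyGeneratorOpt) {
    return keyGeneratorOpt
        .map(keyGen -> rows.stream().map(keyGen).collect(Collectors.toList()))
        .orElse(rows);
  }

  // The (key -> location) mapping that was repeated in both branches now exists once.
  static Stream<Map.Entry<String, String>> locations(List<String> rows,
      Optional<Function<String, String>> keyGeneratorOpt, String commitTime, String fileId) {
    String location = commitTime + "/" + fileId;
    return fetchKeys(rows, keyGeneratorOpt).stream().map(key -> Map.entry(key, location));
  }

  public static void main(String[] args) {
    locations(List.of("a", "b"), Optional.of(String::toUpperCase), "001", "f1")
        .forEach(System.out::println);
  }
}
```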
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java
##########
@@ -146,8 +151,15 @@ public boolean isImplicitWithStorage() {
List<Pair<String, HoodieBaseFile>> baseFiles) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
- return jsc.parallelize(baseFiles, fetchParallelism)
- .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile)
+
+ try {
+ Option<BaseKeyGenerator> keyGeneratorOpt = config.populateMetaFields() ? Option.empty()
+ : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+ return jsc.parallelize(baseFiles, fetchParallelism)
+ .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, keyGeneratorOpt)
.locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
+ } catch (IOException e) {
+ throw new HoodieIOException("KeyGenerator instantiation throwed exception " + e);
Review comment:
`", e` instead of `+`? i.e. pass the exception as the cause instead of concatenating it into the message.
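The difference between concatenating the exception and passing it as the cause can be sketched with plain JDK exceptions; `RuntimeException` stands in for `HoodieIOException` here, and `ExceptionChainingSketch` is a hypothetical name:

```java
import java.io.IOException;

final class ExceptionChainingSketch {
  // Concatenation bakes only e.toString() into the message: getCause() is null
  // and the original stack trace is lost.
  static RuntimeException concatenated(IOException e) {
    return new RuntimeException("KeyGenerator instantiation threw exception " + e);
  }

  // Passing e as the cause chains it: getCause() returns it, and printStackTrace()
  // adds a "Caused by:" section with the original stack trace.
  static RuntimeException chained(IOException e) {
    return new RuntimeException("KeyGenerator instantiation threw exception", e);
  }

  public static void main(String[] args) {
    IOException cause = new IOException("class not found");
    System.out.println(concatenated(cause).getCause());     // prints null
    System.out.println(chained(cause).getCause() == cause); // prints true
  }
}
```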
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
##########
@@ -28,4 +30,6 @@
String getRecordKey(Row row);
String getPartitionPath(Row row);
+
+ String getPartitionPath(InternalRow internalRow, StructType structType);
Review comment:
should we provide a default implementation, so that not everyone is forced to implement this?
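A minimal sketch of a default method, with `Object` standing in for Spark's `Row`, `InternalRow` and `StructType` so it compiles without Spark. Throwing `UnsupportedOperationException` is one possible default, not necessarily what the PR should adopt; `SparkKeyGeneratorSketch` and `LegacyKeyGenerator` are hypothetical names:

```java
// Object stands in for Spark's Row, InternalRow and StructType.
interface SparkKeyGeneratorSketch {
  String getRecordKey(Object row);

  String getPartitionPath(Object row);

  // Newly added method with a default body: existing implementations keep compiling,
  // and only key generators that support the InternalRow path need to override it.
  default String getPartitionPath(Object internalRow, Object structType) {
    throw new UnsupportedOperationException(
        getClass().getSimpleName() + " does not support getPartitionPath(InternalRow, StructType)");
  }
}

// An existing implementation that predates the new method and does not override it.
final class LegacyKeyGenerator implements SparkKeyGeneratorSketch {
  @Override
  public String getRecordKey(Object row) {
    return "key-" + row;
  }

  @Override
  public String getPartitionPath(Object row) {
    return "partition-" + row;
  }

  public static void main(String[] args) {
    LegacyKeyGenerator keyGen = new LegacyKeyGenerator();
    System.out.println(keyGen.getPartitionPath("r1")); // prints partition-r1
    try {
      keyGen.getPartitionPath("r1", "schema");
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```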
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -94,6 +101,18 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
WriteOperationType operationType,
Option extraMetadata) {
super(context, config, table, instantTime, operationType, extraMetadata);
+ initKeyGenIfNeeded();
+ }
+
+ private void initKeyGenIfNeeded() {
+ this.populateMetaFields = config.populateMetaFields();
Review comment:
move this to the constructor?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
##########
@@ -66,21 +68,22 @@
private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
- String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
- super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+ String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
}
public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
- super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+ super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,
+ Option.empty());
}
/**
* Write old record as is w/o merging with incoming record.
*/
@Override
public void write(GenericRecord oldRecord) {
- String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String key = populateMetaFields ? oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString() : keyGeneratorOpt.get().getKey(oldRecord).getRecordKey();
Review comment:
can this code be shared?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -45,7 +45,8 @@
TaskContextSupplier taskContextSupplier) throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
if (PARQUET.getFileExtension().equals(extension)) {
- return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+ return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields(),
Review comment:
if you want this control for testing, let's add a new overload of `newParquetFileWriter()`?
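A minimal sketch of the overload approach, assuming (per the "same value twice" remark on this hunk) that both flags are derived from `populateMetaFields` in production. `ParquetWriterFactorySketch` and `WriteConfig` are hypothetical stand-ins, and the method returns a description string instead of a real `HoodieFileWriter`:

```java
final class ParquetWriterFactorySketch {
  // Hypothetical stand-in for HoodieWriteConfig.
  static final class WriteConfig {
    final boolean populateMetaFields;

    WriteConfig(boolean populateMetaFields) {
      this.populateMetaFields = populateMetaFields;
    }
  }

  // Production path: callers pass only the config, and both flags are derived here
  // (assumption: bloom filters are enabled exactly when meta fields are populated).
  static String newParquetFileWriter(String path, WriteConfig config) {
    return newParquetFileWriter(path, config, config.populateMetaFields, config.populateMetaFields);
  }

  // Overload that exposes the flags explicitly, e.g. for tests. In real code the config
  // would still supply the remaining writer settings.
  static String newParquetFileWriter(String path, WriteConfig config,
                                     boolean populateMetaFields, boolean enableBloomFilter) {
    return "writer(" + path + ", metaFields=" + populateMetaFields + ", bloomFilter=" + enableBloomFilter + ")";
  }

  public static void main(String[] args) {
    // prints writer(f.parquet, metaFields=false, bloomFilter=false)
    System.out.println(newParquetFileWriter("f.parquet", new WriteConfig(false)));
  }
}
```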
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -247,8 +247,8 @@ protected String getCommitActionType() {
*/
private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
GenericRecord record = (GenericRecord) indexedRecord;
- String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+ String key = populateMetaFields ? record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString() : keyGeneratorOpt.get().getRecordKey(record);
Review comment:
this ternary switch is kind of everywhere; a single method call like `fetchKey(populateMetaFields, record, keyGeneratorOpt)` would be nice.
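A minimal sketch of such a `fetchKey` helper, with records modeled as `Map<String, Object>` instead of Avro `GenericRecord` and a `Function` standing in for the key generator; `FetchKeySketch` is a hypothetical class, and `_hoodie_record_key` is assumed as the value of `HoodieRecord.RECORD_KEY_METADATA_FIELD`:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class FetchKeySketch {
  // Assumed value of HoodieRecord.RECORD_KEY_METADATA_FIELD.
  static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key";

  // One helper for the "meta column or key generator" switch that is currently
  // repeated as a ternary at every call site.
  static String fetchKey(boolean populateMetaFields, Map<String, Object> record,
                         Optional<Function<Map<String, Object>, String>> keyGeneratorOpt) {
    if (populateMetaFields) {
      return record.get(RECORD_KEY_METADATA_FIELD).toString();
    }
    return keyGeneratorOpt
        .orElseThrow(() -> new IllegalStateException("A key generator is required when meta fields are disabled"))
        .apply(record);
  }

  public static void main(String[] args) {
    Map<String, Object> record = Map.of(RECORD_KEY_METADATA_FIELD, "k1", "id", 42);
    System.out.println(fetchKey(true, record, Optional.empty()));                       // prints k1
    System.out.println(fetchKey(false, record, Optional.of(r -> "id:" + r.get("id")))); // prints id:42
  }
}
```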
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -210,12 +214,20 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+ Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+ if (!config.populateMetaFields()) {
+ try {
+ keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+ } catch (IOException e) {
+ throw new HoodieIOException("Only BaseKyGenerators are supported when meta columns are disabled ", e);
Review comment:
would a user understand `BaseKeyGenerators are supported`? (Note the typo `BaseKyGenerators` in the message.) Can we make exception messages more user-friendly?
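One possible wording, sketched as a hypothetical helper: it says what failed, which setting caused it, and how to fix it. It assumes the error being reported is a configured key generator that does not extend `BaseKeyGenerator`, and the config keys `hoodie.populate.meta.fields` and `hoodie.datasource.write.keygenerator.class` should be checked against the actual config definitions:

```java
final class KeyGenErrorMessageSketch {
  // Assumed config keys: hoodie.populate.meta.fields and hoodie.datasource.write.keygenerator.class.
  static String unsupportedKeyGeneratorMessage(String configuredKeyGeneratorClass) {
    return "Key generator '" + configuredKeyGeneratorClass + "' cannot be used with "
        + "hoodie.populate.meta.fields=false because it does not extend BaseKeyGenerator. "
        + "Set hoodie.datasource.write.keygenerator.class to a key generator that extends "
        + "BaseKeyGenerator, or set hoodie.populate.meta.fields=true.";
  }

  public static void main(String[] args) {
    System.out.println(unsupportedKeyGeneratorMessage("com.example.MyKeyGenerator"));
  }
}
```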
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -94,6 +101,18 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
WriteOperationType operationType,
Option extraMetadata) {
super(context, config, table, instantTime, operationType, extraMetadata);
+ initKeyGenIfNeeded();
+ }
+
+ private void initKeyGenIfNeeded() {
+ this.populateMetaFields = config.populateMetaFields();
+ if (!populateMetaFields) {
+ try {
+ keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+ } catch (IOException e) {
+ throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
Review comment:
move this exception handling into the method itself? it's an unchecked exception anyway, so we can save some lines.
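A minimal sketch of moving the wrapping into the factory, so call sites need no `try`/`catch`. `UncheckedIOException` stands in for `HoodieIOException`, and `KeyGenFactorySketch`, `KeyGen`, `createKeyGenerator` and `createKeyGeneratorUnchecked` are hypothetical names:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class KeyGenFactorySketch {
  // Hypothetical key generator abstraction: derives a record key from a raw row.
  interface KeyGen {
    String recordKey(String row);
  }

  // Stand-in for a factory method that declares a checked IOException.
  static KeyGen createKeyGenerator(String className) throws IOException {
    if ("UpperCaseKeyGen".equals(className)) {
      return row -> row.toUpperCase();
    }
    throw new IOException("Unknown key generator class: " + className);
  }

  // Wraps the checked exception once, so constructors and other call sites need no
  // try/catch of their own; the original IOException is kept as the cause.
  static KeyGen createKeyGeneratorUnchecked(String className) {
    try {
      return createKeyGenerator(className);
    } catch (IOException e) {
      throw new UncheckedIOException("Could not instantiate key generator " + className, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(createKeyGeneratorUnchecked("UpperCaseKeyGen").recordKey("abc")); // prints ABC
  }
}
```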
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -244,6 +244,22 @@ public static Schema getRecordKeyPartitionPathSchema() {
return recordSchema;
}
+ /**
+ * Fetch schema for record key and partition path.
+ */
+ public static Schema getRecordKeyPartitionPathSchema(Schema fileSchema, List<String> recordKeyFields, List<String> partitionPathFields) {
Review comment:
unit test for this?
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -142,6 +143,43 @@
return hoodieKeys;
}
+ /**
+ * Fetch {@link HoodieKey}s from the given parquet file.
+ *
+ * @param filePath The parquet file path.
+ * @param configuration configuration to build fs object
+ * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+ */
+ @Override
+ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, BaseKeyGenerator keyGenerator) {
Review comment:
probably, but this method has a lot of code duplication atm. can we reduce that?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -58,16 +59,15 @@
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
- TaskContextSupplier taskContextSupplier) throws IOException {
- BloomFilter filter = createBloomFilter(config);
- HoodieAvroWriteSupport writeSupport =
- new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);
+ TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
+ BloomFilter filter = enableBloomFilter ? createBloomFilter(config) : null;
Review comment:
yes, let's fix this to not use nulls, if it's not a lot of change.
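A minimal sketch of returning an empty optional instead of `null` when bloom filters are disabled. `java.util.Optional` stands in for Hudi's `Option`, and the `BloomFilter` class here is a hypothetical exact-set stand-in, not Hudi's bloom filter:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class OptionalBloomFilterSketch {
  // Hypothetical stand-in for Hudi's BloomFilter: an exact set, so there are no false positives.
  static final class BloomFilter {
    private final Set<String> keys = new HashSet<>();

    void add(String key) {
      keys.add(key);
    }

    boolean mightContain(String key) {
      return keys.contains(key);
    }
  }

  // Return an empty Optional instead of null when bloom filters are disabled.
  static Optional<BloomFilter> createBloomFilter(boolean enableBloomFilter) {
    return enableBloomFilter ? Optional.of(new BloomFilter()) : Optional.empty();
  }

  // Writer side: record the key only if a filter exists, with no null checks.
  static void onRecordWritten(Optional<BloomFilter> filterOpt, String recordKey) {
    filterOpt.ifPresent(filter -> filter.add(recordKey));
  }

  public static void main(String[] args) {
    Optional<BloomFilter> filterOpt = createBloomFilter(true);
    onRecordWritten(filterOpt, "k1");
    System.out.println(filterOpt.map(f -> f.mightContain("k1")).orElse(false)); // prints true
  }
}
```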
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1588,6 +1588,11 @@ public Builder withWriteMetaKeyPrefixes(String writeMetaKeyPrefixes) {
return this;
}
+ public Builder withPopulateMetaFields(boolean populateMetaFields) {
Review comment:
yeah, composing it like that is probably the right way: separate out the table configs from the write configs.
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -142,6 +143,43 @@
return hoodieKeys;
}
+ /**
+ * Fetch {@link HoodieKey}s from the given parquet file.
+ *
+ * @param filePath The parquet file path.
+ * @param configuration configuration to build fs object
+ * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+ */
+ @Override
+ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, BaseKeyGenerator keyGenerator) {
+ List<HoodieKey> hoodieKeys = new ArrayList<>();
+ try {
+ if (!filePath.getFileSystem(configuration).exists(filePath)) {
Review comment:
let's avoid this call, and have it error out if the file does not exist?
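A minimal sketch of dropping the `exists()` pre-check: reading a missing file already fails, which saves a filesystem round-trip and avoids the gap between checking and reading. `java.nio.file.Files` stands in for Hadoop's `FileSystem`; opening a missing path through Hadoop should likewise fail (typically with a `FileNotFoundException`), though that is worth confirming for the storage backends in use. `NoExistsCheckSketch` and `readKeys` are hypothetical names:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.List;

final class NoExistsCheckSketch {
  // No separate exists() call: reading a missing file throws NoSuchFileException,
  // so the caller sees the error instead of silently getting an empty result.
  static List<String> readKeys(Path file) throws IOException {
    return Files.readAllLines(file);
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("keys", ".txt");
    Files.write(tmp, List.of("k1", "k2"));
    System.out.println(readKeys(tmp)); // prints [k1, k2]
    Files.delete(tmp);
    try {
      readKeys(tmp);
    } catch (NoSuchFileException e) {
      System.out.println("missing file: " + e.getFile());
    }
  }
}
```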
--
This is an automated message from the Apache Git Service.