yihua commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1419016929
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -123,4 +125,11 @@ private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema
return null;
}
}
+
+ @Override
+ public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
+ UnsafeProjection projection = HoodieInternalRowUtils.generateUnsafeProjectionAlias(AvroConversionUtils.convertAvroSchemaToStructType(from),
Review Comment:
I was referring to `AvroConversionUtils.convertAvroSchemaToStructType`, which can be replaced by `HoodieInternalRowUtils.getCachedSchema(schema)` to avoid repeatedly parsing the Avro schema.
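For reference, a rough sketch of the caching idea (the class, map, and `getCachedStructType` helper below are illustrative only, not the actual `HoodieInternalRowUtils` implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.spark.sql.types.StructType;

public final class CachedStructTypeConversion {
  // Convert each distinct Avro schema to a Catalyst StructType only once.
  private static final Map<Schema, StructType> STRUCT_TYPE_CACHE = new ConcurrentHashMap<>();

  public static StructType getCachedStructType(Schema avroSchema) {
    // Avro's Schema implements equals/hashCode, so it works as a cache key.
    return STRUCT_TYPE_CACHE.computeIfAbsent(avroSchema,
        s -> AvroConversionUtils.convertAvroSchemaToStructType(s));
  }

  private CachedStructTypeConversion() {
  }
}
```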
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -77,14 +77,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
}
}).asInstanceOf[ClosableIterator[InternalRow]]
} else {
- if (baseFileReader.isEmpty) {
- throw new IllegalArgumentException("Base file reader is missing when instantiating "
- + "SparkFileFormatInternalRowReaderContext.");
+ val key = schemaPairHashKey(dataSchema, requiredSchema)
Review Comment:
```suggestion
val schemaPairHashKey = generateSchemaPairHashKey(dataSchema, requiredSchema)
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws Exception {
- // This constructor is a placeholder now to allow automatically fetching the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(), metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+ private final Schema requestedSchema;
+
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
this.recordMerger = readerContext.getRecordMerger(
getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, RECORD_MERGER_STRATEGY.defaultValue()));
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter = Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue = ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ? Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, props)
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
+ this.baseFileIterator = makeBaseFileIterator();
scanLogFiles();
recordBuffer.setBaseFileIterator(baseFileIterator);
}
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+ //might need to change this if other queries than mor have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize the logic
+ if (!hoodieTableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) {
+ return dataSchema;
+ }
+
+ List<Schema.Field> addedFields = new ArrayList<>();
+ for (String field : recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+ if (requestedSchema.getField(field) == null) {
+ Schema.Field foundField = dataSchema.getField(field);
+ if (foundField == null) {
+ throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+ }
+ addedFields.add(new Schema.Field(foundField.name(), foundField.schema(), foundField.doc(), foundField.defaultVal()));
+ }
+ }
+
+ if (addedFields.isEmpty()) {
+ return maybeReorderForBootstrap(requestedSchema);
+ }
+
+ return maybeReorderForBootstrap(AvroSchemaUtils.appendFieldsToSchema(requestedSchema, addedFields));
+ }
+
+ private Schema maybeReorderForBootstrap(Schema input) {
+ if (this.hoodieBaseFileOption.isPresent() && this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+ Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = getDataAndMetaCols(input);
+ if (!(requiredFields.getLeft().isEmpty() || requiredFields.getRight().isEmpty())) {
+ return createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), requiredFields.getRight().stream())
+ .collect(Collectors.toList()));
+ }
+ }
+ return input;
+ }
+
+ private static Pair<List<Schema.Field>,List<Schema.Field>> getDataAndMetaCols(Schema schema) {
+ Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+ .collect(Collectors.partitioningBy(f -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+ return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+ fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+ }
+
+ private Schema createSchemaFromFields(List<Schema.Field> fields) {
Review Comment:
@jonvex could you follow up on this separately if not done in this PR?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,119 +84,213 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws Exception {
- // This constructor is a placeholder now to allow automatically fetching the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(), metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+
+ // requestedSchema: the schema that the caller requests
+ private final Schema requestedSchema;
+
+ // requiredSchema: the requestedSchema with any additional columns required for merging etc
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
- this.recordMerger = readerContext.getRecordMerger(
- getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, RECORD_MERGER_STRATEGY.defaultValue()));
+ this.recordMerger = readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter = Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue = ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ? Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? null
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
- scanLogFiles();
- recordBuffer.setBaseFileIterator(baseFileIterator);
+ ClosableIterator<T> iter = makeBaseFileIterator();
+ if (logFiles.isEmpty()) {
+ this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
+ } else {
+ this.baseFileIterator = iter;
+ scanLogFiles();
+ recordBuffer.setBaseFileIterator(baseFileIterator);
+ }
+ }
+
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+ //might need to change this if other queries than mor have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ List<Schema.Field> addedFields = new ArrayList<>();
+ for (String field : recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+ if (requestedSchema.getField(field) == null) {
+ Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
+ if (!foundFieldOpt.isPresent()) {
+ throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+ }
+ Schema.Field foundField = foundFieldOpt.get();
+ addedFields.add(foundField);
+ }
+ }
+
+ if (addedFields.isEmpty()) {
+ return maybeReorderForBootstrap(requestedSchema);
+ }
+
+ return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, addedFields));
+ }
+
+ private Schema maybeReorderForBootstrap(Schema input) {
+ if (this.hoodieBaseFileOption.isPresent() && this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+ Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = getDataAndMetaCols(input);
+ if (!(requiredFields.getLeft().isEmpty() || requiredFields.getRight().isEmpty())) {
+ return createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), requiredFields.getRight().stream())
+ .collect(Collectors.toList()));
+ }
+ }
+ return input;
+ }
+
+ private static Pair<List<Schema.Field>,List<Schema.Field>> getDataAndMetaCols(Schema schema) {
+ Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+ .collect(Collectors.partitioningBy(f -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+ return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+ fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+ }
+
+ private Schema createSchemaFromFields(List<Schema.Field> fields) {
+ //fields have positions set, so we need to remove them due to avro setFields implementation
+ for (int i = 0; i < fields.size(); i++) {
+ Schema.Field curr = fields.get(i);
+ fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), curr.defaultVal()));
+ }
+ Schema newSchema = Schema.createRecord(dataSchema.getName(), dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+ newSchema.setFields(fields);
+ return newSchema;
+ }
+
+ private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) {
+ BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
+ Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = getDataAndMetaCols(requiredSchema);
+ Pair<List<Schema.Field>,List<Schema.Field>> allFields = getDataAndMetaCols(dataSchema);
+
+ Option<ClosableIterator<T>> dataFileIterator = requiredFields.getRight().isEmpty() ? Option.empty() :
+ Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, dataFile.getFileLen(),
+ createSchemaFromFields(allFields.getRight()), createSchemaFromFields(requiredFields.getRight()), hadoopConf));
+
+ Option<ClosableIterator<T>> skeletonFileIterator = requiredFields.getLeft().isEmpty() ? Option.empty() :
+ Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, baseFile.getFileLen(),
+ createSchemaFromFields(allFields.getLeft()), createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
+ if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
+ throw new IllegalStateException("should not be here if only partition cols are required");
+ } else if (!dataFileIterator.isPresent()) {
+ return skeletonFileIterator.get();
+ } else if (!skeletonFileIterator.isPresent()) {
+ return dataFileIterator.get();
+ } else {
+ return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(), dataFileIterator.get());
+ }
}
/**
* @return {@code true} if the next record exists; {@code false} otherwise.
* @throws IOException on reader error.
*/
public boolean hasNext() throws IOException {
- return recordBuffer.hasNext();
+ if (recordBuffer == null) {
+ return baseFileIterator.hasNext();
Review Comment:
We should revisit whether this can be directly put into the record buffer.
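For illustration, a hypothetical pass-through buffer (roughly what the `HoodieSimpleFileGroupRecordBuffer` used in the other revision of this hunk does; the class name and its interface here are assumed, not the actual record buffer API):

```java
import org.apache.hudi.common.util.collection.ClosableIterator;

// Hypothetical sketch: when there are no log files, a buffer that simply drains the
// base file iterator would let hasNext()/next() in HoodieFileGroupReader avoid the null check.
public class PassThroughRecordBuffer<T> {
  private ClosableIterator<T> baseFileIterator;

  public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
    this.baseFileIterator = baseFileIterator;
  }

  public boolean hasNext() {
    return baseFileIterator.hasNext();
  }

  public T next() {
    return baseFileIterator.next();
  }

  public void close() {
    baseFileIterator.close();
  }
}
```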
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws Exception {
- // This constructor is a placeholder now to allow automatically fetching the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(), metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+ private final Schema requestedSchema;
+
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
this.recordMerger = readerContext.getRecordMerger(
getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, RECORD_MERGER_STRATEGY.defaultValue()));
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter = Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue = ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ? Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, props)
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
+ this.baseFileIterator = makeBaseFileIterator();
scanLogFiles();
recordBuffer.setBaseFileIterator(baseFileIterator);
}
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+ //might need to change this if other queries than mor have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ if (!HoodiePayloadProps.isProjectionCompatible(hoodieTableConfig.getPayloadClass())) {
+ return dataSchema;
+ }
+
+ List<Schema.Field> addedFields = new ArrayList<>();
+ for (String field : recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+ //need to match HoodieFileGroupReaderBasedParquetFileFormat for now
+ //if (!findNestedField(requestedSchema, field).isPresent()) {
+ if (requestedSchema.getField(field) == null) {
+ Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
+ if (!foundFieldOpt.isPresent()) {
+ throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+ }
+ Schema.Field foundField = foundFieldOpt.get();
+ addedFields.add(foundField);
+ }
+ }
+
+ if (addedFields.isEmpty()) {
+ return maybeReorderForBootstrap(requestedSchema);
+ }
+
+ //return maybeReorderForBootstrap(appendFieldsToSchemaDedupNested(requestedSchema, addedFields));
+ return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, addedFields));
+ }
+
+ private Schema maybeReorderForBootstrap(Schema input) {
+ if (this.hoodieBaseFileOption.isPresent() && this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+ Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = getDataAndMetaCols(input);
Review Comment:
@jonvex could you check this?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -188,4 +189,8 @@ public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String,
meta.put(INTERNAL_META_SCHEMA, schema);
return meta;
}
+
+ public abstract ClosableIterator<T> mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator, ClosableIterator<T> dataFileIterator);
+
+ public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
Review Comment:
Please add Javadoc for these two new abstract methods.
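For example, something along these lines (the wording is only a suggestion):

```java
/**
 * Merges the skeleton file and data file iterators of a bootstrapped file slice
 * into a single iterator producing fully assembled records.
 *
 * @param skeletonFileIterator iterator over the skeleton file holding the Hudi metadata columns
 * @param dataFileIterator     iterator over the bootstrap source data file holding the data columns
 * @return a combined iterator over the merged records
 */
public abstract ClosableIterator<T> mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator,
                                                          ClosableIterator<T> dataFileIterator);

/**
 * Creates a function that projects a record of the engine-specific type {@code T}
 * from one Avro schema to another.
 *
 * @param from the schema of the incoming record
 * @param to   the schema to project the record to
 * @return a projection function to apply to each record
 */
public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
```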
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -62,7 +63,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
requiredSchema: Schema,
conf: Configuration): ClosableIterator[InternalRow] = {
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
- .createPartitionedFile(partitionValues, filePath, start, length)
+ .createPartitionedFile(InternalRow.empty, filePath, start, length)
Review Comment:
@jonvex nit: add a comment/doc in the code to explain why the partition
value is empty.