codope commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1407078231
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -449,7 +449,8 @@ private static void validateRow(InternalRow data,
StructType schema) {
data instanceof HoodieInternalRow
|| data instanceof GenericInternalRow
|| data instanceof SpecificInternalRow
- ||
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+ ||
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data))
+ || data instanceof JoinedRow;
Review Comment:
When is the data a `JoinedRow`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws
Exception {
- // This constructor is a placeholder now to allow automatically fetching
the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(),
metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+ private final Schema requestedSchema;
+
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
this.recordMerger = readerContext.getRecordMerger(
getStringWithAltKeys(props, RECORD_MERGER_STRATEGY,
RECORD_MERGER_STRATEGY.defaultValue()));
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter =
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue =
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ?
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new
String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext,
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger,
props)
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length,
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
+ this.baseFileIterator = makeBaseFileIterator();
scanLogFiles();
recordBuffer.setBaseFileIterator(baseFileIterator);
}
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(),
start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+ //might need to change this if other queries than mor have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize
the logic
+ if
(!hoodieTableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
{
+ return dataSchema;
+ }
+
+ List<Schema.Field> addedFields = new ArrayList<>();
+ for (String field :
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+ if (requestedSchema.getField(field) == null) {
+ Schema.Field foundField = dataSchema.getField(field);
+ if (foundField == null) {
+ throw new IllegalArgumentException("Field: " + field + " does not
exist in the table schema");
+ }
+ addedFields.add(new Schema.Field(foundField.name(),
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+ }
+ }
+
+ if (addedFields.isEmpty()) {
+ return maybeReorderForBootstrap(requestedSchema);
+ }
+
+ return
maybeReorderForBootstrap(AvroSchemaUtils.appendFieldsToSchema(requestedSchema,
addedFields));
+ }
+
+ private Schema maybeReorderForBootstrap(Schema input) {
+ if (this.hoodieBaseFileOption.isPresent() &&
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+ Pair<List<Schema.Field>,List<Schema.Field>> requiredFields =
getDataAndMetaCols(input);
+ if (!(requiredFields.getLeft().isEmpty() ||
requiredFields.getRight().isEmpty())) {
+ return
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(),
requiredFields.getRight().stream())
+ .collect(Collectors.toList()));
+ }
+ }
+ return input;
+ }
+
+ private static Pair<List<Schema.Field>,List<Schema.Field>>
getDataAndMetaCols(Schema schema) {
Review Comment:
Would prefer to have this method in some util class and tested.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieSimpleFileGroupRecordBuffer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HoodieSimpleFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupRecordBuffer<T> {
Review Comment:
add javadoc
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -41,10 +40,15 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
import scala.annotation.tailrec
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.jdk.CollectionConverters.asScalaIteratorConverter
+trait HoodieFormatTrait {
+
+ //Used so that the planner only projects once and does not stack overflow
Review Comment:
```suggestion
// Used so that the planner only projects once and does not stack overflow
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -67,8 +71,8 @@
*/
public final class HoodieFileGroupReader<T> implements Closeable {
private final HoodieReaderContext<T> readerContext;
- private final Option<HoodieBaseFile> baseFilePath;
- private final Option<List<String>> logFilePathList;
+ private final Option<HoodieBaseFile> hoodieBaseFileOption;
+ private final List<HoodieLogFile> logFiles;
Review Comment:
Why remove `Option`?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -122,6 +125,23 @@ default boolean shouldFlush(HoodieRecord record, Schema
schema, TypedProperties
return true;
}
+ default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
+ ArrayList<String> requiredFields = new ArrayList<>();
+ if (cfg.populateMetaFields()) {
+ requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ } else {
+ cfg.getRecordKeyFieldStream().forEach(requiredFields::add);
+ }
+ String preCombine = cfg.getPreCombineField();
+
+ //maybe throw exception otherwise
Review Comment:
Should this be left to the caller? There are valid code paths with optional
precombine field.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java:
##########
@@ -147,6 +148,10 @@ public String getFileExtension() {
return fileExtension;
}
+ public boolean isCDC() {
Review Comment:
Should we make it a static method in `FSUtils`?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -62,7 +63,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader:
Option[Partitioned
requiredSchema: Schema,
conf: Configuration):
ClosableIterator[InternalRow] = {
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
- .createPartitionedFile(partitionValues, filePath, start, length)
+ .createPartitionedFile(InternalRow.empty, filePath, start, length)
Review Comment:
Why pass empty? I think `createPartitionedFile` needs the partition values,
doesn't it?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -540,6 +541,15 @@ public Option<String[]> getRecordKeyFields() {
}
}
+ public Stream<String> getRecordKeyFieldStream() {
Review Comment:
How about reusing the existing `getRecordKeyFields` method, and then using a
stream at the call site?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws
Exception {
- // This constructor is a placeholder now to allow automatically fetching
the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(),
metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+ private final Schema requestedSchema;
+
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
this.recordMerger = readerContext.getRecordMerger(
getStringWithAltKeys(props, RECORD_MERGER_STRATEGY,
RECORD_MERGER_STRATEGY.defaultValue()));
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter =
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue =
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ?
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new
String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext,
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger,
props)
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length,
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
+ this.baseFileIterator = makeBaseFileIterator();
scanLogFiles();
recordBuffer.setBaseFileIterator(baseFileIterator);
}
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(),
start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+ //might need to change this if other queries than mor have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize
the logic
+ if
(!hoodieTableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
{
+ return dataSchema;
+ }
+
+ List<Schema.Field> addedFields = new ArrayList<>();
+ for (String field :
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+ if (requestedSchema.getField(field) == null) {
+ Schema.Field foundField = dataSchema.getField(field);
+ if (foundField == null) {
+ throw new IllegalArgumentException("Field: " + field + " does not
exist in the table schema");
+ }
+ addedFields.add(new Schema.Field(foundField.name(),
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+ }
+ }
+
+ if (addedFields.isEmpty()) {
+ return maybeReorderForBootstrap(requestedSchema);
+ }
+
+ return
maybeReorderForBootstrap(AvroSchemaUtils.appendFieldsToSchema(requestedSchema,
addedFields));
+ }
+
+ private Schema maybeReorderForBootstrap(Schema input) {
+ if (this.hoodieBaseFileOption.isPresent() &&
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+ Pair<List<Schema.Field>,List<Schema.Field>> requiredFields =
getDataAndMetaCols(input);
+ if (!(requiredFields.getLeft().isEmpty() ||
requiredFields.getRight().isEmpty())) {
+ return
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(),
requiredFields.getRight().stream())
+ .collect(Collectors.toList()));
+ }
+ }
+ return input;
+ }
+
+ private static Pair<List<Schema.Field>,List<Schema.Field>>
getDataAndMetaCols(Schema schema) {
+ Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+ .collect(Collectors.partitioningBy(f ->
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+ return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+ fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+ }
+
+ private Schema createSchemaFromFields(List<Schema.Field> fields) {
Review Comment:
we need to test the path wherever these methods are exercised.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws
Exception {
- // This constructor is a placeholder now to allow automatically fetching
the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(),
metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+ private final Schema requestedSchema;
+
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
this.recordMerger = readerContext.getRecordMerger(
getStringWithAltKeys(props, RECORD_MERGER_STRATEGY,
RECORD_MERGER_STRATEGY.defaultValue()));
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter =
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue =
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ?
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new
String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext,
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger,
props)
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length,
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
+ this.baseFileIterator = makeBaseFileIterator();
scanLogFiles();
recordBuffer.setBaseFileIterator(baseFileIterator);
}
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(),
start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+ //might need to change this if other queries than mor have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize
the logic
Review Comment:
Please complete the TODOs here, or remove them if they are no longer necessary.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -158,7 +156,7 @@ private void
validateOutputFromFileGroupReader(Configuration hadoopConf,
SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
FileSlice fileSlice =
fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
- Collections.sort(logFilePathList);
+ //Collections.sort(logFilePathList);
Review Comment:
Why was the sorting removed?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -91,73 +95,69 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
spark.conf.set("spark.sql.parquet.enableVectorizedReader",
supportBatchResult)
val requiredSchemaWithMandatory =
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
+ val isCount = requiredSchemaWithMandatory.isEmpty
val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
val requiredMeta = StructType(requiredSchemaSplits._1)
val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
- val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(
+ val (baseFileReader, preMergeBaseFileReader, readerMaps) =
buildFileReaders(
spark, dataSchema, partitionSchema, if (isIncremental)
requiredSchemaWithMandatory else requiredSchema,
filters, options, augmentedHadoopConf, requiredSchemaWithMandatory,
requiredWithoutMeta, requiredMeta)
+
+ val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
+ val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
+
val broadcastedHadoopConf = spark.sparkContext.broadcast(new
SerializableConfiguration(augmentedHadoopConf))
+ val broadcastedDataSchema = spark.sparkContext.broadcast(dataAvroSchema)
+ val broadcastedRequestedSchema =
spark.sparkContext.broadcast(requestedAvroSchema)
val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark,
options)
(file: PartitionedFile) => {
file.partitionValues match {
// Snapshot or incremental queries.
case fileSliceMapping: HoodiePartitionFileSliceMapping =>
val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
- if (FSUtils.isLogFile(filePath)) {
- val partitionValues = fileSliceMapping.getPartitionValues
- val fileSlice =
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
- buildFileGroupIterator(
- Option.empty[PartitionedFile => Iterator[InternalRow]],
- partitionValues,
- Option.empty[HoodieBaseFile],
- getLogFilesFromSlice(fileSlice),
- requiredSchemaWithMandatory,
- outputSchema,
- partitionSchema,
- broadcastedHadoopConf.value.value,
- -1,
- -1,
- shouldUseRecordPosition
- )
+ val filegroupName = if (FSUtils.isLogFile(filePath)) {
+ FSUtils.getFileId(filePath.getName).substring(1)
} else {
- fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName))
match {
- case Some(fileSlice) =>
+ FSUtils.getFileId(filePath.getName)
+ }
+ fileSliceMapping.getSlice(filegroupName) match {
+ case Some(fileSlice) if !isCount =>
+ if (requiredSchema.isEmpty &&
!fileSlice.getLogFiles.findAny().isPresent) {
val hoodieBaseFile = fileSlice.getBaseFile.get()
- val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
- val partitionValues = fileSliceMapping.getPartitionValues
- val logFiles = getLogFilesFromSlice(fileSlice)
- if (requiredSchemaWithMandatory.isEmpty) {
- val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
- baseFileReader(baseFile)
- } else if (bootstrapFileOpt.isPresent) {
- // TODO: Use FileGroupReader here: HUDI-6942.
- throw new NotImplementedError("Not support reading bootstrap
file")
- } else {
- if (logFiles.isEmpty) {
- throw new IllegalStateException(
- "should not be here since file slice should not have
been broadcasted "
- + "since it has no log or data files")
- }
- buildFileGroupIterator(
- Option(preMergeBaseFileReader),
- partitionValues,
- Option(hoodieBaseFile),
- logFiles,
- requiredSchemaWithMandatory,
- outputSchema,
- partitionSchema,
- broadcastedHadoopConf.value.value,
- 0,
- hoodieBaseFile.getFileLen,
- shouldUseRecordPosition
- )
- }
- // TODO: Use FileGroupReader here: HUDI-6942.
- case _ => baseFileReader(file)
- }
+
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen))
+ } else {
+ val readerContext: HoodieReaderContext[InternalRow] = new
SparkFileFormatInternalRowReaderContext(
+ readerMaps)
+ val serializedHadoopConf = broadcastedHadoopConf.value.value
+ val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+
.builder().setConf(serializedHadoopConf).setBasePath(tableState.tablePath).build
+ val reader = new HoodieFileGroupReader[InternalRow](
+ readerContext,
+ serializedHadoopConf,
+ tableState.tablePath,
+ tableState.latestCommitTimestamp.get,
Review Comment:
I hope the latestCommitTimestamp is not empty/null. Have we considered that?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieSimpleFileGroupRecordBuffer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HoodieSimpleFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupRecordBuffer<T> {
+ public HoodieSimpleFileGroupRecordBuffer(HoodieReaderContext<T>
readerContext, Schema readerSchema, Schema baseFileSchema,
+ Option<String>
partitionNameOverrideOpt, Option<String[]> partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
+ TypedProperties payloadProps) {
+ super(readerContext, readerSchema, baseFileSchema,
partitionNameOverrideOpt, partitionPathFieldOpt, recordMerger, payloadProps);
+ }
+
+ @Override
+ protected boolean doHasNext() throws IOException {
+ if (baseFileIterator.hasNext()) {
+ nextRecord = readerContext.seal(baseFileIterator.next());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public BufferType getBufferType() {
+ return BufferType.SIMPLE;
+ }
+
+ @Override
+ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec>
keySpecOpt) throws IOException {
+ throw new IllegalStateException("processDataBlock should never be called
from HoodieSimpleFileGroupRecordBuffer");
Review Comment:
Should all these methods that process incoming data invoke super, or throw an
exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]