This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 78a84ea7521 [HUDI-7137] Implement Bootstrap for new FG reader (#10137)
78a84ea7521 is described below
commit 78a84ea7521e6295732199b68aca775cd5d79f63
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Dec 7 18:00:07 2023 -0500
[HUDI-7137] Implement Bootstrap for new FG reader (#10137)
---
.../hudi/common/table/log/CachingIterator.java | 41 ----
.../hudi/common/table/log/LogFileIterator.java | 6 +
.../hudi/common/model/HoodieSparkRecord.java | 3 +-
.../hudi/BaseSparkInternalRowReaderContext.java | 11 +-
.../SparkFileFormatInternalRowReaderContext.scala | 76 +++++-
.../apache/spark/sql/HoodieInternalRowUtils.scala | 7 +
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 78 +++++-
.../hudi/common/engine/HoodieReaderContext.java | 21 ++
.../apache/hudi/common/model/HoodieLogFile.java | 5 +
.../hudi/common/model/HoodieRecordMerger.java | 23 ++
.../table/log/BaseHoodieLogRecordReader.java | 2 +-
.../table/log/HoodieMergedLogRecordReader.java | 9 +-
.../common/table/read/HoodieFileGroupReader.java | 263 ++++++++++++++-------
.../common/util/collection/CachingIterator.java | 78 ++++++
.../org/apache/hudi/avro/TestAvroSchemaUtils.java | 72 ++++++
.../table/read/TestHoodieFileGroupReaderBase.java | 13 +-
.../hudi/HoodieHadoopFsRelationFactory.scala | 2 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 224 ++++++++----------
.../parquet/NewHoodieParquetFileFormat.scala | 7 +-
.../hudi/TestHoodieMergeHandleWithSparkMerger.java | 7 +-
...stHoodiePositionBasedFileGroupRecordBuffer.java | 2 +-
.../functional/TestNewHoodieParquetFileFormat.java | 17 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 32 +--
.../hudi/TestQueryMergeOnReadOptimizedTable.scala | 15 +-
.../procedure/TestHoodieLogFileProcedure.scala | 9 +-
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 22 +-
.../spark/sql/HoodieSpark3CatalystPlanUtils.scala | 21 +-
.../spark/sql/HoodieSpark30CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark31CatalystPlanUtils.scala | 9 +-
.../spark/sql/HoodieSpark32CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark33CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark34CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark35CatalystPlanUtils.scala | 12 +-
33 files changed, 765 insertions(+), 370 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java
deleted file mode 100644
index d022b92ae22..00000000000
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.table.log;
-
-import java.util.Iterator;
-
-public abstract class CachingIterator<T> implements Iterator<T> {
-
- protected T nextRecord;
-
- protected abstract boolean doHasNext();
-
- @Override
- public final boolean hasNext() {
- return nextRecord != null || doHasNext();
- }
-
- @Override
- public final T next() {
- T record = nextRecord;
- nextRecord = null;
- return record;
- }
-
-}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
index bf55a6ba06e..7331f7d3a9b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CachingIterator;
import java.util.Iterator;
import java.util.Map;
@@ -54,4 +55,9 @@ public class LogFileIterator<T> extends CachingIterator<HoodieRecord<T>> {
protected boolean doHasNext() {
return hasNextInternal();
}
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 5cb8800411c..334a1e9ddc3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -449,7 +449,8 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
         data instanceof HoodieInternalRow
             || data instanceof GenericInternalRow
             || data instanceof SpecificInternalRow
-            || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+            || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data))
+            || data instanceof JoinedRow;
     ValidationUtils.checkState(isValid);
   }
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 5360535620f..a4d14d1eb4a 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -37,12 +37,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.types.StructType;
import java.util.Map;
+import java.util.function.UnaryOperator;
import static
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
 /**
  * An abstract class implementing {@link HoodieReaderContext} to handle {@link InternalRow}s.
@@ -113,7 +116,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
   }
   private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema, String fieldName) {
-    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = getCachedSchema(recordSchema);
     scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
         HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
     if (cachedNestedFieldPath.isDefined()) {
@@ -123,4 +126,10 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
       return null;
     }
   }
+
+  @Override
+  public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
+    UnsafeProjection projection = HoodieInternalRowUtils.generateUnsafeProjectionAlias(getCachedSchema(from), getCachedSchema(to));
+    return projection::apply;
+  }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index beca8852686..963035caf21 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,16 +25,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{ClosableIterator,
CloseableMappingIterator}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory,
HoodieSparkParquetReader}
import org.apache.hudi.util.CloseableInternalRowIterator
-import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection,
UnsafeRow}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.sql.HoodieInternalRowUtils
import scala.collection.mutable
@@ -44,13 +46,11 @@ import scala.collection.mutable
*
  * This uses Spark parquet reader to read parquet data files or parquet log blocks.
  *
- * @param baseFileReader  A reader that transforms a {@link PartitionedFile} to an iterator of
- *                        {@link InternalRow}. This is required for reading the base file and
- *                        not required for reading a file group with only log files.
- * @param partitionValues The values for a partition in which the file group lives.
+ * @param readerMaps Our intention is to build the reader inside getFileRecordIterator, but since it is called from
+ *                   the executor, we would need to port a large amount of code from ParquetFileFormat for each Spark
+ *                   version. For now, we pass in a map of the different readers we expect to create.
  */
-class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[PartitionedFile => Iterator[InternalRow]],
-                                              partitionValues: InternalRow) extends BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, PartitionedFile => Iterator[InternalRow]]) extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
   val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
@@ -61,8 +61,10 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                      conf: Configuration): ClosableIterator[InternalRow] = {
+    // partition value is empty because the spark parquet reader will append the partition columns to
+    // each row if they are given. That is the only usage of the partition values in the reader.
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
-      .createPartitionedFile(partitionValues, filePath, start, length)
+      .createPartitionedFile(InternalRow.empty, filePath, start, length)
if (FSUtils.isLogFile(filePath)) {
val structType: StructType =
HoodieInternalRowUtils.getCachedSchema(requiredSchema)
val projection: UnsafeProjection =
HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
@@ -77,14 +79,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
         }
       }).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      if (baseFileReader.isEmpty) {
-        throw new IllegalArgumentException("Base file reader is missing when instantiating "
-          + "SparkFileFormatInternalRowReaderContext.");
+      val schemaPairHashKey = generateSchemaPairHashKey(dataSchema, requiredSchema)
+      if (!readerMaps.contains(schemaPairHashKey)) {
+        throw new IllegalStateException("schemas don't hash to a known reader")
       }
-      new CloseableInternalRowIterator(baseFileReader.get.apply(fileInfo))
+      new CloseableInternalRowIterator(readerMaps(schemaPairHashKey).apply(fileInfo))
     }
   }
+  private def generateSchemaPairHashKey(dataSchema: Schema, requestedSchema: Schema): Long = {
+    dataSchema.hashCode() + requestedSchema.hashCode()
+  }
+
/**
* Converts an Avro record, e.g., serialized in the log files, to an
[[InternalRow]].
*
@@ -99,4 +105,48 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
     })
     deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
   }
+
+  override def mergeBootstrapReaders(skeletonFileIterator: ClosableIterator[InternalRow],
+                                     dataFileIterator: ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+  }
+
+  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
+    new ClosableIterator[Any] {
+      val combinedRow = new JoinedRow()
+
+      override def hasNext: Boolean = {
+        // If the iterators are out of sync it is probably due to filter pushdown
+        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+          "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
+        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      }
+
+ override def next(): Any = {
+ (skeletonFileIterator.next(), dataFileIterator.next()) match {
+ case (s: ColumnarBatch, d: ColumnarBatch) =>
+ val numCols = s.numCols() + d.numCols()
+ val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+ for (i <- 0 until numCols) {
+ if (i < s.numCols()) {
+ vecs(i) = s.column(i)
+ } else {
+ vecs(i) = d.column(i - s.numCols())
+ }
+ }
+ assert(s.numRows() == d.numRows())
+ sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+          case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+          case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+ }
+ }
+
+ override def close(): Unit = {
+ skeletonFileIterator.close()
+ dataFileIterator.close()
+ }
+ }.asInstanceOf[ClosableIterator[InternalRow]]
+ }
}
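For context on the bootstrap merge added above: the skeleton-file and data-file iterators are expected to advance in lockstep, and each output row is simply the skeleton columns followed by the data columns (a JoinedRow for row-by-row reads, a rebuilt ColumnarBatch for vectorized reads). A minimal, engine-agnostic sketch of that zip-and-concatenate pattern, with plain Object[] rows standing in for InternalRow/ColumnarBatch; this is illustrative only, not Hudi API:

import java.util.Arrays;
import java.util.Iterator;

// Zip two row iterators that must advance in lockstep and concatenate their
// columns, mirroring doBootstrapMerge above.
final class ZipConcatIterator implements Iterator<Object[]> {
  private final Iterator<Object[]> skeleton;
  private final Iterator<Object[]> data;

  ZipConcatIterator(Iterator<Object[]> skeleton, Iterator<Object[]> data) {
    this.skeleton = skeleton;
    this.data = data;
  }

  @Override
  public boolean hasNext() {
    // If one side runs out before the other, the readers are out of sync
    // (e.g., a filter was pushed down to only one of them).
    if (skeleton.hasNext() != data.hasNext()) {
      throw new IllegalStateException("skeleton and data iterators must stay in sync");
    }
    return skeleton.hasNext();
  }

  @Override
  public Object[] next() {
    Object[] left = skeleton.next();
    Object[] right = data.next();
    Object[] joined = Arrays.copyOf(left, left.length + right.length);
    System.arraycopy(right, 0, joined, left.length, right.length);
    return joined;
  }
}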
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index d5831be7d91..afeb9969c5f 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -73,6 +73,13 @@ object HoodieInternalRowUtils {
.getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
}
+  /**
+   * Because HoodieCatalystExpressionUtils is both an object and a trait, and we still support Scala 2.11,
+   * we can't call generateUnsafeProjection directly from Java code.
+   */
+  def generateUnsafeProjectionAlias(from: StructType, to: StructType): UnsafeProjection = {
+    generateUnsafeProjection(from, to)
+  }
/**
* Provides cached instance of [[UnsafeRowWriter]] transforming provided
[[InternalRow]]s from
* one [[StructType]] and into another [[StructType]]
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 3c5486c47c7..a8a72e77717 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -18,12 +18,14 @@
package org.apache.hudi.avro;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -218,6 +220,65 @@ public class AvroSchemaUtils {
return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
}
+ public static Option<Schema.Field> findNestedField(Schema schema, String
fieldName) {
+ return findNestedField(schema, fieldName.split("\\."), 0);
+ }
+
+ private static Option<Schema.Field> findNestedField(Schema schema, String[]
fieldParts, int index) {
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ Option<Schema.Field> notUnion =
findNestedField(resolveNullableSchema(schema), fieldParts, index);
+ if (!notUnion.isPresent()) {
+ return Option.empty();
+ }
+ Schema.Field nu = notUnion.get();
+ return Option.of(new Schema.Field(nu.name(), nu.schema(), nu.doc(),
nu.defaultVal()));
+ }
+ if (fieldParts.length <= index) {
+ return Option.empty();
+ }
+
+ Schema.Field foundField = schema.getField(fieldParts[index]);
+ if (foundField == null) {
+ return Option.empty();
+ }
+
+ if (index == fieldParts.length - 1) {
+ return Option.of(new Schema.Field(foundField.name(),
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+ }
+
+ Schema foundSchema = foundField.schema();
+ Option<Schema.Field> nestedPart = findNestedField(foundSchema, fieldParts,
index + 1);
+ if (!nestedPart.isPresent()) {
+ return Option.empty();
+ }
+ return nestedPart;
+ }
+
+ public static Schema appendFieldsToSchemaDedupNested(Schema schema,
List<Schema.Field> newFields) {
+ return appendFieldsToSchemaBase(schema, newFields, true);
+ }
+
+ public static Schema mergeSchemas(Schema a, Schema b) {
+ if (!a.getType().equals(Schema.Type.RECORD)) {
+ return a;
+ }
+ List<Schema.Field> fields = new ArrayList<>();
+ for (Schema.Field f : a.getFields()) {
+ Schema.Field foundField = b.getField(f.name());
+ fields.add(new Schema.Field(f.name(), foundField == null ? f.schema() :
mergeSchemas(f.schema(), foundField.schema()),
+ f.doc(), f.defaultVal()));
+ }
+ for (Schema.Field f : b.getFields()) {
+ if (a.getField(f.name()) == null) {
+ fields.add(new Schema.Field(f.name(), f.schema(), f.doc(),
f.defaultVal()));
+ }
+ }
+
+ Schema newSchema = Schema.createRecord(a.getName(), a.getDoc(),
a.getNamespace(), a.isError());
+ newSchema.setFields(fields);
+ return newSchema;
+ }
+
/**
* Appends provided new fields at the end of the given schema
*
@@ -225,10 +286,25 @@ public class AvroSchemaUtils {
* of the source schema as is
*/
public static Schema appendFieldsToSchema(Schema schema, List<Schema.Field>
newFields) {
+ return appendFieldsToSchemaBase(schema, newFields, false);
+ }
+
+ private static Schema appendFieldsToSchemaBase(Schema schema,
List<Schema.Field> newFields, boolean dedupNested) {
List<Schema.Field> fields = schema.getFields().stream()
.map(field -> new Schema.Field(field.name(), field.schema(),
field.doc(), field.defaultVal()))
.collect(Collectors.toList());
- fields.addAll(newFields);
+ if (dedupNested) {
+ for (Schema.Field f : newFields) {
+ Schema.Field foundField = schema.getField(f.name());
+ if (foundField != null) {
+ fields.set(foundField.pos(), new Schema.Field(foundField.name(),
mergeSchemas(foundField.schema(), f.schema()), foundField.doc(),
foundField.defaultVal()));
+ } else {
+ fields.add(f);
+ }
+ }
+ } else {
+ fields.addAll(newFields);
+ }
Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(),
schema.getNamespace(), schema.isError());
newSchema.setFields(fields);
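The helpers above exist to widen a requested schema with fields the merger needs. A small sketch of that intended usage, mirroring generateRequiredSchema later in this commit; the dotted field path and class name are made up for illustration:

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.util.Option;

public class RequiredSchemaExample {
  public static Schema widen(Schema dataSchema, Schema requestedSchema, String[] mandatoryFields) {
    List<Schema.Field> added = new ArrayList<>();
    for (String field : mandatoryFields) {
      if (requestedSchema.getField(field) == null) {
        // Dot-separated paths such as "event.ts" are resolved against the full table schema.
        Option<Schema.Field> found = AvroSchemaUtils.findNestedField(dataSchema, field);
        if (!found.isPresent()) {
          throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
        }
        added.add(found.get());
      }
    }
    return added.isEmpty() ? requestedSchema : AvroSchemaUtils.appendFieldsToSchema(requestedSchema, added);
  }
}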
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index b24a4109e75..7b8b2888983 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.UnaryOperator;
/**
* An abstract reader context class for {@code HoodieFileGroupReader} to use,
containing APIs for
@@ -188,4 +189,24 @@ public abstract class HoodieReaderContext<T> {
meta.put(INTERNAL_META_SCHEMA, schema);
return meta;
}
+
+  /**
+   * Merge the skeleton file and data file iterators into a single iterator that will produce rows containing
+   * all columns from the skeleton file iterator, followed by all columns from the data file iterator.
+   *
+   * @param skeletonFileIterator iterator over bootstrap skeleton files that contain hudi metadata columns
+   * @param dataFileIterator iterator over data files that were bootstrapped into the hudi table
+   * @return iterator that concatenates the skeletonFileIterator and dataFileIterator
+   */
+  public abstract ClosableIterator<T> mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator, ClosableIterator<T> dataFileIterator);
+
+  /**
+   * Creates a function that reorders records from schema "from" to schema "to".
+   * Every field in "to" must be present in "from", but not every field in "from" needs to be in "to".
+   *
+   * @param from the schema of records passed into the UnaryOperator
+   * @param to   the schema of records produced by the UnaryOperator
+   * @return a function that takes in a record and returns the record with reordered columns
+   */
+  public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
}
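The two new abstract methods define the engine contract the file group reader relies on for bootstrap reads and for projecting merged records back to the requested schema. As a purely hypothetical illustration of the projectRecord semantics (not the Spark implementation in this commit, which uses UnsafeProjection), an Avro-based context could do:

import java.util.function.UnaryOperator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class AvroProjectionExample {
  public static UnaryOperator<GenericRecord> projectRecord(Schema from, Schema to) {
    // "from" is unused here because GenericRecord fields are accessed by name;
    // the contract only requires that every field of "to" exists in "from".
    return record -> {
      GenericRecord projected = new GenericData.Record(to);
      for (Schema.Field field : to.getFields()) {
        projected.put(field.name(), record.get(field.name()));
      }
      return projected;
    };
  }
}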
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index d35211b7970..c1dbefb57c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.hadoop.CachingPath;
@@ -147,6 +148,10 @@ public class HoodieLogFile implements Serializable {
return fileExtension;
}
+ public boolean isCDC() {
+ return getSuffix().equals(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+ }
+
public String getSuffix() {
if (suffix == null) {
parseFieldsFromPath();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 7a259d0bd9a..cdd41242ef1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -22,13 +22,17 @@ import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
/**
* HoodieMerge defines how to merge two records. It is a stateless component.
@@ -122,6 +126,25 @@ public interface HoodieRecordMerger extends Serializable {
return true;
}
+ default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
+ ArrayList<String> requiredFields = new ArrayList<>();
+
+ if (cfg.populateMetaFields()) {
+ requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ } else {
+ Option<String[]> fields = cfg.getRecordKeyFields();
+ if (fields.isPresent()) {
+ requiredFields.addAll(Arrays.asList(fields.get()));
+ }
+ }
+
+ String preCombine = cfg.getPreCombineField();
+ if (!StringUtils.isNullOrEmpty(preCombine)) {
+ requiredFields.add(preCombine);
+ }
+ return requiredFields.toArray(new String[0]);
+ }
+
/**
* The record type handled by the current merger.
* SPARK, AVRO, FLINK
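getMandatoryFieldsForMerging above yields the default set of columns a merger needs: the record key metadata column (or the configured record-key fields) plus the precombine field when one is set. A hypothetical custom merger could override it to ask for additional columns, e.g. an assumed "op" column; sketch only, not part of this commit:

import java.util.Arrays;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;

public abstract class OperationAwareMerger implements HoodieRecordMerger {
  @Override
  public String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
    // Start from the default mandatory fields and append the extra column this merger needs.
    String[] defaults = HoodieRecordMerger.super.getMandatoryFieldsForMerging(cfg);
    String[] withOp = Arrays.copyOf(defaults, defaults.length + 1);
    withOp[defaults.length] = "op";
    return withOp;
  }
}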
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 3f2ceefaf58..2c62fcd70cb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -843,7 +843,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
public abstract Builder withBasePath(String basePath);
- public abstract Builder withLogFilePaths(List<String> logFilePaths);
+ public abstract Builder withLogFiles(List<HoodieLogFile> hoodieLogFiles);
public abstract Builder withReaderSchema(Schema schema);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 6f417be072b..44c4c973eae 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -20,10 +20,10 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -259,9 +259,10 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
}
@Override
- public Builder<T> withLogFilePaths(List<String> logFilePaths) {
- this.logFilePaths = logFilePaths.stream()
- .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ public Builder<T> withLogFiles(List<HoodieLogFile> hoodieLogFiles) {
+ this.logFilePaths = hoodieLogFiles.stream()
+ .filter(l -> !l.isCDC())
+ .map(l -> l.getPath().toString())
.collect(Collectors.toList());
return this;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 2850a77d709..8413ef8a5e2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -23,19 +23,19 @@ import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieTableQueryType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
@@ -45,16 +45,19 @@ import org.apache.hadoop.fs.Path;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
-import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
/**
* A file group reader that iterates through the records in a single file
group.
@@ -67,8 +70,8 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
*/
public final class HoodieFileGroupReader<T> implements Closeable {
private final HoodieReaderContext<T> readerContext;
- private final Option<HoodieBaseFile> baseFilePath;
- private final Option<List<String>> logFilePathList;
+ private final Option<HoodieBaseFile> hoodieBaseFileOption;
+ private final List<HoodieLogFile> logFiles;
private final Configuration hadoopConf;
private final TypedProperties props;
// Byte offset to start reading from the base file
@@ -81,80 +84,168 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private ClosableIterator<T> baseFileIterator;
private HoodieRecordMerger recordMerger;
- public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient metaClient,
- String fileGroupId,
- TypedProperties props,
- HoodieTimeline timeline,
- HoodieTableQueryType queryType,
- Option<String> instantTime,
- Option<String> startInstantTime,
- boolean shouldUseRecordPosition) throws
Exception {
- // This constructor is a placeholder now to allow automatically fetching
the correct list of
- // base and log files for a file group.
- // Derive base and log files and call the corresponding constructor.
- this(readerContext, metaClient.getHadoopConf(),
metaClient.getBasePathV2().toString(),
- instantTime.get(), Option.empty(), Option.empty(),
- new TableSchemaResolver(metaClient).getTableAvroSchema(),
- props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
- }
+ private final Schema dataSchema;
+
+ // requestedSchema: the schema that the caller requests
+ private final Schema requestedSchema;
+
+ // requiredSchema: the requestedSchema with any additional columns required
for merging etc
+ private final Schema requiredSchema;
+
+ private final HoodieTableConfig hoodieTableConfig;
+
+ private final Option<UnaryOperator<T>> outputConverter;
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
Configuration hadoopConf,
String tablePath,
String latestCommitTime,
- Option<HoodieBaseFile> baseFilePath,
- Option<List<String>> logFilePathList,
- Schema avroSchema,
+ FileSlice fileSlice,
+ Schema dataSchema,
+ Schema requestedSchema,
TypedProperties props,
+ HoodieTableConfig tableConfig,
long start,
long length,
boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
- this.baseFilePath = baseFilePath;
- this.logFilePathList = logFilePathList;
+ this.hoodieBaseFileOption = fileSlice.getBaseFile();
+ this.logFiles =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
this.props = props;
this.start = start;
this.length = length;
- this.recordMerger = readerContext.getRecordMerger(
- getStringWithAltKeys(props, RECORD_MERGER_STRATEGY,
RECORD_MERGER_STRATEGY.defaultValue()));
+ this.recordMerger =
readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
this.readerState.tablePath = tablePath;
this.readerState.latestCommitTime = latestCommitTime;
- this.readerState.baseFileAvroSchema = avroSchema;
- this.readerState.logRecordAvroSchema = avroSchema;
+ this.dataSchema = dataSchema;
+ this.requestedSchema = requestedSchema;
+ this.hoodieTableConfig = tableConfig;
+ this.requiredSchema = generateRequiredSchema();
+ if (!requestedSchema.equals(requiredSchema)) {
+ this.outputConverter =
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+ } else {
+ this.outputConverter = Option.empty();
+ }
+ this.readerState.baseFileAvroSchema = requiredSchema;
+ this.readerState.logRecordAvroSchema = requiredSchema;
this.readerState.mergeProps.putAll(props);
- String filePath = baseFilePath.isPresent()
- ? baseFilePath.get().getPath()
- : logFilePathList.get().get(0);
- String partitionPath = FSUtils.getRelativePartitionPath(
- new Path(tablePath), new Path(filePath).getParent());
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
- ? Option.empty() : Option.of(partitionPath);
- Option<Object> partitionConfigValue =
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
- Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
- ?
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new
String[] {}))
- : Option.empty();
- this.recordBuffer = shouldUseRecordPosition
+ this.recordBuffer = this.logFiles.isEmpty()
+ ? null
+ : shouldUseRecordPosition
? new HoodiePositionBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props)
: new HoodieKeyBasedFileGroupRecordBuffer<>(
- readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props);
+
+
}
/**
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() {
- this.baseFileIterator = baseFilePath.isPresent()
- ? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), start, length,
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
- : new EmptyIterator<>();
- scanLogFiles();
- recordBuffer.setBaseFileIterator(baseFileIterator);
+ ClosableIterator<T> iter = makeBaseFileIterator();
+ if (logFiles.isEmpty()) {
+ this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
+ } else {
+ this.baseFileIterator = iter;
+ scanLogFiles();
+ recordBuffer.setBaseFileIterator(baseFileIterator);
+ }
+ }
+
+ private ClosableIterator<T> makeBaseFileIterator() {
+ if (!hoodieBaseFileOption.isPresent()) {
+ return new EmptyIterator<>();
+ }
+
+ HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ return makeBootstrapBaseFileIterator(baseFile);
+ }
+
+ return readerContext.getFileRecordIterator(baseFile.getHadoopPath(),
start, length,
+ dataSchema, requiredSchema, hadoopConf);
+ }
+
+ private Schema generateRequiredSchema() {
+    // might need to change this if query types other than MOR have mandatory fields
+ if (logFiles.isEmpty()) {
+ return requestedSchema;
+ }
+
+ List<Schema.Field> addedFields = new ArrayList<>();
+ for (String field :
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+ if (requestedSchema.getField(field) == null) {
+ Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema,
field);
+ if (!foundFieldOpt.isPresent()) {
+ throw new IllegalArgumentException("Field: " + field + " does not
exist in the table schema");
+ }
+ Schema.Field foundField = foundFieldOpt.get();
+ addedFields.add(foundField);
+ }
+ }
+
+ if (addedFields.isEmpty()) {
+ return maybeReorderForBootstrap(requestedSchema);
+ }
+
+ return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema,
addedFields));
+ }
+
+ private Schema maybeReorderForBootstrap(Schema input) {
+ if (this.hoodieBaseFileOption.isPresent() &&
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+ Pair<List<Schema.Field>, List<Schema.Field>> requiredFields =
getDataAndMetaCols(input);
+ if (!(requiredFields.getLeft().isEmpty() ||
requiredFields.getRight().isEmpty())) {
+ return
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(),
requiredFields.getRight().stream())
+ .collect(Collectors.toList()));
+ }
+ }
+ return input;
+ }
+
+ private static Pair<List<Schema.Field>, List<Schema.Field>>
getDataAndMetaCols(Schema schema) {
+ Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+ .collect(Collectors.partitioningBy(f ->
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+ return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+ fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+ }
+
+ private Schema createSchemaFromFields(List<Schema.Field> fields) {
+ //fields have positions set, so we need to remove them due to avro
setFields implementation
+ for (int i = 0; i < fields.size(); i++) {
+ Schema.Field curr = fields.get(i);
+ fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(),
curr.defaultVal()));
+ }
+ Schema newSchema = Schema.createRecord(dataSchema.getName(),
dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+ newSchema.setFields(fields);
+ return newSchema;
+ }
+
+  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) {
+    BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
+    Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = getDataAndMetaCols(requiredSchema);
+    Pair<List<Schema.Field>, List<Schema.Field>> allFields = getDataAndMetaCols(dataSchema);
+
+    Option<ClosableIterator<T>> dataFileIterator = requiredFields.getRight().isEmpty() ? Option.empty() :
+        Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, dataFile.getFileLen(),
+            createSchemaFromFields(allFields.getRight()), createSchemaFromFields(requiredFields.getRight()), hadoopConf));
+
+    Option<ClosableIterator<T>> skeletonFileIterator = requiredFields.getLeft().isEmpty() ? Option.empty() :
+        Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, baseFile.getFileLen(),
+            createSchemaFromFields(allFields.getLeft()), createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
+    if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
+      throw new IllegalStateException("should not be here if only partition cols are required");
+    } else if (!dataFileIterator.isPresent()) {
+      return skeletonFileIterator.get();
+    } else if (!skeletonFileIterator.isPresent()) {
+      return dataFileIterator.get();
+    } else {
+      return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(), dataFileIterator.get());
+    }
+  }
/**
@@ -162,38 +253,44 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
* @throws IOException on reader error.
*/
public boolean hasNext() throws IOException {
- return recordBuffer.hasNext();
+ if (recordBuffer == null) {
+ return baseFileIterator.hasNext();
+ } else {
+ return recordBuffer.hasNext();
+ }
}
/**
* @return The next record after calling {@link #hasNext}.
*/
public T next() {
- return recordBuffer.next();
+ T nextVal = recordBuffer == null ? baseFileIterator.next() :
recordBuffer.next();
+ if (outputConverter.isPresent()) {
+ return outputConverter.get().apply(nextVal);
+ }
+ return nextVal;
}
private void scanLogFiles() {
- if (logFilePathList.isPresent()) {
- String path = baseFilePath.isPresent() ? baseFilePath.get().getPath() :
logFilePathList.get().get(0);
- FileSystem fs = readerContext.getFs(path, hadoopConf);
-
- HoodieMergedLogRecordReader logRecordReader =
HoodieMergedLogRecordReader.newBuilder()
- .withHoodieReaderContext(readerContext)
- .withFileSystem(fs)
- .withBasePath(readerState.tablePath)
- .withLogFilePaths(logFilePathList.get())
- .withLatestInstantTime(readerState.latestCommitTime)
- .withReaderSchema(readerState.logRecordAvroSchema)
- .withReadBlocksLazily(getBooleanWithAltKeys(props,
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
- .withReverseReader(false)
- .withBufferSize(getIntWithAltKeys(props,
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
- .withPartition(getRelativePartitionPath(
- new Path(readerState.tablePath), new
Path(logFilePathList.get().get(0)).getParent()))
- .withRecordMerger(recordMerger)
- .withRecordBuffer(recordBuffer)
- .build();
- logRecordReader.close();
- }
+ String path = readerState.tablePath;
+ FileSystem fs = readerContext.getFs(path, hadoopConf);
+
+ HoodieMergedLogRecordReader logRecordReader =
HoodieMergedLogRecordReader.newBuilder()
+ .withHoodieReaderContext(readerContext)
+ .withFileSystem(fs)
+ .withBasePath(readerState.tablePath)
+ .withLogFiles(logFiles)
+ .withLatestInstantTime(readerState.latestCommitTime)
+ .withReaderSchema(readerState.logRecordAvroSchema)
+ .withReadBlocksLazily(getBooleanWithAltKeys(props,
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
+ .withReverseReader(false)
+ .withBufferSize(getIntWithAltKeys(props,
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
+ .withPartition(getRelativePartitionPath(
+ new Path(readerState.tablePath),
logFiles.get(0).getPath().getParent()))
+ .withRecordMerger(recordMerger)
+ .withRecordBuffer(recordBuffer)
+ .build();
+ logRecordReader.close();
}
@Override
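Putting the pieces of this file together, a caller is expected to construct the reader from a FileSlice plus data/requested schemas, initialize the iterators, then iterate and close. A sketch mirroring TestHoodieFileGroupReaderBase and the Spark file format below; all argument values are supplied by the caller:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;

public class FileGroupReadExample {
  public static <T> List<T> readFileGroup(HoodieReaderContext<T> readerContext,
                                          Configuration hadoopConf,
                                          String tablePath,
                                          String latestCommitTime,
                                          FileSlice fileSlice,
                                          Schema dataSchema,
                                          Schema requestedSchema,
                                          TypedProperties props,
                                          HoodieTableConfig tableConfig) throws IOException {
    List<T> rows = new ArrayList<>();
    HoodieFileGroupReader<T> reader = new HoodieFileGroupReader<>(
        readerContext, hadoopConf, tablePath, latestCommitTime, fileSlice,
        dataSchema, requestedSchema, props, tableConfig,
        0, fileSlice.getTotalFileSize(), false /* shouldUseRecordPosition */);
    reader.initRecordIterators();
    try {
      while (reader.hasNext()) {
        rows.add(reader.next());  // records are projected back to requestedSchema
      }
    } finally {
      reader.close();
    }
    return rows;
  }
}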
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CachingIterator.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CachingIterator.java
new file mode 100644
index 00000000000..58d5aa78709
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CachingIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+
+public abstract class CachingIterator<T> implements ClosableIterator<T> {
+
+ protected T nextRecord;
+
+ protected abstract boolean doHasNext();
+
+ @Override
+ public final boolean hasNext() {
+ return nextRecord != null || doHasNext();
+ }
+
+ @Override
+ public final T next() {
+ T record = nextRecord;
+ nextRecord = null;
+ return record;
+ }
+
+ public static <U> CachingIterator<U> wrap(ClosableIterator<U> iterator) {
+ return new CachingIterator<U>() {
+ @Override
+ protected boolean doHasNext() {
+ if (iterator.hasNext()) {
+ nextRecord = iterator.next();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+ };
+ }
+
+ public static <U> CachingIterator<U> wrap(ClosableIterator<U> iterator,
HoodieReaderContext<U> readerContext) {
+ return new CachingIterator<U>() {
+ @Override
+ protected boolean doHasNext() {
+ if (iterator.hasNext()) {
+ nextRecord = readerContext.seal(iterator.next());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+ };
+ }
+
+}
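A small usage sketch for the relocated CachingIterator: wrap(...) turns any ClosableIterator into one whose hasNext() pre-fetches and caches the next record, so hasNext() can be called repeatedly before next(). The in-memory source iterator below is illustrative only:

import java.util.Arrays;
import java.util.Iterator;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;

public class CachingIteratorExample {
  public static void main(String[] args) throws Exception {
    Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
    // Adapt a plain iterator to ClosableIterator for the sake of the example.
    ClosableIterator<String> closable = new ClosableIterator<String>() {
      @Override public boolean hasNext() { return source.hasNext(); }
      @Override public String next() { return source.next(); }
      @Override public void close() { /* nothing to release */ }
    };
    CachingIterator<String> cached = CachingIterator.wrap(closable);
    while (cached.hasNext()) {        // pre-fetches the next record into nextRecord
      System.out.println(cached.next());
    }
    cached.close();
  }
}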
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
index c05683e605c..6d8fa651e51 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
@@ -24,6 +24,7 @@ import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -229,4 +230,75 @@ public class TestAvroSchemaUtils {
public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA,
shouldValidate, false, Collections.singleton("c"));
}
+
+ /* [HUDI-7045] should uncomment this test
+ @Test
+ public void testAppendFieldsToSchemaDedupNested() {
+ Schema full_schema = new Schema.Parser().parse("{\n"
+ + " \"type\": \"record\",\n"
+ + " \"namespace\": \"example.schema\",\n"
+ + " \"name\": \"source\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"number\",\n"
+ + " \"type\": [\"null\", \"int\"]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"nested_record\",\n"
+ + " \"type\": {\n"
+ + " \"name\": \"nested\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"string\",\n"
+ + " \"type\": [\"null\", \"string\"]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"long\",\n"
+ + " \"type\": [\"null\", \"long\"]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"other\",\n"
+ + " \"type\": [\"null\", \"int\"]\n"
+ + " }\n"
+ + " ]\n"
+ + "}\n");
+
+ Schema missing_field_schema = new Schema.Parser().parse("{\n"
+ + " \"type\": \"record\",\n"
+ + " \"namespace\": \"example.schema\",\n"
+ + " \"name\": \"source\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"number\",\n"
+ + " \"type\": [\"null\", \"int\"]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"nested_record\",\n"
+ + " \"type\": {\n"
+ + " \"name\": \"nested\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"string\",\n"
+ + " \"type\": [\"null\", \"string\"]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"other\",\n"
+ + " \"type\": [\"null\", \"int\"]\n"
+ + " }\n"
+ + " ]\n"
+ + "}\n");
+
+ Option<Schema.Field> missingField =
AvroSchemaUtils.findNestedField(full_schema, "nested_record.long");
+ assertTrue(missingField.isPresent());
+ assertEquals(full_schema,
AvroSchemaUtils.appendFieldsToSchemaDedupNested(missing_field_schema,
Collections.singletonList(missingField.get())));
+ }
+ */
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 439948a6cc9..59c078e929b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -34,7 +34,6 @@ import
org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -45,7 +44,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -69,7 +67,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public abstract String getBasePath();
- public abstract HoodieReaderContext<T> getHoodieReaderContext(String
tablePath, String[] partitionPath);
+ public abstract HoodieReaderContext<T> getHoodieReaderContext(String
tablePath, Schema avroSchema);
public abstract void commitToTable(List<String> recordList, String operation,
Map<String, String> writeConfigs);
@@ -158,7 +156,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
FileSlice fileSlice =
fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
- Collections.sort(logFilePathList);
assertEquals(expectedLogFileNum, logFilePathList.size());
List<T> actualRecordList = new ArrayList<>();
@@ -169,17 +166,17 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
props.setProperty(PARTITION_FIELDS.key(),
metaClient.getTableConfig().getString(PARTITION_FIELDS));
}
- String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} :
new String[] {partitionPaths[0]};
assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
- getHoodieReaderContext(tablePath, partitionValues),
+ getHoodieReaderContext(tablePath, avroSchema),
hadoopConf,
tablePath,
metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
- fileSlice.getBaseFile(),
- logFilePathList.isEmpty() ? Option.empty() :
Option.of(logFilePathList),
+ fileSlice,
+ avroSchema,
avroSchema,
props,
+ metaClient.getTableConfig(),
0,
fileSlice.getTotalFileSize(),
false);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index f90e9bf5d7b..88f7224cff0 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -234,7 +234,7 @@ class
HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
override def buildFileIndex(): FileIndex = fileIndex
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled && !isBootstrap) {
+ if (fileGroupReaderEnabled) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 1728affac5b..3565c70e5eb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -17,19 +17,18 @@
package org.apache.spark.sql.execution.datasources.parquet
-import kotlin.NotImplementedError
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, HoodieTableState,
MergeOnReadSnapshotRelation, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, HoodieTableState,
MergeOnReadSnapshotRelation, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -41,10 +40,15 @@ import org.apache.spark.sql.types.{StringType, StructField,
StructType}
import org.apache.spark.util.SerializableConfiguration
import scala.annotation.tailrec
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.jdk.CollectionConverters.asScalaIteratorConverter
+trait HoodieFormatTrait {
+
+ // Used so that the planner only projects once and does not stack overflow
+ var isProjected: Boolean = false
+}
+
/**
* This class utilizes {@link HoodieFileGroupReader} and its related classes
to support reading
* from Parquet formatted base files and their log files.
@@ -59,8 +63,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState:
HoodieTableState,
isIncremental: Boolean,
shouldUseRecordPosition:
Boolean,
requiredFilters: Seq[Filter]
- ) extends ParquetFileFormat with
SparkAdapterSupport {
- var isProjected = false
+ ) extends ParquetFileFormat with
SparkAdapterSupport with HoodieFormatTrait {
/**
* Support batch needs to remain consistent, even if one side of a bootstrap
merge can support
@@ -69,10 +72,11 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
private var supportBatchCalled = false
private var supportBatchResult = false
+ private val sanitizedTableName =
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
if (!supportBatchCalled || supportBatchResult) {
supportBatchCalled = true
- supportBatchResult = !isMOR && !isIncremental &&
super.supportBatch(sparkSession, schema)
+ supportBatchResult = !isMOR && !isIncremental && !isBootstrap &&
super.supportBatch(sparkSession, schema)
}
supportBatchResult
}
@@ -88,17 +92,27 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+    // dataSchema is not always right due to Spark bugs, so recompute it by dropping partition columns from the table schema
+    val partitionColumns = partitionSchema.fieldNames
+    val dataSchema = StructType(tableSchema.structTypeSchema.fields.filterNot(f => partitionColumns.contains(f.name)))
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
spark.conf.set("spark.sql.parquet.enableVectorizedReader",
supportBatchResult)
val requiredSchemaWithMandatory =
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
+ val isCount = requiredSchemaWithMandatory.isEmpty
val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
val requiredMeta = StructType(requiredSchemaSplits._1)
val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
- val (baseFileReader, preMergeBaseFileReader, _, _, cdcFileReader) =
buildFileReaders(
+ val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) =
buildFileReaders(
spark, dataSchema, partitionSchema, if (isIncremental)
requiredSchemaWithMandatory else requiredSchema,
filters, options, augmentedHadoopConf, requiredSchemaWithMandatory,
requiredWithoutMeta, requiredMeta)
+
+ val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
+ val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
+
val broadcastedHadoopConf = spark.sparkContext.broadcast(new
SerializableConfiguration(augmentedHadoopConf))
+ val broadcastedDataSchema = spark.sparkContext.broadcast(dataAvroSchema)
+ val broadcastedRequestedSchema =
spark.sparkContext.broadcast(requestedAvroSchema)
val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark,
options)
(file: PartitionedFile) => {
@@ -106,58 +120,47 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
// Snapshot or incremental queries.
case fileSliceMapping: HoodiePartitionFileSliceMapping =>
val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
- if (FSUtils.isLogFile(filePath)) {
- val partitionValues = fileSliceMapping.getPartitionValues
- val fileSlice =
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
- buildFileGroupIterator(
- Option.empty[PartitionedFile => Iterator[InternalRow]],
- partitionValues,
- Option.empty[HoodieBaseFile],
- getLogFilesFromSlice(fileSlice),
- requiredSchemaWithMandatory,
- outputSchema,
- partitionSchema,
- broadcastedHadoopConf.value.value,
- -1,
- -1,
- shouldUseRecordPosition
- )
+ val filegroupName = if (FSUtils.isLogFile(filePath)) {
+ FSUtils.getFileId(filePath.getName).substring(1)
} else {
- fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName))
match {
- case Some(fileSlice) =>
+ FSUtils.getFileId(filePath.getName)
+ }
+ fileSliceMapping.getSlice(filegroupName) match {
+ case Some(fileSlice) if !isCount =>
+ if (requiredSchema.isEmpty &&
!fileSlice.getLogFiles.findAny().isPresent) {
val hoodieBaseFile = fileSlice.getBaseFile.get()
- val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
- val partitionValues = fileSliceMapping.getPartitionValues
- val logFiles = getLogFilesFromSlice(fileSlice)
- if (requiredSchemaWithMandatory.isEmpty) {
- val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
- baseFileReader(baseFile)
- } else if (bootstrapFileOpt.isPresent) {
- // TODO: Use FileGroupReader here: HUDI-6942.
- throw new NotImplementedError("Not support reading bootstrap
file")
- } else {
- if (logFiles.isEmpty) {
- throw new IllegalStateException(
- "should not be here since file slice should not have
been broadcasted "
- + "since it has no log or data files")
- }
- buildFileGroupIterator(
- Option(preMergeBaseFileReader),
- partitionValues,
- Option(hoodieBaseFile),
- logFiles,
- requiredSchemaWithMandatory,
- outputSchema,
- partitionSchema,
- broadcastedHadoopConf.value.value,
- 0,
- hoodieBaseFile.getFileLen,
- shouldUseRecordPosition
- )
- }
- // TODO: Use FileGroupReader here: HUDI-6942.
- case _ => baseFileReader(file)
- }
+
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen))
+ } else {
+ val readerContext: HoodieReaderContext[InternalRow] = new
SparkFileFormatInternalRowReaderContext(
+ readerMaps)
+ val serializedHadoopConf = broadcastedHadoopConf.value.value
+ val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+
.builder().setConf(serializedHadoopConf).setBasePath(tableState.tablePath).build
+ val reader = new HoodieFileGroupReader[InternalRow](
+ readerContext,
+ serializedHadoopConf,
+ tableState.tablePath,
+ tableState.latestCommitTimestamp.get,
+ fileSlice,
+ broadcastedDataSchema.value,
+ broadcastedRequestedSchema.value,
+ metaClient.getTableConfig.getProps,
+ metaClient.getTableConfig,
+ file.start,
+ file.length,
+ shouldUseRecordPosition)
+ reader.initRecordIterators()
+ // Append partition values to rows and project to output schema
+ appendPartitionAndProject(
+
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
+ requiredSchema,
+ partitionSchema,
+ outputSchema,
+ fileSliceMapping.getPartitionValues)
+ }
+
+ // TODO: Use FileGroupReader here: HUDI-6942.
+ case _ => baseFileReader(file)
}
// CDC queries.
case hoodiePartitionCDCFileGroupSliceMapping:
HoodiePartitionCDCFileGroupMapping =>
@@ -190,43 +193,6 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
props)
}
- protected def buildFileGroupIterator(preMergeBaseFileReader:
Option[PartitionedFile => Iterator[InternalRow]],
- partitionValues: InternalRow,
- baseFile: Option[HoodieBaseFile],
- logFiles: List[HoodieLogFile],
- requiredSchemaWithMandatory: StructType,
- outputSchema: StructType,
- partitionSchema: StructType,
- hadoopConf: Configuration,
- start: Long,
- length: Long,
- shouldUseRecordPosition: Boolean):
Iterator[InternalRow] = {
- val readerContext: HoodieReaderContext[InternalRow] = new
SparkFileFormatInternalRowReaderContext(
- preMergeBaseFileReader, partitionValues)
- val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
- .builder().setConf(hadoopConf).setBasePath(tableState.tablePath).build
- val reader = new HoodieFileGroupReader[InternalRow](
- readerContext,
- hadoopConf,
- tableState.tablePath,
- tableState.latestCommitTimestamp.get,
- if (baseFile.nonEmpty) HOption.of(baseFile.get) else HOption.empty(),
- HOption.of(logFiles.map(f => f.getPath.toString).asJava),
- HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory,
tableName),
- metaClient.getTableConfig.getProps,
- start,
- length,
- shouldUseRecordPosition)
- reader.initRecordIterators()
- // Append partition values to rows and project to output schema
- appendPartitionAndProject(
-
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
- requiredSchemaWithMandatory,
- partitionSchema,
- outputSchema,
- partitionValues)
- }
-
private def appendPartitionAndProject(iter: Iterator[InternalRow],
inputSchema: StructType,
partitionSchema: StructType,
@@ -279,7 +245,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
// If not MergeOnRead or if projection is compatible
if (isIncremental) {
StructType(dataSchema.toArray ++ partitionSchema.fields)
- } else if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
+ } else {
val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
for (field <- mandatoryFields) {
if (requiredSchema.getFieldIndex(field).isEmpty) {
@@ -294,8 +260,6 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
}
val addedFields = StructType(added.toArray)
StructType(requiredSchema.toArray ++ addedFields.fields)
- } else {
- dataSchema
}
}
@@ -305,23 +269,26 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
requiredWithoutMeta: StructType,
requiredMeta: StructType):
(PartitionedFile => Iterator[InternalRow],
PartitionedFile => Iterator[InternalRow],
- PartitionedFile => Iterator[InternalRow],
- PartitionedFile => Iterator[InternalRow],
+ mutable.Map[Long, PartitionedFile => Iterator[InternalRow]],
PartitionedFile => Iterator[InternalRow]) = {
+ val m = scala.collection.mutable.Map[Long, PartitionedFile =>
Iterator[InternalRow]]()
+
val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters,
tableState.recordKeyField)
val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
filters ++ requiredFilters, options, new Configuration(hadoopConf))
+ m.put(generateKey(dataSchema, requiredSchema), baseFileReader)
//file reader for reading a hudi base file that needs to be merged with
log files
val preMergeBaseFileReader = if (isMOR) {
// Add support for reading files using inline file system.
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchemaWithMandatory,
+ super.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty), requiredSchemaWithMandatory,
if (shouldUseRecordPosition) requiredFilters else
recordKeyRelatedFilters ++ requiredFilters,
options, new Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
+ m.put(generateKey(dataSchema, requiredSchemaWithMandatory),
preMergeBaseFileReader)
val cdcFileReader = super.buildReaderWithPartitionValues(
sparkSession,
@@ -341,41 +308,50 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val needDataCols = requiredWithoutMeta.nonEmpty
//file reader for bootstrap skeleton files
- val skeletonReader = if (needMetaCols && isBootstrap) {
+ if (needMetaCols && isBootstrap) {
+ val key = generateKey(HoodieSparkUtils.getMetaSchema, requiredMeta)
if (needDataCols || isMOR) {
// no filter and no append
- super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
- requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
+ m.put(key, super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+ requiredMeta, Seq.empty, options, new Configuration(hadoopConf)))
} else {
- // filter and append
- super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, partitionSchema,
- requiredMeta, filters ++ requiredFilters, options, new
Configuration(hadoopConf))
+ // filter
+ m.put(key, super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+ requiredMeta, filters ++ requiredFilters, options, new
Configuration(hadoopConf)))
}
- } else {
- _: PartitionedFile => Iterator.empty
+
+ val requestedMeta = StructType(requiredSchema.fields.filter(sf =>
isMetaField(sf.name)))
+ m.put(generateKey(HoodieSparkUtils.getMetaSchema, requestedMeta),
+ super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), requestedMeta,
+ Seq.empty, options, new Configuration(hadoopConf)))
}
//file reader for bootstrap base files
- val bootstrapBaseReader = if (needDataCols && isBootstrap) {
+ if (needDataCols && isBootstrap) {
val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf =>
isMetaField(sf.name)))
- if (isMOR) {
+ val key = generateKey(dataSchemaWithoutMeta, requiredWithoutMeta)
+ if (isMOR || needMetaCols) {
+ m.put(key, super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+ Seq.empty, options, new Configuration(hadoopConf)))
// no filter and no append
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
- Seq.empty, options, new Configuration(hadoopConf))
- } else if (needMetaCols) {
- // no filter but append
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
- Seq.empty, options, new Configuration(hadoopConf))
+
} else {
- // filter and append
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
- filters ++ requiredFilters, options, new Configuration(hadoopConf))
+ // filter
+ m.put(key, super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+ filters ++ requiredFilters, options, new Configuration(hadoopConf)))
}
- } else {
- _: PartitionedFile => Iterator.empty
+
+ val requestedWithoutMeta = StructType(requiredSchema.fields.filterNot(sf
=> isMetaField(sf.name)))
+ m.put(generateKey(dataSchemaWithoutMeta, requestedWithoutMeta),
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requestedWithoutMeta,
+ Seq.empty, options, new Configuration(hadoopConf)))
}
- (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader, cdcFileReader)
+ (baseFileReader, preMergeBaseFileReader, m, cdcFileReader)
+ }
+
+ protected def generateKey(dataSchema: StructType, requestedSchema:
StructType): Long = {
+ AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName).hashCode() +
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName).hashCode()
}
protected def getRecordKeyRelatedFilters(filters: Seq[Filter],
recordKeyColumn: String): Seq[Filter] = {
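Note on the reader map: buildFileReaders now registers every reader it builds under the key produced by generateKey, i.e. the sum of the Avro hash codes of the (data schema, requested schema) pair the reader was constructed for. A minimal sketch of the lookup side, for illustration only (lookupReader is hypothetical; in this commit the actual lookup happens inside SparkFileFormatInternalRowReaderContext, which is not shown in this hunk):

import scala.collection.mutable
import org.apache.avro.Schema
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Hypothetical helper: recover a pre-built reader for a schema pair. The key is
// computed like generateKey (Int addition of the two Avro hash codes, widened to
// Long), so identical schema pairs land on the same map entry.
def lookupReader(readerMaps: mutable.Map[Long, PartitionedFile => Iterator[InternalRow]],
                 dataAvroSchema: Schema,
                 requestedAvroSchema: Schema): PartitionedFile => Iterator[InternalRow] = {
  val key: Long = dataAvroSchema.hashCode() + requestedAvroSchema.hashCode()
  readerMaps.getOrElse(key, (_: PartitionedFile) => Iterator.empty)
}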
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index afa85ae6475..44381a5db4a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -26,7 +26,7 @@ import
org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile,
HoodieRecord}
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils,
HoodieTableSchema, HoodieTableState, LogFileIterator,
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
+import org.apache.hudi.{HoodieBaseRelation, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, LogFileIterator,
MergeOnReadSnapshotRelation, RecordMergingFileIterator, SkipMergeIterator,
SparkAdapterSupport}
import org.apache.spark.broadcast.Broadcast
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
@@ -54,7 +54,7 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
isBootstrap: Boolean,
isIncremental: Boolean,
requiredFilters: Seq[Filter]
- ) extends ParquetFileFormat with
SparkAdapterSupport {
+ ) extends ParquetFileFormat with
SparkAdapterSupport with HoodieFormatTrait {
override def isSplitable(sparkSession: SparkSession,
options: Map[String, String],
@@ -62,9 +62,6 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
false
}
- //Used so that the planner only projects once and does not stack overflow
- var isProjected = false
-
/**
* Support batch needs to remain consistent, even if one side of a bootstrap
merge can support
* while the other side can't
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index ee91b2bcf85..9847918adc1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -86,9 +86,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
properties.setProperty(
HoodieTableConfig.BASE_FILE_FORMAT.key(),
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
- properties.setProperty(
- PAYLOAD_ORDERING_FIELD_PROP_KEY,
-
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+ properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key(),
"record_key");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),"partition_path");
properties.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(),"partition_path");
metaClient = getHoodieMetaClient(hadoopConf(), basePath(),
HoodieTableType.MERGE_ON_READ, properties);
@@ -168,8 +166,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
LOGFILE_DATA_BLOCK_FORMAT.key(),
"parquet");
extraProperties.setProperty(
- PAYLOAD_ORDERING_FIELD_PROP_KEY,
-
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+ HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "record_key");
extraProperties.setProperty(
FILE_GROUP_READER_ENABLED.key(),
"true");
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index 20d2edfaa8d..682558f79f9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -97,7 +97,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
? Option.empty() : Option.of(partitionPaths[0]);
buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
- getHoodieReaderContext(getBasePath(), partitionValues),
+ getHoodieReaderContext(getBasePath(), avroSchema),
avroSchema,
avroSchema,
partitionNameOpt,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index ec719414dc8..efc803666e5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -19,6 +19,7 @@
package org.apache.hudi.functional;
import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.spark.sql.Dataset;
@@ -103,9 +104,13 @@ public class TestNewHoodieParquetFileFormat extends
TestBootstrapReadBase {
protected void testCount(String tableBasePath) {
Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"false").load(tableBasePath);
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+ .load(tableBasePath);
Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"true").load(tableBasePath);
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
+ .load(tableBasePath);
assertEquals(legacyDf.count(), fileFormatDf.count());
}
@@ -119,9 +124,13 @@ public class TestNewHoodieParquetFileFormat extends
TestBootstrapReadBase {
protected void runIndividualComparison(String tableBasePath, String
firstColumn, String... columns) {
Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"false").load(tableBasePath);
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+ .load(tableBasePath);
Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"true").load(tableBasePath);
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
+ .load(tableBasePath);
if (firstColumn.isEmpty()) {
//df.except(df) does not work with map type cols
legacyDf = legacyDf.drop("city_to_state");
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index a5f25d8bd34..5c1d04ee477 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -26,15 +26,13 @@ import
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLE
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.{AvroConversionUtils,
SparkFileFormatInternalRowReaderContext}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils,
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
-import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
@@ -83,31 +81,16 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
tempDir.toAbsolutePath.toUri.toString
}
- override def getHoodieReaderContext(tablePath: String,
- partitionValues: Array[String]):
HoodieReaderContext[InternalRow] = {
+ override def getHoodieReaderContext(tablePath: String, avroSchema: Schema):
HoodieReaderContext[InternalRow] = {
val parquetFileFormat = new ParquetFileFormat
- val metaClient =
HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(tablePath).build
- val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
val structTypeSchema =
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
- val partitionFields = metaClient.getTableConfig.getPartitionFields
- val partitionSchema = if (partitionFields.isPresent) {
- new StructType(structTypeSchema.fields.filter(f =>
partitionFields.get().contains(f.name)))
- } else {
- new StructType()
- }
val recordReaderIterator =
parquetFileFormat.buildReaderWithPartitionValues(
- spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty,
Map.empty, getHadoopConf)
- val numPartitionFields = if (partitionFields.isPresent)
partitionFields.get().length else 0
- assertEquals(numPartitionFields, partitionValues.length)
-
- val partitionValuesEncoded = new Array[UTF8String](partitionValues.length)
- for (i <- Range(0, numPartitionFields)) {
- partitionValuesEncoded.update(i,
UTF8String.fromString(partitionValues.apply(i)))
- }
+ spark, structTypeSchema, StructType(Seq.empty), structTypeSchema,
Seq.empty, Map.empty, getHadoopConf)
- val partitionValueRow = new
GenericInternalRow(partitionValuesEncoded.toArray[Any])
- new SparkFileFormatInternalRowReaderContext(Option(recordReaderIterator),
partitionValueRow)
+ val m = scala.collection.mutable.Map[Long, PartitionedFile =>
Iterator[InternalRow]]()
+ m.put(2*avroSchema.hashCode(), recordReaderIterator)
+ new SparkFileFormatInternalRowReaderContext(m)
}
override def commitToTable(recordList: util.List[String], operation: String,
options: util.Map[String, String]): Unit = {
@@ -135,6 +118,7 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
assertEquals(expectedDf.count, actualRecordList.size)
val actualDf = HoodieUnsafeUtils.createDataFrameFromInternalRows(
spark, actualRecordList, HoodieInternalRowUtils.getCachedSchema(schema))
- assertEquals(expectedDf.count, expectedDf.intersect(actualDf).count)
+ assertEquals(0, expectedDf.except(actualDf).count())
+ assertEquals(0, actualDf.except(expectedDf).count())
}
}
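Note on the test reader context above: the data schema and the requested schema are the same table schema, so the test registers its reader under 2*avroSchema.hashCode(), mirroring how generateKey sums the two hash codes of the schema pair. A minimal statement of that identity, for illustration only (keyForIdenticalSchemas is hypothetical):

import org.apache.avro.Schema

// For a pair of identical schemas, a generateKey-style key reduces to twice the
// schema's hash code (Int arithmetic, then widened to Long).
def keyForIdenticalSchemas(s: Schema): Long = s.hashCode() + s.hashCode()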
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
index 3f6934d9734..16d56373442 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
@@ -29,9 +29,10 @@ class TestQueryMergeOnReadOptimizedTable extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
- | partitioned by (ts)
+ | partitioned by (partition)
| location '$tablePath'
| tblproperties (
| type = 'mor',
@@ -41,10 +42,10 @@ class TestQueryMergeOnReadOptimizedTable extends
HoodieSparkSqlTestBase {
""".stripMargin)
// insert data to table
spark.sql("set hoodie.parquet.max.file.size = 10000")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
spark.sql(s"update $tableName set price = 11 where id = 1")
spark.sql(s"update $tableName set price = 21 where id = 2")
spark.sql(s"update $tableName set price = 31 where id = 3")
@@ -60,7 +61,7 @@ class TestQueryMergeOnReadOptimizedTable extends
HoodieSparkSqlTestBase {
// expect that all complete parquet files can be scanned with a pending
compaction job
assertQueryResult(4, tablePath)
- spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000, 1000)")
// expect that all complete parquet files can be scanned with a pending
compaction job
assertQueryResult(5, tablePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
index be6a2bb7624..10ee9501422 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
@@ -70,9 +70,10 @@ class TestHoodieLogFileProcedure extends
HoodieSparkProcedureTestBase {
| id int,
| name string,
| price double,
- | ts long
+ | ts long,
+ | partition long
|) using hudi
- | partitioned by (ts)
+ | partitioned by (partition)
| location '$tablePath'
| tblproperties (
| type = 'mor',
@@ -81,8 +82,8 @@ class TestHoodieLogFileProcedure extends
HoodieSparkProcedureTestBase {
| )
""".stripMargin)
// insert data to table
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500, 1500")
spark.sql(s"update $tableName set name = 'b1' where id = 1")
spark.sql(s"update $tableName set name = 'b2' where id = 2")
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 069a4608073..7458d16641c 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkHoodieTableFileIndex
+import org.apache.hudi.{HoodieCDCFileIndex, SparkHoodieTableFileIndex}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
@@ -27,7 +27,7 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join,
LogicalPlan, MergeIntoTable, Project}
import
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand,
ExplainCommand}
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -100,10 +100,20 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
- case p@PhysicalOperation(_, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), p)
+ case physicalOperation@PhysicalOperation(_, _,
+ logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait] &&
!fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait]
+ ff.isProjected = true
+ val tableSchema = fs.location match {
+ case index: HoodieCDCFileIndex => index.cdcRelation.schema
+ case index: SparkHoodieTableFileIndex => index.schema
+ }
+ val resolvedSchema = logicalRelation.resolve(tableSchema,
fs.sparkSession.sessionState.analyzer.resolver)
+ if (!fs.partitionSchema.fields.isEmpty) {
+ Project(resolvedSchema, physicalOperation)
+ } else {
+ physicalOperation
+ }
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index a01cce70c1f..f5757e16b41 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -17,15 +17,17 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport,
SparkHoodieTableFileIndex}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join,
JoinHint, LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.{CreateTableLikeCommand,
ExplainCommand}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -84,6 +86,21 @@ trait HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
+ def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation:
LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
+ val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait]
+ ff.isProjected = true
+ val tableSchema = fs.location match {
+ case index: HoodieCDCFileIndex => index.cdcRelation.schema
+ case index: SparkHoodieTableFileIndex => index.schema
+ }
+ val resolvedSchema = logicalRelation.resolve(tableSchema,
fs.sparkSession.sessionState.analyzer.resolver)
+ if (!fs.partitionSchema.fields.isEmpty &&
scanOperation.sameOutput(logicalRelation)) {
+ Project(resolvedSchema, scanOperation)
+ } else {
+ scanOperation
+ }
+ }
+
/**
* This is an extractor to accommodate for [[ResolvedTable]] signature
change in Spark 3.2
*/
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index 6cd5da79b86..34a1e084227 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -18,15 +18,14 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
@@ -49,9 +48,10 @@ object HoodieSpark30CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 8a56d0fba25..dd3ac530017 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
@@ -49,9 +49,10 @@ object HoodieSpark31CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 1bb4638fcdb..7a30aebd856 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -19,17 +19,16 @@
package org.apache.spark.sql
import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
@@ -52,9 +51,10 @@ object HoodieSpark32CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 85bd4a2c5e5..9bcc40a29e6 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,16 +18,15 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
@@ -50,9 +49,10 @@ object HoodieSpark33CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
case s@ScanOperation(_, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 41b629aac8e..e49adebc14b 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -18,15 +18,14 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
@@ -49,9 +48,10 @@ object HoodieSpark34CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
case s@ScanOperation(_, _, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index 3ef6abdf8e1..eac3153b3f9 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -18,15 +18,14 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex,
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
@@ -49,9 +48,10 @@ object HoodieSpark35CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
plan match {
case s@ScanOperation(_, _, _,
- l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
- fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
-
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+ if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+ && !fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait].isProjected =>
+ HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
case _ => plan
}
}