This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 034adda [HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads
projected columns (#4818)
034adda is described below
commit 034addaef5834eff09cfd9ac5cc2656df95ca0e8
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Mar 9 18:45:25 2022 -0800
[HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads projected
columns (#4818)
NOTE: This change is first part of the series to clean up Hudi's Spark
DataSource related implementations, making sure there's minimal code
duplication among them, implementations are consistent and performant
This PR is making sure that BaseFileOnlyViewRelation only reads projected
columns as well as avoiding unnecessary serde from Row to InternalRow
Brief change log
- Introduced HoodieUnsafeRDD as a base for all custom RDD impls
- Extracted common fields/methods to HoodieBaseRelation
- Cleaned up and streamlined BaseFileOnlyViewRelation
- Fixed all of the Relations to avoid superfluous Row <> InternalRow
conversions
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +-
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 18 +-
...zerTrait.scala => HoodieAvroDeserializer.scala} | 10 +-
...lizerTrait.scala => HoodieAvroSerializer.scala} | 2 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 10 +-
.../SparkClientFunctionalTestHarness.java | 32 +-
.../hudi/metadata/HoodieMetadataPayload.java | 2 +-
.../org/apache/hudi/BaseFileOnlyViewRelation.scala | 135 +++++---
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 78 ++++-
.../org/apache/hudi/HoodieDataSourceHelper.scala | 39 +--
.../scala/org/apache/hudi/HoodieFileScanRDD.scala | 60 +---
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 16 +-
.../scala/org/apache/hudi/HoodieUnsafeRDD.scala | 68 ++++
.../hudi/MergeOnReadIncrementalRelation.scala | 29 +-
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 53 +--
.../org/apache/spark/HoodieUnsafeRDDUtils.scala | 44 +++
...lizer.scala => HoodieSparkAvroSerializer.scala} | 4 +-
.../TestConvertFilterToCatalystExpression.scala | 8 +-
.../hudi/functional/TestMORDataSourceStorage.scala | 3 +
.../functional/TestParquetColumnProjection.scala | 355 +++++++++++++++++++++
.../apache/spark/sql/adapter/Spark2Adapter.scala | 10 +-
...er.scala => HoodieSpark2AvroDeserializer.scala} | 10 +-
.../apache/spark/sql/adapter/Spark3Adapter.scala | 10 +-
...er.scala => HoodieSpark3AvroDeserializer.scala} | 6 +-
.../functional/TestHDFSParquetImporter.java | 11 +-
25 files changed, 752 insertions(+), 265 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 6053dcf..4202cbd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -44,6 +44,7 @@ import
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
@@ -1552,7 +1553,8 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public CompressionCodecName getParquetCompressionCodec() {
- return
CompressionCodecName.fromConf(getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME));
+ String codecName =
getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
+ return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName)
? null : codecName);
}
public boolean parquetDictionaryEnabled() {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index b288289..c963806 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -177,14 +177,24 @@ object HoodieSparkUtils extends SparkAdapterSupport {
* Converts Filters to Catalyst Expressions joined by And. If the conversion
succeeds, returns a
* non-empty Option[Expression]; otherwise returns None.
*/
- def convertToCatalystExpressions(filters: Array[Filter],
- tableSchema: StructType):
Option[Expression] = {
- val expressions = filters.map(convertToCatalystExpression(_, tableSchema))
+ def convertToCatalystExpressions(filters: Seq[Filter],
+ tableSchema: StructType):
Seq[Option[Expression]] = {
+ filters.map(convertToCatalystExpression(_, tableSchema))
+ }
+
+
+ /**
+ * Converts Filters to Catalyst Expressions joined by And. If the conversion
succeeds, returns a
+ * non-empty Option[Expression]; otherwise returns None.
+ */
+ def convertToCatalystExpression(filters: Array[Filter],
+ tableSchema: StructType): Option[Expression]
= {
+ val expressions = convertToCatalystExpressions(filters, tableSchema)
if (expressions.forall(p => p.isDefined)) {
if (expressions.isEmpty) {
None
} else if (expressions.length == 1) {
- expressions(0)
+ expressions.head
} else {
Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And))
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
similarity index 75%
rename from
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala
rename to
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
index 5c30353..4c4ddb5 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
@@ -24,12 +24,6 @@ package org.apache.spark.sql.avro
* If you're looking to convert Avro into "deserialized" [[Row]]
(comprised of Java native types),
* please check [[AvroConversionUtils]]
*/
-trait HoodieAvroDeserializerTrait {
- final def deserialize(data: Any): Option[Any] =
- doDeserialize(data) match {
- case opt: Option[_] => opt // As of Spark 3.1, this will return data
wrapped with Option, so we fetch the data
- case row => Some(row) // For other Spark versions, return the
data as is
- }
-
- protected def doDeserialize(data: Any): Any
+trait HoodieAvroDeserializer {
+ def deserialize(data: Any): Option[Any]
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
similarity index 97%
rename from
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala
rename to
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
index 159d8da..84ba44b 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
@@ -23,6 +23,6 @@ package org.apache.spark.sql.avro
* NOTE: This is low-level component operating on Spark internal data-types
(comprising [[InternalRow]]).
* If you're looking to convert "deserialized" [[Row]] into Avro, please
check [[AvroConversionUtils]]
*/
-trait HoodieAvroSerializerTrait {
+trait HoodieAvroSerializer {
def serialize(catalystData: Any): Any
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 62bdc44..e41a9c1 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
import org.apache.avro.Schema
import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait,
HoodieAvroSerializerTrait}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -43,16 +43,16 @@ import java.util.Locale
trait SparkAdapter extends Serializable {
/**
- * Creates instance of [[HoodieAvroSerializerTrait]] providing for ability
to serialize
+ * Creates instance of [[HoodieAvroSerializer]] providing for ability to
serialize
* Spark's [[InternalRow]] into Avro payloads
*/
- def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializerTrait
+ def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializer
/**
- * Creates instance of [[HoodieAvroDeserializerTrait]] providing for ability
to deserialize
+ * Creates instance of [[HoodieAvroDeserializer]] providing for ability to
deserialize
* Avro payloads into Spark's [[InternalRow]]
*/
- def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializerTrait
+ def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializer
/**
* Create the SparkRowSerDe.
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 94e080c..f9676c6 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -19,6 +19,13 @@
package org.apache.hudi.testutils;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -28,6 +35,7 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -42,6 +50,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.HoodieSparkTable;
@@ -50,14 +59,11 @@ import
org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
@@ -69,6 +75,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -348,6 +355,21 @@ public class SparkClientFunctionalTestHarness implements
SparkProvider, HoodieMe
.withRollbackUsingMarkers(rollbackUsingMarkers);
}
+ protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
+ List<GenericRecord> avroRecords = records.stream()
+ .map(r -> {
+ HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
+ try {
+ return (GenericRecord) payload.getInsertValue(schema).get();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to extract Avro payload", e);
+ }
+ })
+ .collect(Collectors.toList());
+ JavaRDD<GenericRecord> jrdd = jsc.parallelize(avroRecords, 2);
+ return AvroConversionUtils.createDataFrame(jrdd.rdd(), schema.toString(),
spark);
+ }
+
protected int incrementTimelineServicePortToUse() {
// Increment the timeline service port for each individual test
// to avoid port reuse causing failures
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 548fbb9..c0ad8b1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -133,7 +133,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
// This can be simplified using SpecificData.deepcopy once this bug is
fixed
// https://issues.apache.org/jira/browse/AVRO-1811
//
- // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" nad
"type" fields
+ // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" and
"type" fields
// for it to be handled appropriately, therefore these fields have
to be reflected
// in any (read-)projected schema
key = record.get(KEY_FIELD_NAME).toString();
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
index 8e94805..adc34af 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
@@ -18,63 +18,82 @@
package org.apache.hudi
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-
+import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
-
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache,
PartitionedFile}
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.StructType
/**
- * The implement of [[BaseRelation]], which is used to respond to query that
only touches the base files(Parquet),
- * like query COW tables in Snapshot-Query and Read_Optimized mode and MOR
tables in Read_Optimized mode.
+ * [[BaseRelation]] implementation only reading Base files of Hudi tables,
essentially supporting following querying
+ * modes:
+ * <ul>
+ * <li>For COW tables: Snapshot</li>
+ * <li>For MOR tables: Read-optimized</li>
+ * </ul>
+ *
+ * NOTE: The reason this Relation is used in lieu of Spark's default
[[HadoopFsRelation]] is primarily due to the
+ * fact that it injects real partition's path as the value of the partition
field, which Hudi ultimately persists
+ * as part of the record payload. In some cases, however, partition path might
not necessarily be equal to the
+ * verbatim value of the partition path field (when custom [[KeyGenerator]] is
used) therefore leading to incorrect
+ * partition field values being written
*/
-class BaseFileOnlyViewRelation(
- sqlContext: SQLContext,
- metaClient: HoodieTableMetaClient,
- optParams: Map[String, String],
- userSchema: Option[StructType],
- globPaths: Seq[Path]
- ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema)
with SparkAdapterSupport {
-
- override def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
"false")
-
- val filterExpressions =
HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
- .getOrElse(Literal(true, BooleanType))
- val (partitionFilters, dataFilters) = {
- val splited = filters.map { filter =>
- HoodieDataSourceHelper.splitPartitionAndDataPredicates(
- sparkSession, filterExpressions, partitionColumns)
- }
- (splited.flatMap(_._1), splited.flatMap(_._2))
- }
- val partitionFiles = getPartitionFiles(partitionFilters, dataFilters)
+class BaseFileOnlyViewRelation(sqlContext: SQLContext,
+ metaClient: HoodieTableMetaClient,
+ optParams: Map[String, String],
+ userSchema: Option[StructType],
+ globPaths: Seq[Path])
+ extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema)
with SparkAdapterSupport {
- val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
- val filePartitions = sparkAdapter.getFilePartitions(sparkSession,
partitionFiles, maxSplitBytes)
+ private val fileIndex = HoodieFileIndex(sparkSession, metaClient,
userSchema, optParams,
+ FileStatusCache.getOrCreate(sqlContext.sparkSession))
+
+ override def doBuildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[InternalRow] = {
+ // NOTE: In case list of requested columns doesn't contain the Primary Key
one, we
+ // have to add it explicitly so that
+ // - Merging could be performed correctly
+ // - In case 0 columns are to be fetched (for ex, when doing
{@code count()} on Spark's [[Dataset]],
+ // Spark still fetches all the rows to execute the query correctly
+ //
+ // It's okay to return columns that have not been requested by the
caller, as those nevertheless will be
+ // filtered out upstream
+ val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+ val (requiredAvroSchema, requiredStructSchema) =
+ HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+ val filterExpressions = convertToExpressions(filters)
+ val (partitionFilters, dataFilters) =
filterExpressions.partition(isPartitionPredicate)
+
+ val filePartitions = getPartitions(partitionFilters, dataFilters)
+
+ val partitionSchema = StructType(Nil)
+ val tableSchema = HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString)
+ val requiredSchema = HoodieTableSchema(requiredStructSchema,
requiredAvroSchema.toString)
- val requiredSchemaParquetReader =
HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = sparkSession,
- dataSchema = tableStructSchema,
- partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
+ val baseFileReader = createBaseFileReader(
+ spark = sparkSession,
+ partitionSchema = partitionSchema,
+ tableSchema = tableSchema,
+ requiredSchema = requiredSchema,
filters = filters,
options = optParams,
- hadoopConf = sparkSession.sessionState.newHadoopConf()
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = new Configuration(conf)
)
- new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema,
- requiredSchemaParquetReader, filePartitions)
+ new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions)
}
- private def getPartitionFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionedFile] = {
+ private def getPartitions(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[FilePartition] = {
val partitionDirectories = if (globPaths.isEmpty) {
val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient,
userSchema, optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession))
@@ -89,18 +108,46 @@ class BaseFileOnlyViewRelation(
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}
- val partitionFiles = partitionDirectories.flatMap { partition =>
+ val partitions = partitionDirectories.flatMap { partition =>
partition.files.flatMap { file =>
+ // TODO move to adapter
+ // TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = file,
- partitionValues = partition.values
+ // TODO clarify why this is required
+ partitionValues = InternalRow.empty
)
}
}
- partitionFiles.map{ f =>
- PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
+ val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+
+ sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes)
+ }
+
+ private def convertToExpressions(filters: Array[Filter]): Array[Expression]
= {
+ val catalystExpressions =
HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+ val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _)
=> opt.isEmpty }
+ if (failedExprs.nonEmpty) {
+ val failedFilters = failedExprs.map(p => filters(p._2))
+ logWarning(s"Failed to convert Filters into Catalyst expressions
(${failedFilters.map(_.toString)})")
}
+
+ catalystExpressions.filter(_.isDefined).map(_.get).toArray
}
+
+ /**
+ * Checks whether given expression only references partition
+ * columns
+ * (and involves no sub-query)
+ */
+ private def isPartitionPredicate(condition: Expression): Boolean = {
+ // Validates that the provided names both resolve to the same entity
+ val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+ condition.references.forall { r =>
partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+ !SubqueryExpression.hasSubquery(condition)
+ }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 1e2946d..e07b316 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -22,38 +22,70 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.io.hfile.CacheConfig
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.isMetadataTable
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import scala.collection.JavaConverters._
import scala.util.Try
case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr:
String)
+case class HoodieTableState(recordKeyField: String,
+ preCombineFieldOpt: Option[String])
+
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
*/
-abstract class HoodieBaseRelation(
- val sqlContext: SQLContext,
- metaClient: HoodieTableMetaClient,
- optParams: Map[String, String],
- userSchema: Option[StructType])
- extends BaseRelation with PrunedFilteredScan with Logging{
+abstract class HoodieBaseRelation(val sqlContext: SQLContext,
+ metaClient: HoodieTableMetaClient,
+ optParams: Map[String, String],
+ userSchema: Option[StructType])
+ extends BaseRelation with PrunedFilteredScan with Logging {
protected val sparkSession: SparkSession = sqlContext.sparkSession
+ protected lazy val conf: Configuration = new
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ protected lazy val jobConf = new JobConf(conf)
+
+ // If meta fields are enabled, always prefer key from the meta field as
opposed to user-specified one
+ // NOTE: This is historical behavior which is preserved as is
+ protected lazy val recordKeyField: String =
+ if (metaClient.getTableConfig.populateMetaFields())
HoodieRecord.RECORD_KEY_METADATA_FIELD
+ else metaClient.getTableConfig.getRecordKeyFieldProp
+
+ protected lazy val preCombineFieldOpt: Option[String] =
getPrecombineFieldProperty
+
+ /**
+ * @VisibleInTests
+ */
+ lazy val mandatoryColumns: Seq[String] = {
+ if (isMetadataTable(metaClient)) {
+ Seq(HoodieMetadataPayload.KEY_FIELD_NAME,
HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
+ } else {
+ Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+ }
+ }
+
+ protected lazy val specifiedQueryInstant: Option[String] =
+ optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+ .map(HoodieSqlCommonUtils.formatQueryInstant)
+
protected lazy val tableAvroSchema: Schema = {
val schemaUtil = new TableSchemaResolver(metaClient)
Try(schemaUtil.getTableAvroSchema).getOrElse(
@@ -81,6 +113,34 @@ abstract class HoodieBaseRelation(
}
override def schema: StructType = tableStructSchema
+
+ /**
+ * This method controls whether relation will be producing
+ * <ul>
+ * <li>[[Row]], when it's being equal to true</li>
+ * <li>[[InternalRow]], when it's being equal to false</li>
+ * </ul>
+ *
+ * Returning [[InternalRow]] directly enables us to save on needless ser/de
loop from [[InternalRow]] (being
+ * produced by file-reader) to [[Row]] and back
+ */
+ override final def needConversion: Boolean = false
+
+ /**
+ * NOTE: DO NOT OVERRIDE THIS METHOD
+ */
+ override final def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
+ // Here we rely on a type erasure, to workaround inherited API restriction
and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
+ // Please check [[needConversion]] scala-doc for more details
+ doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+ }
+
+ protected def doBuildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[InternalRow]
+
+ protected final def appendMandatoryColumns(requestedColumns: Array[String]):
Array[String] = {
+ val missing = mandatoryColumns.filter(col =>
!requestedColumns.contains(col))
+ requestedColumns ++ missing
+ }
}
object HoodieBaseRelation {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index fb12549..40299cf 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper,
SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{PredicateHelper,
SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.Filter
@@ -33,43 +33,6 @@ import scala.collection.JavaConverters._
object HoodieDataSourceHelper extends PredicateHelper {
- /**
- * Partition the given condition into two sequence of conjunctive predicates:
- * - predicates that can be evaluated using metadata only.
- * - other predicates.
- */
- def splitPartitionAndDataPredicates(
- spark: SparkSession,
- condition: Expression,
- partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = {
- splitConjunctivePredicates(condition).partition(
- isPredicateMetadataOnly(spark, _, partitionColumns))
- }
-
- /**
- * Check if condition can be evaluated using only metadata. In Delta, this
means the condition
- * only references partition columns and involves no subquery.
- */
- def isPredicateMetadataOnly(
- spark: SparkSession,
- condition: Expression,
- partitionColumns: Seq[String]): Boolean = {
- isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) &&
- !SubqueryExpression.hasSubquery(condition)
- }
-
- /**
- * Does the predicate only contains partition columns?
- */
- def isPredicatePartitionColumnsOnly(
- spark: SparkSession,
- condition: Expression,
- partitionColumns: Seq[String]): Boolean = {
- val nameEquality = spark.sessionState.analyzer.resolver
- condition.references.forall { r =>
- partitionColumns.exists(nameEquality(r.name, _))
- }
- }
/**
* Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
index 9f2d7d9..7e8f62b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
@@ -18,56 +18,37 @@
package org.apache.hudi
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile, SchemaColumnConvertNotSupportedException}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Partition, TaskContext}
/**
- * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
- *
- * This class will extract the fields needed according to [[requiredColumns]]
and
- * return iterator of [[org.apache.spark.sql.Row]] directly.
+ * TODO eval if we actually need it
*/
-class HoodieFileScanRDD(
- @transient private val sparkSession: SparkSession,
- requiredColumns: Array[String],
- schema: StructType,
- readFunction: PartitionedFile => Iterator[InternalRow],
- @transient val filePartitions: Seq[FilePartition])
- extends RDD[Row](sparkSession.sparkContext, Nil) {
-
- private val requiredSchema = {
- val nameToStructField = schema.map(field => (field.name, field)).toMap
- StructType(requiredColumns.map(nameToStructField))
- }
-
- private val requiredFieldPos =
HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema)
-
- override def compute(split: Partition, context: TaskContext): Iterator[Row]
= {
- val iterator = new Iterator[Object] with AutoCloseable {
+class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
+ readFunction: PartitionedFile => Iterator[InternalRow],
+ @transient fileSplits: Seq[FilePartition])
+ extends HoodieUnsafeRDD(sparkSession.sparkContext) {
+ override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
+ val iterator = new Iterator[InternalRow] with AutoCloseable {
private[this] val files =
split.asInstanceOf[FilePartition].files.toIterator
- private[this] var currentFile: PartitionedFile = null
- private[this] var currentIterator: Iterator[Object] = null
+ private[this] var currentFile: PartitionedFile = _
+ private[this] var currentIterator: Iterator[InternalRow] = _
override def hasNext: Boolean = {
(currentIterator != null && currentIterator.hasNext) || nextIterator()
}
- def next(): Object = {
- currentIterator.next()
- }
+ def next(): InternalRow = currentIterator.next()
/** Advances to the next file. Returns true if a new non-empty iterator
is available. */
private def nextIterator(): Boolean = {
if (files.hasNext) {
- currentFile = files.next()
-
logInfo(s"Reading File $currentFile")
+ currentFile = files.next()
currentIterator = readFunction(currentFile)
try {
@@ -93,17 +74,8 @@ class HoodieFileScanRDD(
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit](_ => iterator.close())
- // extract required columns from row
- val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema(
- iterator.asInstanceOf[Iterator[InternalRow]],
- requiredSchema,
- requiredFieldPos)
-
- // convert InternalRow to Row and return
- val converter =
CatalystTypeConverters.createToScalaConverter(requiredSchema)
- iterAfterExtract.map(converter(_).asInstanceOf[Row])
+ iterator.asInstanceOf[Iterator[InternalRow]]
}
- override protected def getPartitions: Array[Partition] =
filePartitions.toArray
-
+ override protected def getPartitions: Array[Partition] = fileSplits.toArray
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 4a4ea2c..3a518da 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -36,7 +36,6 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadata}
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -54,10 +53,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
fullSchemaFileReader: PartitionedFile =>
Iterator[InternalRow],
requiredSchemaFileReader: PartitionedFile =>
Iterator[InternalRow],
- tableState: HoodieMergeOnReadTableState,
+ tableState: HoodieTableState,
tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema)
- extends RDD[InternalRow](sc, Nil) {
+ requiredSchema: HoodieTableSchema,
+ @transient fileSplits:
List[HoodieMergeOnReadFileSplit])
+ extends HoodieUnsafeRDD(sc) {
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
private val recordKeyField = tableState.recordKeyField
@@ -98,12 +98,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
iter
}
- override protected def getPartitions: Array[Partition] = {
- tableState
- .hoodieRealtimeFileSplits
- .zipWithIndex
- .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
- }
+ override protected def getPartitions: Array[Partition] =
+ fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2,
file._1)).toArray
private def getConfig: Configuration = {
val conf = confBroadcast.value.value
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala
new file mode 100644
index 0000000..3f95746
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+/**
+ * !!! PLEASE READ CAREFULLY !!!
+ *
+ * Base class for all of the custom low-overhead RDD implementations for Hudi.
+ *
+ * To keep memory allocation footprint as low as possible, each inheritor of
this RDD base class
+ *
+ * <pre>
+ * 1. Does NOT deserialize from [[InternalRow]] to [[Row]] (therefore only
providing access to
+ * Catalyst internal representations (often mutable) of the read row)
+ *
+ * 2. DOES NOT COPY UNDERLYING ROW OUT OF THE BOX, meaning that
+ *
+ * a) access to this RDD is NOT thread-safe
+ *
+ * b) when iterating over it, a reference to a _mutable_ underlying instance (of
[[InternalRow]]) is
+ * returned, entailing that after [[Iterator#next()]] is invoked on the
provided iterator,
+ * previous reference becomes **invalid**. Therefore, you will have to
copy underlying mutable
+ * instance of [[InternalRow]] if you plan to access it after
[[Iterator#next()]] is invoked (filling
+ * it with the next row's payload)
+ *
+ * c) due to item b) above, no operation other than the iteration will
produce meaningful
+ * results on it and will likely fail [1]
+ * </pre>
+ *
+ * [1] For example, [[RDD#collect]] method on this implementation would not
work correctly, as it's
+ * simply using Scala's default [[Iterator#toArray]] method which will simply
concat all the references onto
+ * the same underlying mutable object into [[Array]]. Instead each individual
[[InternalRow]] _has to be copied_,
+ * before concatenating into the final output. Please refer to
[[HoodieUnsafeRDDUtils#collect]] for more details.
+ *
+ * NOTE: It enforces, for example, that all of the RDDs implement the [[compute]]
method returning
+ * [[InternalRow]] to avoid superfluous ser/de
+ */
+abstract class HoodieUnsafeRDD(@transient sc: SparkContext)
+ extends RDD[InternalRow](sc, Nil) {
+
+ def compute(split: Partition, context: TaskContext): Iterator[InternalRow]
+
+ override final def collect(): Array[InternalRow] =
+ throw new UnsupportedOperationException(
+ "This method will not function correctly, please refer to scala-doc for
HoodieUnsafeRDD"
+ )
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index b9d18c6..8308e3b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -19,7 +19,6 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path}
-import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -28,11 +27,11 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata,
getWritePartitionPaths, listAffectedFilesForCommits}
import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConversions._
@@ -47,9 +46,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
- private val conf = new
Configuration(sqlContext.sparkContext.hadoopConfiguration)
- private val jobConf = new JobConf(conf)
-
private val commitTimeline =
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
@@ -77,8 +73,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
private val fileIndex = if (commitsToReturn.isEmpty) List() else
buildFileIndex()
- private val preCombineFieldOpt = getPrecombineFieldProperty
-
// Record filters making sure that only records w/in the requested bounds
are being fetched as part of the
// scan collected by this relation
private lazy val incrementalSpanRecordsFilters: Seq[Filter] = {
@@ -88,18 +82,16 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
- private lazy val mandatoryColumns = {
+ override lazy val mandatoryColumns: Seq[String] = {
// NOTE: These columns are required for Incremental flow to be able to
handle the rows properly, even in
// cases when no columns are requested to be fetched (for ex, when
using {@code count()} API)
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD,
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
}
- override def needConversion: Boolean = false
-
- override def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
+ override def doBuildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[InternalRow] = {
if (fileIndex.isEmpty) {
- sqlContext.sparkContext.emptyRDD[Row]
+ sqlContext.sparkContext.emptyRDD[InternalRow]
} else {
logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
logDebug(s"buildScan filters = ${filters.mkString(",")}")
@@ -148,20 +140,20 @@ class MergeOnReadIncrementalRelation(sqlContext:
SQLContext,
hadoopConf = new Configuration(conf)
)
- val hoodieTableState = HoodieMergeOnReadTableState(fileIndex,
HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
+ val hoodieTableState =
HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
// TODO implement incremental span record filtering w/in RDD to make
sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform
filtering
- val rdd = new HoodieMergeOnReadRDD(
+ new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
jobConf,
fullSchemaParquetReader,
requiredSchemaParquetReader,
hoodieTableState,
tableSchema,
- requiredSchema
+ requiredSchema,
+ fileIndex
)
- rdd.asInstanceOf[RDD[Row]]
}
}
@@ -225,9 +217,4 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes,
mergeType)
})
}
-
- private def appendMandatoryColumns(requestedColumns: Array[String]):
Array[String] = {
- val missing = mandatoryColumns.filter(col =>
!requestedColumns.contains(col))
- requestedColumns ++ missing
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 7c1a354..6156054 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,22 +20,19 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.{createBaseFileReader,
isMetadataTable}
+import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileStatusCache,
PartitionedFile}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConverters._
@@ -46,10 +43,6 @@ case class HoodieMergeOnReadFileSplit(dataFile:
Option[PartitionedFile],
maxCompactionMemoryInBytes: Long,
mergeType: String)
-case class HoodieMergeOnReadTableState(hoodieRealtimeFileSplits:
List[HoodieMergeOnReadFileSplit],
- recordKeyField: String,
- preCombineFieldOpt: Option[String])
-
class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
optParams: Map[String, String],
val userSchema: Option[StructType],
@@ -57,38 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
- private val conf = new
Configuration(sqlContext.sparkContext.hadoopConfiguration)
- private val jobConf = new JobConf(conf)
-
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
private val maxCompactionMemoryInBytes =
getMaxCompactionMemoryInBytes(jobConf)
- // If meta fields are enabled, always prefer key from the meta field as
opposed to user-specified one
- // NOTE: This is historical behavior which is preserved as is
- private val recordKeyField = {
- if (metaClient.getTableConfig.populateMetaFields())
HoodieRecord.RECORD_KEY_METADATA_FIELD
- else metaClient.getTableConfig.getRecordKeyFieldProp
- }
-
- private val preCombineFieldOpt = getPrecombineFieldProperty
-
- private lazy val mandatoryColumns = {
- if (isMetadataTable(metaClient)) {
- Seq(HoodieMetadataPayload.KEY_FIELD_NAME,
HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
- } else {
- Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
- }
- }
-
- override def needConversion: Boolean = false
-
- private val specifiedQueryInstant =
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
- .map(HoodieSqlCommonUtils.formatQueryInstant)
-
- override def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
+ override def doBuildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[InternalRow] = {
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
@@ -137,12 +105,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
hadoopConf = new Configuration(conf)
)
- val tableState = HoodieMergeOnReadTableState(fileIndex, recordKeyField,
preCombineFieldOpt)
+ val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt)
- val rdd = new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf,
fullSchemaParquetReader,
- requiredSchemaParquetReader, tableState, tableSchema, requiredSchema)
-
- rdd.asInstanceOf[RDD[Row]]
+ new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf,
fullSchemaParquetReader,
+ requiredSchemaParquetReader, tableState, tableSchema, requiredSchema,
fileIndex)
}
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit]
= {
@@ -193,7 +159,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet
val partitionFilters = filters.filter(f => f.references.forall(p =>
partitionColumns.contains(p)))
val partitionFilterExpression =
- HoodieSparkUtils.convertToCatalystExpressions(partitionFilters,
tableStructSchema)
+ HoodieSparkUtils.convertToCatalystExpression(partitionFilters,
tableStructSchema)
val convertedPartitionFilterExpression =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient,
partitionFilterExpression.toSeq)
@@ -231,11 +197,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
}
}
}
-
- private def appendMandatoryColumns(requestedColumns: Array[String]):
Array[String] = {
- val missing = mandatoryColumns.filter(col =>
!requestedColumns.contains(col))
- requestedColumns ++ missing
- }
}
object MergeOnReadSnapshotRelation {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
new file mode 100644
index 0000000..1ac8fa0
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.hudi.HoodieUnsafeRDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.util.MutablePair
+
+/**
+ * Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
+ */
+object HoodieUnsafeRDDUtils {
+
+ /**
+ * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]],
returning a properly
+ * copied [[Array]] of [[InternalRow]]s
+ */
+ def collect(rdd: HoodieUnsafeRDD): Array[InternalRow] = {
+ rdd.mapPartitionsInternal { iter =>
+ // NOTE: We're leveraging [[MutablePair]] here to avoid unnecessary
allocations, since
+ // a) iteration is performed lazily and b) iteration is
single-threaded (w/in partition)
+ val pair = new MutablePair[InternalRow, Null]()
+ iter.map(row => pair.update(row.copy(), null))
+ }
+ .map(p => p._1)
+ .collect()
+ }
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala
similarity index 89%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala
index 050efbd..4a3a7c4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
-class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean)
- extends HoodieAvroSerializerTrait {
+class HoodieSparkAvroSerializer(rootCatalystType: DataType, rootAvroType:
Schema, nullable: Boolean)
+ extends HoodieAvroSerializer {
val avroSerializer = new AvroSerializer(rootCatalystType, rootAvroType,
nullable)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
index 9b1b88d..8aa47ff 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
@@ -17,11 +17,9 @@
package org.apache.hudi
-import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions
import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression
-
-import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter,
GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan,
LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
-import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType,
StringType, StructField, StructType}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -93,7 +91,7 @@ class TestConvertFilterToCatalystExpression {
} else {
expectExpression
}
- val exp = convertToCatalystExpressions(filters, tableSchema)
+ val exp = convertToCatalystExpression(filters, tableSchema)
if (removeQuotesIfNeed == null) {
assertEquals(exp.isEmpty, true)
} else {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 315a14c..18b639f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -26,6 +26,7 @@ import
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -39,6 +40,8 @@ import scala.collection.JavaConversions._
@Tag("functional")
class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
+ private val log = LogManager.getLogger(classOf[TestMORDataSourceStorage])
+
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
new file mode 100644
index 0000000..a963081
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.avro.Schema
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.testutils.{HadoopMapRedUtils,
HoodieTestDataGenerator}
+import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
+import org.apache.spark.HoodieUnsafeRDDUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Test}
+
+import scala.:+
+import scala.collection.JavaConverters._
+
+@Tag("functional")
+class TestParquetColumnProjection extends SparkClientFunctionalTestHarness
with Logging {
+
+ val defaultWriteOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.ENABLE.key -> "true",
// NOTE: It's critical that we use a non-partitioned table, since the way we
track the amount of bytes read
+ // is not robust, and works most reliably only when we read just a
single file. As such, making table
+ // non-partitioned makes it much more likely just a single file will
be written
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[NonpartitionedKeyGenerator].getName
+ )
+
+ @Test
+ def testBaseFileOnlyViewRelation(): Unit = {
+ val tablePath = s"$basePath/cow"
+ val targetRecordsCount = 100
+ val (_, schema) = bootstrapTable(tablePath,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, targetRecordsCount,
+ defaultWriteOpts, populateMetaFields = true)
+ val tableState = TableState(tablePath, schema, targetRecordsCount, 0.0)
+
+ // Stats for the reads fetching only _projected_ columns (note how amount
of bytes read
+ // increases along w/ the # of columns)
+ val projectedColumnsReadStats: Array[(String, Long)] =
+ if (HoodieSparkUtils.isSpark3)
+ Array(
+ ("rider", 2452),
+ ("rider,driver", 2552),
+ ("rider,driver,tip_history", 3517))
+ else if (HoodieSparkUtils.isSpark2)
+ Array(
+ ("rider", 2595),
+ ("rider,driver", 2735),
+ ("rider,driver,tip_history", 3750))
+ else
+ fail("Only Spark 3 and Spark 2 are currently supported")
+
+ // Test COW / Snapshot
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, "",
projectedColumnsReadStats)
+ }
+
+ @Test
+ def testMergeOnReadSnapshotRelationWithDeltaLogs(): Unit = {
+ val tablePath = s"$basePath/mor-with-logs"
+ val targetRecordsCount = 100
+ val targetUpdatedRecordsRatio = 0.5
+
+ val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount,
targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true)
+ val tableState = TableState(tablePath, schema, targetRecordsCount,
targetUpdatedRecordsRatio)
+
+ // Stats for the reads fetching only _projected_ columns (note how amount
of bytes read
+ // increases along w/ the # of columns)
+ val projectedColumnsReadStats: Array[(String, Long)] =
+ if (HoodieSparkUtils.isSpark3)
+ Array(
+ ("rider", 2452),
+ ("rider,driver", 2552),
+ ("rider,driver,tip_history", 3517))
+ else if (HoodieSparkUtils.isSpark2)
+ Array(
+ ("rider", 2595),
+ ("rider,driver", 2735),
+ ("rider,driver,tip_history", 3750))
+ else
+ fail("Only Spark 3 and Spark 2 are currently supported")
+
+ // Stats for the reads fetching _all_ columns (note, how amount of bytes
read
+ // is invariant of the # of columns)
+ val fullColumnsReadStats: Array[(String, Long)] =
+ if (HoodieSparkUtils.isSpark3)
+ Array(
+ ("rider", 14665),
+ ("rider,driver", 14665),
+ ("rider,driver,tip_history", 14665))
+ else if (HoodieSparkUtils.isSpark2)
+ // TODO re-enable tests (these tests are very unstable currently)
+ Array(
+ ("rider", -1),
+ ("rider,driver", -1),
+ ("rider,driver,tip_history", -1))
+ else
+ fail("Only Spark 3 and Spark 2 are currently supported")
+
+ // Test MOR / Snapshot / Skip-merge
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
+
+ // Test MOR / Snapshot / Payload-combine
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
+
+ // Test MOR / Read Optimized
+ runTest(tableState,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null",
projectedColumnsReadStats)
+ }
+
+ @Test
+ def testMergeOnReadSnapshotRelationWithNoDeltaLogs(): Unit = {
+ val tablePath = s"$basePath/mor-no-logs"
+ val targetRecordsCount = 100
+ val targetUpdatedRecordsRatio = 0.0
+
+ val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount,
targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true)
+ val tableState = TableState(tablePath, schema, targetRecordsCount,
targetUpdatedRecordsRatio)
+
+ //
+ // Test #1: MOR table w/ Delta Logs
+ //
+
+ // Stats for the reads fetching only _projected_ columns (note how amount
of bytes read
+ // increases along w/ the # of columns)
+ val projectedColumnsReadStats: Array[(String, Long)] =
+ if (HoodieSparkUtils.isSpark3)
+ Array(
+ ("rider", 2452),
+ ("rider,driver", 2552),
+ ("rider,driver,tip_history", 3517))
+ else if (HoodieSparkUtils.isSpark2)
+ Array(
+ ("rider", 2595),
+ ("rider,driver", 2735),
+ ("rider,driver,tip_history", 3750))
+ else
+ fail("Only Spark 3 and Spark 2 are currently supported")
+
+ // Test MOR / Snapshot / Skip-merge
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
+
+ // Test MOR / Snapshot / Payload-combine
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL,
projectedColumnsReadStats)
+
+ // Test MOR / Read Optimized
+ runTest(tableState,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null",
projectedColumnsReadStats)
+ }
+
+ // TODO add test for incremental query of the table with logs
+ @Test
+ def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = {
+ val tablePath = s"$basePath/mor-no-logs"
+ val targetRecordsCount = 100
+ val targetUpdatedRecordsRatio = 0.0
+
+ val opts: Map[String, String] =
+ // NOTE: Parquet Compression is disabled as it was leading to
non-deterministic outcomes when testing
+ // against Spark 2.x
+ defaultWriteOpts ++
Seq(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key -> "")
+
+ val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount,
targetUpdatedRecordsRatio, opts, populateMetaFields = true)
+ val tableState = TableState(tablePath, schema, targetRecordsCount,
targetUpdatedRecordsRatio)
+
+ // Stats for the reads fetching only _projected_ columns (note how amount
of bytes read
+ // increases along w/ the # of columns)
+ val projectedColumnsReadStats: Array[(String, Long)] =
+ if (HoodieSparkUtils.isSpark3)
+ Array(
+ ("rider", 4219),
+ ("rider,driver", 4279),
+ ("rider,driver,tip_history", 5186))
+ else if (HoodieSparkUtils.isSpark2)
+ Array(
+ ("rider", 4430),
+ ("rider,driver", 4530),
+ ("rider,driver,tip_history", 5487))
+ else
+ fail("Only Spark 3 and Spark 2 are currently supported")
+
+    // Stats for the reads fetching _all_ columns (note how the amount of bytes
read
+    // is invariant of the # of columns)
+ val fullColumnsReadStats: Array[(String, Long)] =
+ if (HoodieSparkUtils.isSpark3)
+ Array(
+ ("rider", 19683),
+ ("rider,driver", 19683),
+ ("rider,driver,tip_history", 19683))
+ else if (HoodieSparkUtils.isSpark2)
+ // TODO re-enable tests (these tests are very unstable currently)
+ Array(
+ ("rider", -1),
+ ("rider,driver", -1),
+ ("rider,driver,tip_history", -1))
+ else
+ fail("Only Spark 3 and Spark 2 are currently supported")
+
+ val incrementalOpts: Map[String, String] = Map(
+ DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001"
+ )
+
+ // Test MOR / Incremental / Skip-merge
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL,
+ projectedColumnsReadStats, incrementalOpts)
+
+ // Test MOR / Incremental / Payload-combine
+ runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL,
+ fullColumnsReadStats, incrementalOpts)
+ }
+
+
+ // Test routine
+ private def runTest(tableState: TableState,
+ queryType: String,
+ mergeType: String,
+ expectedStats: Array[(String, Long)],
+ additionalOpts: Map[String, String] = Map.empty): Unit =
{
+ val tablePath = tableState.path
+ val readOpts = defaultWriteOpts ++ Map(
+ "path" -> tablePath,
+ DataSourceReadOptions.QUERY_TYPE.key -> queryType,
+ DataSourceReadOptions.REALTIME_MERGE.key -> mergeType
+ ) ++ additionalOpts
+
+ val ds = new DefaultSource()
+ val relation: HoodieBaseRelation = ds.createRelation(spark.sqlContext,
readOpts).asInstanceOf[HoodieBaseRelation]
+
+ for ((columnListStr, expectedBytesRead) <- expectedStats) {
+ val targetColumns = columnListStr.split(",")
+
+ println(s"Running test for $tablePath / $queryType / $mergeType /
$columnListStr")
+
+ val (rows, bytesRead) = measureBytesRead { () =>
+ val rdd = relation.buildScan(targetColumns,
Array.empty).asInstanceOf[HoodieUnsafeRDD]
+ HoodieUnsafeRDDUtils.collect(rdd)
+ }
+
+ val targetRecordCount = tableState.targetRecordCount;
+ val targetUpdatedRecordsRatio = tableState.targetUpdatedRecordsRatio
+
+ val expectedRecordCount =
+ if
(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL.equals(mergeType))
targetRecordCount * (1 + targetUpdatedRecordsRatio)
+ else targetRecordCount
+
+ assertEquals(expectedRecordCount, rows.length)
+ if (expectedBytesRead != -1) {
+ assertEquals(expectedBytesRead, bytesRead)
+ } else {
+ logWarning(s"Not matching bytes read ($bytesRead)")
+ }
+
+ val readColumns = targetColumns ++ relation.mandatoryColumns
+ val (_, projectedStructType) =
HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
+
+ val row: InternalRow = rows.take(1).head
+
+ // This check is mostly about making sure InternalRow deserializes
properly into projected schema
+ val deserializedColumns = row.toSeq(projectedStructType)
+ assertEquals(readColumns.length, deserializedColumns.size)
+ }
+ }
+
+ private def bootstrapTable(path: String,
+ tableType: String,
+ recordCount: Int,
+ opts: Map[String, String],
+ populateMetaFields: Boolean,
+ dataGenOpt: Option[HoodieTestDataGenerator] =
None): (List[HoodieRecord[_]], Schema) = {
+ val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+ // Bulk Insert Operation
+ val schema =
+ if (populateMetaFields)
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS
+ else HoodieTestDataGenerator.AVRO_SCHEMA
+
+ val records = dataGen.generateInserts("001", recordCount)
+ val inputDF: Dataset[Row] = toDataset(records,
HoodieTestDataGenerator.AVRO_SCHEMA)
+
+ inputDF.write.format("org.apache.hudi")
+ .options(opts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(path)
+
+ (records.asScala.toList, schema)
+ }
+
+ private def bootstrapMORTable(path: String,
+ recordCount: Int,
+ updatedRecordsRatio: Double,
+ opts: Map[String, String],
+ populateMetaFields: Boolean,
+ dataGenOpt: Option[HoodieTestDataGenerator] =
None): (List[HoodieRecord[_]], Schema) = {
+ val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+ // Step 1: Bootstrap table w/ N records (t/h bulk-insert)
+ val (insertedRecords, schema) = bootstrapTable(path,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts,
populateMetaFields, Some(dataGen))
+
+ if (updatedRecordsRatio == 0) {
+ (insertedRecords, schema)
+ } else {
+ val updatesCount = (insertedRecords.length * updatedRecordsRatio).toInt
+ val recordsToUpdate = insertedRecords.take(updatesCount)
+ val updatedRecords = dataGen.generateUpdates("002",
recordsToUpdate.asJava)
+
+ // Step 2: Update M records out of those (t/h update)
+ val inputDF = toDataset(updatedRecords,
HoodieTestDataGenerator.AVRO_SCHEMA)
+
+ inputDF.write.format("org.apache.hudi")
+ .options(opts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(path)
+
+ (updatedRecords.asScala.toList ++ insertedRecords.drop(updatesCount),
schema)
+ }
+ }
+
+ def measureBytesRead[T](f: () => T): (T, Int) = {
+ // Init BenchmarkCounter to report number of bytes actually read from the
Block
+
BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter,
fs.getConf)
+ val r = f.apply()
+ val bytesRead = BenchmarkCounter.getBytesRead.toInt
+ (r, bytesRead)
+ }
+
+ case class TableState(path: String, schema: Schema, targetRecordCount: Long,
targetUpdatedRecordsRatio: Double)
+}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index f7fa233..54c8b91 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.hudi.Spark2RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait,
HoodieAvroSerializerTrait, Spark2HoodieAvroDeserializer, HoodieAvroSerializer}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSerializer, HoodieSpark2AvroDeserializer, HoodieSparkAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
@@ -42,11 +42,11 @@ import scala.collection.mutable.ArrayBuffer
*/
class Spark2Adapter extends SparkAdapter {
- def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializerTrait =
- new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable)
+ def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializer =
+ new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable)
- def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializerTrait =
- new Spark2HoodieAvroDeserializer(rootAvroType, rootCatalystType)
+ def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializer =
+ new HoodieSpark2AvroDeserializer(rootAvroType, rootCatalystType)
override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]):
SparkRowSerDe = {
new Spark2RowSerDe(encoder)
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala
similarity index 76%
rename from
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala
rename to
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala
index ac2c82f..2b55c66 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala
@@ -21,13 +21,15 @@ import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
/**
- * This is Spark 2 implementation for the [[HoodieAvroDeserializerTrait]]
leveraging [[PatchedAvroDeserializer]],
+ * This is Spark 2 implementation for the [[HoodieAvroDeserializer]]
leveraging [[PatchedAvroDeserializer]],
* which is just copied over version of [[AvroDeserializer]] from Spark 2.4.4
w/ SPARK-30267 being back-ported to it
*/
-class Spark2HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType)
- extends HoodieAvroDeserializerTrait {
+class HoodieSpark2AvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType)
+ extends HoodieAvroDeserializer {
private val avroDeserializer = new PatchedAvroDeserializer(rootAvroType,
rootCatalystType)
- def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data)
+ // As of Spark 3.1, this will return data wrapped with Option, so we make
sure these interfaces
+ // are aligned across Spark versions
+ def deserialize(data: Any): Option[Any] =
Some(avroDeserializer.deserialize(data))
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
index d0328fb..ad33832 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.hudi.Spark3RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait,
HoodieAvroSerializerTrait, Spark3HoodieAvroDeserializer, HoodieAvroSerializer}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSerializer, HoodieSpark3AvroDeserializer, HoodieSparkAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
@@ -43,11 +43,11 @@ import org.apache.spark.sql.{Row, SparkSession}
*/
class Spark3Adapter extends SparkAdapter {
- def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializerTrait =
- new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable)
+ def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializer =
+ new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable)
- def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializerTrait =
- new Spark3HoodieAvroDeserializer(rootAvroType, rootCatalystType)
+ def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializer =
+ new HoodieSpark3AvroDeserializer(rootAvroType, rootCatalystType)
override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]):
SparkRowSerDe = {
new Spark3RowSerDe(encoder)
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala
similarity index 89%
rename from
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala
rename to
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala
index fa03f5d..bd9ead5 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala
@@ -21,8 +21,8 @@ import org.apache.avro.Schema
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.types.DataType
-class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType)
- extends HoodieAvroDeserializerTrait {
+class HoodieSpark3AvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType)
+ extends HoodieAvroDeserializer {
// SPARK-34404: As of Spark3.2, there is no AvroDeserializer's constructor
with Schema and DataType arguments.
// So use the reflection to get AvroDeserializer instance.
@@ -34,5 +34,5 @@ class Spark3HoodieAvroDeserializer(rootAvroType: Schema,
rootCatalystType: DataT
constructor.newInstance(rootAvroType, rootCatalystType)
}
- def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data)
+ def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data)
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
index a57be62..9a62c14 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
@@ -249,14 +249,17 @@ public class TestHDFSParquetImporter extends
FunctionalTestHarness implements Se
long startTime =
HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() /
1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
// 10 for update
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
for (long recordNum = 0; recordNum < 11; recordNum++) {
- records.add(new
HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0",
"rider-upsert-" + recordNum,
- "driver-upsert" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
+ records.add(
+ dataGen.generateGenericRecord(Long.toString(recordNum), "0",
"rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
}
// 4 for insert
for (long recordNum = 96; recordNum < 100; recordNum++) {
- records.add(new
HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0",
"rider-upsert-" + recordNum,
- "driver-upsert" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
+ records.add(
+ dataGen.generateGenericRecord(Long.toString(recordNum), "0",
"rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
}
try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build())
{