This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rc3-patched-for-test by this
push:
new 63d5f44bc2 Improve readability
63d5f44bc2 is described below
commit 63d5f44bc20a66412c36c6068d46887fdc53e1a3
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Apr 21 12:59:58 2022 -0700
Improve readability
---
.../main/scala/org/apache/hudi/BaseFileOnlyRelation.scala | 6 +++---
.../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 12 +++++++-----
.../org/apache/hudi/MergeOnReadIncrementalRelation.scala | 2 +-
.../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala | 2 +-
.../apache/hudi/functional/TestParquetColumnProjection.scala | 4 ++--
5 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 3c667d2b42..c57f46a7b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override type FileSplit = HoodieBaseFileSplit
- override lazy val mandatoryColumns: Seq[String] =
- // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
+ override lazy val mandatoryFields: Seq[String] =
+ // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
Seq(recordKeyField)
override def imbueConfigs(sqlContext: SQLContext): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index aac57e1bbb..4b7177f4d6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -198,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* @VisibleInTests
*/
- val mandatoryColumns: Seq[String]
+ val mandatoryFields: Seq[String]
+
+ protected def mandatoryRootFields: Seq[String] =
+ mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -245,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
//
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
// PROJECTION
- val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+ val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -361,12 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
!SubqueryExpression.hasSubquery(condition)
}
- protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
+ protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
// For a nested field in mandatory columns, we should first get the root-level field, and then
// check for any missing column, as the requestedColumns should only contain root-level fields
// We should only append root-level field as well
- val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
-   .filter(rootField => !requestedColumns.contains(rootField))
+ val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6aa7007851..806a5e371d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
- override lazy val mandatoryColumns: Seq[String] = {
+ override lazy val mandatoryFields: Seq[String] = {
// NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
// cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index a88eb63036..75bc96624e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
override type FileSplit = HoodieMergeOnReadFileSplit
- override lazy val mandatoryColumns: Seq[String] =
+ override lazy val mandatoryFields: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index f670450c3e..945d26be3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
import org.apache.avro.Schema
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
logWarning(s"Not matching bytes read ($bytesRead)")
}
- val readColumns = targetColumns ++ relation.mandatoryColumns
+ val readColumns = targetColumns ++ relation.mandatoryFields
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
val row: InternalRow = rows.take(1).head
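
For reference, a minimal standalone Scala sketch of the root-field handling that the renamed appendMandatoryRootFields helper performs (this is not part of the patch above; rootLevelField is a hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName, assumed to return the portion of a field name before the first dot):

    object MandatoryRootFieldsSketch {
      // Hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName
      def rootLevelField(field: String): String = field.split('.').head

      def appendMandatoryRootFields(requestedColumns: Array[String],
                                    mandatoryFields: Seq[String]): Array[String] = {
        // Only root-level names are compared and appended, mirroring the patched logic
        val mandatoryRootFields = mandatoryFields.map(rootLevelField)
        val missing = mandatoryRootFields.filter(root => !requestedColumns.contains(root))
        requestedColumns ++ missing
      }

      def main(args: Array[String]): Unit = {
        // A nested mandatory field such as "fare.amount" contributes only its root "fare",
        // which is already requested, so only the record-key field gets appended.
        val result = appendMandatoryRootFields(Array("rider", "fare"),
          Seq("_hoodie_record_key", "fare.amount"))
        println(result.mkString(", ")) // rider, fare, _hoodie_record_key
      }
    }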