alexeykudinkin commented on code in PR #5165:
URL: https://github.com/apache/hudi/pull/5165#discussion_r1019634073
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -152,6 +152,11 @@ object DataSourceReadOptions {
val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] =
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
+ val INCREMENTAL_VECTORIZED_READER_ENABLE: ConfigProperty[String] =
ConfigProperty
Review Comment:
nit: "enabled"
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -122,6 +144,17 @@ class MergeOnReadIncrementalRelation(sqlContext:
SQLContext,
}
filteredFileSlices
}
+
+ private def setUpFilterParams(requiredSchema: HoodieTableSchema):
FilterParams = {
+ val attrs = requiredSchema.structTypeSchema.map(f =>
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+ val filterExpression =
convertToExpressions(incrementalSpanRecordFilters.toArray).map { e =>
+ e transform {
+ case a: AttributeReference =>
+ attrs.find(_.name.equalsIgnoreCase(a.name)).get
Review Comment:
Instead of manipulating expressions like that, we should fix
`convertToExpressions` by:
- Making it produce unresolved expressions, replacing name-refs w/
`UnresolvedAttribute` instead of `AttributeReference` during conversion
- Then resolve converted expression using `resolveExpr` (in the same
utility)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -347,14 +347,25 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
- val rdd = composeRDD(fileSplits, tableSchema, requiredSchema,
targetColumns, filters)
+ val rdd = postOperationOnComposeRDD(composeRDD(fileSplits, tableSchema,
requiredSchema, targetColumns, filters), requiredSchema)
// Here we rely on a type erasure, to workaround inherited API
restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
rdd.asInstanceOf[RDD[Row]]
}
}
+ /**
+ * Post operation on ComposeRDD, data filters to be applied.
+ *
+ * @param rdd file splits to be handled by the RDD
+ * @param requiredSchema projected schema required by the reader
+ * @return handled RDD
+ */
+ protected def postOperationOnComposeRDD(rdd: RDD[InternalRow],
requiredSchema: HoodieTableSchema): RDD[Row] = {
Review Comment:
Can we implement this custom logic w/in the `composeRDD` itself? Why do we
need a separate method for it?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -122,6 +144,17 @@ class MergeOnReadIncrementalRelation(sqlContext:
SQLContext,
}
filteredFileSlices
}
+
+ private def setUpFilterParams(requiredSchema: HoodieTableSchema):
FilterParams = {
+ val attrs = requiredSchema.structTypeSchema.map(f =>
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
Review Comment:
You can just do `schema.toAttributes`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -152,6 +152,11 @@ object DataSourceReadOptions {
val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] =
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
+ val INCREMENTAL_VECTORIZED_READER_ENABLE: ConfigProperty[String] =
ConfigProperty
+ .key("hoodie.datasource.read.incr.vectorized.reader.enable")
+ .defaultValue("true")
+ .withDocumentation("Enable vectorized reader for mor table incremental
query.")
Review Comment:
This config should be applicable to both COW and MOR tables
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]