alexeykudinkin commented on code in PR #6442:
URL: https://github.com/apache/hudi/pull/6442#discussion_r1019650046
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala:
##########
@@ -82,6 +85,15 @@ case class HoodieInternalV2Table(spark: SparkSession,
}.toArray
}
+ override def newScanBuilder(caseInsensitiveStringMap:
CaseInsensitiveStringMap): ScanBuilder = {
+ val scanOptions = buildHoodieScanConfig(caseInsensitiveStringMap,
hoodieCatalogTable)
+ new HoodieBatchScanBuilder(spark, hoodieCatalogTable, scanOptions)
+ }
+
+ private def buildHoodieScanConfig(caseInsensitiveStringMap:
CaseInsensitiveStringMap,
Review Comment:
We can inline this method (it's trivial)
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.source
+
+import org.apache.hudi.DataSourceReadOptions.QUERY_TYPE
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{DataSourceOptionsHelper, DefaultSource,
HoodieSparkUtils}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownFilters, SupportsPushDownRequiredColumns,
SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+
+class HoodieBatchScanBuilder(spark: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ options: Map[String, String])
+ extends ScanBuilder with SupportsPushDownFilters with
SupportsPushDownRequiredColumns {
+ @transient lazy val hadoopConf = {
+ // Hadoop Configurations are case sensitive.
+ spark.sessionState.newHadoopConfWithOptions(options)
+ }
+
+ private var filterExpressions: Option[Expression] = None
+
+ private var filterArrays: Array[Filter] = Array.empty
+
+ private var expectedSchema: StructType= hoodieCatalogTable.tableSchema
+
+ override def build(): Scan = {
+ val relation = new DefaultSource().createRelation(new SQLContext(spark),
options)
+ relation match {
+ case HadoopFsRelation(location, partitionSchema, dataSchema, _, _,
options) =>
Review Comment:
BTW, this will be greatly helped by
[RFC-64](https://github.com/apache/hudi/pull/7080), which we're currently
working on, so we should sync and sequence our efforts to make sure there's
minimal to no duplication in what we do.
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala:
##########
@@ -402,7 +402,7 @@ object Spark32PlusHoodieParquetFileFormat {
/**
* NOTE: This method is specific to Spark 3.2.0
*/
- private def createParquetFilters(args: Any*): ParquetFilters = {
+ def createParquetFilters(args: Any*): ParquetFilters = {
Review Comment:
Sorry, I don't think I understand. Can you please elaborate? Where are these
used?
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkBatch.scala:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, RecordReader, TaskAttemptID,
TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.read.{Batch, InputPartition,
PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters,
ParquetFooterReader, ParquetOptions, ParquetReadSupport, ParquetWriteSupport,
Spark32PlusDataSourceUtils, Spark32PlusHoodieVectorizedParquetRecordReader,
VectorizedParquetRecordReader}
+import
org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.{createParquetFilters,
createParquetReadSupport, createVectorizedParquetRecordReader,
pruneInternalSchema, rebuildFilterFromParquet}
+import
org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory,
PartitionReaderWithPartitionValues}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
FilePartition, PartitionDirectory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+class SparkBatch(spark: SparkSession,
+ selectedPartitions: Seq[PartitionDirectory],
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ @transient hadoopConf: Configuration) extends Batch with
Serializable {
+
+ override def planInputPartitions(): Array[InputPartition] = {
+ // TODO support more accurate task planning.
+ val maxSplitBytes = FilePartition.maxSplitBytes(spark, selectedPartitions)
+ val splitFiles = selectedPartitions.flatMap { partition =>
+ partition.files.flatMap { file =>
+ val filePath = file.getPath
+ PartitionedFileUtil.splitFiles(
+ sparkSession = spark,
+ file = file,
+ filePath = filePath,
+ isSplitable = true,
+ maxSplitBytes = maxSplitBytes,
+ partitionValues = partition.values
+ )
+ }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ }
+ FilePartition.getFilePartitions(spark, splitFiles, maxSplitBytes).toArray
+
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(
+ ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ ParquetWriteSupport.SPARK_ROW_SCHEMA,
+ requiredSchema.json)
+ hadoopConf.set(
+ SQLConf.SESSION_LOCAL_TIMEZONE.key,
+ spark.sessionState.conf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(
+ SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+ spark.sessionState.conf.nestedSchemaPruningEnabled)
+ hadoopConf.setBoolean(
+ SQLConf.CASE_SENSITIVE.key,
+ spark.sessionState.conf.caseSensitiveAnalysis)
+
+ ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+ // Sets flags for `ParquetToSparkSchemaConverter`
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ spark.sessionState.conf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ spark.sessionState.conf.isParquetINT96AsTimestamp)
+ val broadcastedConf = spark.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf))
+ val sqlConf = spark.sessionState.conf
+ ReaderFactory(sqlConf, broadcastedConf)
+ }
+
+ case class ReaderFactory(sqlConf: SQLConf,
+ broadcastedConf:
Broadcast[SerializableConfiguration]) extends FilePartitionReaderFactory {
+ // TODO: if you move this into the closure it reverts to the default
values.
+ // If true, enable using the custom RecordReader for parquet. This only
works for
+ // a subset of the types (no complex types).
+ val resultSchema: StructType = StructType(partitionSchema.fields ++
requiredSchema.fields)
Review Comment:
Sorry, I'm still not sure I understand what we're copying this for. Can
you please elaborate on what exactly you're trying to modify here?
We should copy code from Spark only in exceptional circumstances when
there's just no other way around it. Otherwise we should avoid doing that at
all costs.
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.source
+
+import org.apache.hudi.DataSourceReadOptions.QUERY_TYPE
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{DataSourceOptionsHelper, DefaultSource,
HoodieSparkUtils}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownFilters, SupportsPushDownRequiredColumns,
SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+
+class HoodieBatchScanBuilder(spark: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ options: Map[String, String])
+ extends ScanBuilder with SupportsPushDownFilters with
SupportsPushDownRequiredColumns {
+ @transient lazy val hadoopConf = {
+ // Hadoop Configurations are case sensitive.
+ spark.sessionState.newHadoopConfWithOptions(options)
+ }
+
+ private var filterExpressions: Option[Expression] = None
+
+ private var filterArrays: Array[Filter] = Array.empty
+
+ private var expectedSchema: StructType= hoodieCatalogTable.tableSchema
+
+ override def build(): Scan = {
+ val relation = new DefaultSource().createRelation(new SQLContext(spark),
options)
+ relation match {
+ case HadoopFsRelation(location, partitionSchema, dataSchema, _, _,
options) =>
Review Comment:
Agreed, it's no small effort, and we should definitely think about how we
can approach it incrementally.
But we also need to be mindful that we can't keep things in a transitory
state for long -- we can't have the integration spread between V1 and V2; we
have to commit and migrate fully either way, with a clear plan for the
deprecation of the other.
I'd suggest that for a feature of this scale we should actually write a
proper RFC to cover all of these concerns. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]