alexeykudinkin commented on code in PR #6442:
URL: https://github.com/apache/hudi/pull/6442#discussion_r1019650046


##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala:
##########
@@ -82,6 +85,15 @@ case class HoodieInternalV2Table(spark: SparkSession,
     }.toArray
   }
 
+  override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
+    val scanOptions = buildHoodieScanConfig(caseInsensitiveStringMap, hoodieCatalogTable)
+    new HoodieBatchScanBuilder(spark, hoodieCatalogTable, scanOptions)
+  }
+
+  private def buildHoodieScanConfig(caseInsensitiveStringMap: CaseInsensitiveStringMap,

Review Comment:
   We can inline this method (it's trivial)
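   A sketch of what the inlined version could look like -- the body of `buildHoodieScanConfig` isn't shown in this hunk, so the options-assembly line below is just a hypothetical placeholder for whatever that method actually does:

   ```scala
   import scala.collection.JavaConverters._

   override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
     // Formerly buildHoodieScanConfig: merge the table's properties with the
     // caller-supplied read options (hypothetical -- substitute the real body).
     val scanOptions = hoodieCatalogTable.catalogProperties ++ caseInsensitiveStringMap.asScala
     new HoodieBatchScanBuilder(spark, hoodieCatalogTable, scanOptions)
   }
   ```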



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.source
+
+import org.apache.hudi.DataSourceReadOptions.QUERY_TYPE
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{DataSourceOptionsHelper, DefaultSource, HoodieSparkUtils}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+
+class HoodieBatchScanBuilder(spark: SparkSession,
+                             hoodieCatalogTable: HoodieCatalogTable,
+                             options: Map[String, String])
+    extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {
+  @transient lazy val hadoopConf = {
+    // Hadoop Configurations are case sensitive.
+    spark.sessionState.newHadoopConfWithOptions(options)
+  }
+
+  private var filterExpressions: Option[Expression] = None
+
+  private var filterArrays: Array[Filter] = Array.empty
+
+  private var expectedSchema: StructType = hoodieCatalogTable.tableSchema
+
+  override def build(): Scan = {
+    val relation = new DefaultSource().createRelation(new SQLContext(spark), options)
+    relation match {
+      case HadoopFsRelation(location, partitionSchema, dataSchema, _, _, options) =>

Review Comment:
   BTW this will be greatly helped by [RFC-64](https://github.com/apache/hudi/pull/7080), which we're currently working on, so we should sync and sequence our efforts to make sure there's minimal to no duplication in what we do



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala:
##########
@@ -402,7 +402,7 @@ object Spark32PlusHoodieParquetFileFormat {
   /**
    * NOTE: This method is specific to Spark 3.2.0
    */
-  private def createParquetFilters(args: Any*): ParquetFilters = {
+  def createParquetFilters(args: Any*): ParquetFilters = {

Review Comment:
   Sorry, I don't think I understand. Can you please elaborate? Where are these used?



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkBatch.scala:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.source
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, RecordReader, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetFooterReader, ParquetOptions, ParquetReadSupport, ParquetWriteSupport, Spark32PlusDataSourceUtils, Spark32PlusHoodieVectorizedParquetRecordReader, VectorizedParquetRecordReader}
+import org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.{createParquetFilters, createParquetReadSupport, createVectorizedParquetRecordReader, pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, PartitionDirectory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+class SparkBatch(spark: SparkSession,
+                 selectedPartitions: Seq[PartitionDirectory],
+                 partitionSchema: StructType,
+                 requiredSchema: StructType,
+                 filters: Seq[Filter],
+                 options: Map[String, String],
+                 @transient hadoopConf: Configuration) extends Batch with Serializable {
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    // TODO support more accurate task planning.
+    val maxSplitBytes = FilePartition.maxSplitBytes(spark, selectedPartitions)
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        val filePath = file.getPath
+        PartitionedFileUtil.splitFiles(
+          sparkSession = spark,
+          file = file,
+          filePath = filePath,
+          isSplitable = true,
+          maxSplitBytes = maxSplitBytes,
+          partitionValues = partition.values
+        )
+      }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }
+    FilePartition.getFilePartitions(spark, splitFiles, maxSplitBytes).toArray
+
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      spark.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      spark.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      spark.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      spark.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      spark.sessionState.conf.isParquetINT96AsTimestamp)
+    val broadcastedConf = spark.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    val sqlConf = spark.sessionState.conf
+    ReaderFactory(sqlConf, broadcastedConf)
+  }
+
+  case class ReaderFactory(sqlConf: SQLConf,
+                           broadcastedConf: Broadcast[SerializableConfiguration]) extends FilePartitionReaderFactory {
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema: StructType = StructType(partitionSchema.fields ++ requiredSchema.fields)

Review Comment:
   Sorry, I'm still not sure I understand what we're copying this for. Can you please elaborate on what exactly you're trying to modify here?
   
   We should copy code from Spark only in exceptional circumstances, when there's just no other way around it. Otherwise we should avoid doing that at all costs.



##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.source
+
+import org.apache.hudi.DataSourceReadOptions.QUERY_TYPE
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{DataSourceOptionsHelper, DefaultSource, HoodieSparkUtils}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+
+class HoodieBatchScanBuilder(spark: SparkSession,
+                             hoodieCatalogTable: HoodieCatalogTable,
+                             options: Map[String, String])
+    extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {
+  @transient lazy val hadoopConf = {
+    // Hadoop Configurations are case sensitive.
+    spark.sessionState.newHadoopConfWithOptions(options)
+  }
+
+  private var filterExpressions: Option[Expression] = None
+
+  private var filterArrays: Array[Filter] = Array.empty
+
+  private var expectedSchema: StructType = hoodieCatalogTable.tableSchema
+
+  override def build(): Scan = {
+    val relation = new DefaultSource().createRelation(new SQLContext(spark), options)
+    relation match {
+      case HadoopFsRelation(location, partitionSchema, dataSchema, _, _, options) =>

Review Comment:
   Agreed, it's no small effort, and we should definitely think about how we can approach it incrementally.
   But we also need to be mindful that we can't keep things in a transitory state for long -- we can't have the integration spread b/w V1 and V2; we have to commit and migrate fully either way, w/ a clear plan for the deprecation of the one we move away from.
   
   I'd suggest that for a feature of this scale we should actually write a proper RFC to cover all of these concerns. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to