garyli1019 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r434962196



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+/**
+  * This is a Spark relation that can be used for querying metadata/fully 
bootstrapped Hudi tables, as well as
+  * non-bootstrapped tables. It implements PrunedFilteredScan interface in 
order to support column pruning and filter
+  * push-down. For metadata bootstrapped files, if we query columns from both 
metadata and actual data then it will
+  * perform a merge of both to return the result.
+  *
+  * Caveat: Filter push-down does not work when querying both metadata and 
actual data columns over metadata
+  * bootstrapped files, because the metadata file and data file can then 
return a different number of rows, causing errors
+  * while merging.
+  *
+  * @param _sqlContext Spark SQL Context
+  * @param userSchema User specified schema in the datasource query
+  * @param globPaths Globbed paths obtained from the user provided path for 
querying
+  * @param metaClient Hudi table meta client
+  * @param optParams DataSource options passed by the user
+  */
+class HudiBootstrapRelation(@transient val _sqlContext: SQLContext,
+                            val userSchema: StructType,
+                            val globPaths: Seq[Path],
+                            val metaClient: HoodieTableMetaClient,
+                            val optParams: Map[String, String]) extends 
BaseRelation
+  with PrunedFilteredScan with Logging {
+
+  val skeletonSchema: StructType = HudiSparkUtils.getHudiMetadataSchema
+  var dataSchema: StructType = _
+  var fullSchema: StructType = _
+
+  val fileIndex: HudiBootstrapFileIndex = buildFileIndex()
+
+  override def sqlContext: SQLContext = _sqlContext
+
+  override val needConversion: Boolean = false
+
+  override def schema: StructType = inferFullSchema()
+
+  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+    logInfo("Starting scan..")
+
+    // Compute splits
+    val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
+      var skeletonFile: Option[PartitionedFile] = Option.empty
+      var dataFile: PartitionedFile = null
+
+      if (hoodieBaseFile.getExternalBaseFile.isPresent) {
+        skeletonFile = Option(PartitionedFile(InternalRow.empty, 
hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
+        dataFile = PartitionedFile(InternalRow.empty, 
hoodieBaseFile.getExternalBaseFile.get().getPath, 0,
+          hoodieBaseFile.getExternalBaseFile.get().getFileLen)
+      } else {
+        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 
0, hoodieBaseFile.getFileLen)
+      }
+      HudiBootstrapSplit(dataFile, skeletonFile)
+    })
+    val tableState = HudiBootstrapTableState(bootstrapSplits)
+
+    // Get required schemas for column pruning
+    var requiredDataSchema = StructType(Seq())
+    var requiredSkeletonSchema = StructType(Seq())
+    requiredColumns.foreach(col => {
+      var field = dataSchema.find(_.name == col)
+      if (field.isDefined) {
+        requiredDataSchema = requiredDataSchema.add(field.get)
+      } else {
+        field = skeletonSchema.find(_.name == col)
+        requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
+      }
+    })
+
+    // Prepare readers for reading data file and skeleton files
+    val dataReadFunction = new ParquetFileFormat()
+        .buildReaderWithPartitionValues(
+          sparkSession = _sqlContext.sparkSession,
+          dataSchema = dataSchema,
+          partitionSchema = StructType(Seq.empty),
+          requiredSchema = requiredDataSchema,
+          filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
+          options = Map.empty,
+          hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+        )
+
+    val skeletonReadFunction = new ParquetFileFormat()
+      .buildReaderWithPartitionValues(
+        sparkSession = _sqlContext.sparkSession,
+        dataSchema = skeletonSchema,
+        partitionSchema = StructType(Seq.empty),
+        requiredSchema = requiredSkeletonSchema,
+        filters = if (requiredDataSchema.isEmpty) filters else Seq(),
+        options = Map.empty,
+        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+
+    val regularReadFunction = new ParquetFileFormat()
+      .buildReaderWithPartitionValues(
+        sparkSession = _sqlContext.sparkSession,
+        dataSchema = fullSchema,
+        partitionSchema = StructType(Seq.empty),
+        requiredSchema = StructType(requiredSkeletonSchema.fields ++ 
requiredDataSchema.fields),
+        filters = filters,
+        options = Map.empty,
+        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
+
+    val rdd = new HudiBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, 
skeletonReadFunction,
+      regularReadFunction, requiredDataSchema, requiredSkeletonSchema, 
requiredColumns, tableState)
+    rdd.asInstanceOf[RDD[Row]]

Review comment:
       @umehrot2 I think we are on the same page 😄 
   ~~as long as we load the data as `RDD[Row]`, then it's very flexible. We can 
union different formats together~~
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to