vinothchandar commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r603859896



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -57,6 +58,7 @@
   public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
+  public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";

Review comment:
       How does this work for existing tables? Do we need an upgrade-downgrade 
step for writing this to `hoodie.properties`?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -79,39 +81,53 @@ class DefaultSource extends RelationProvider
     val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
     val fs = FSUtils.getFs(allPaths.head, 
sqlContext.sparkContext.hadoopConfiguration)
-    val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-    val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+    // Use the HoodieFileIndex only if the 'path' is not globbed.
+    // Or else we use the original way to read hoodie table.
+    val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX)
+      .map(_.toBoolean).getOrElse(DEFAULT_ENABLE_HOODIE_FILE_INDEX)
+    val useHoodieFileIndex = enableFileIndex && path.isDefined && 
!path.get.contains("*") &&
+      !parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+    val globPaths = if (useHoodieFileIndex) {
+      None
+    } else {
+      Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
+    }
+    // Get the table base path
+    val tablePath = if (globPaths.isDefined) {
+      DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
+    } else {
+      DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
+    }
     log.info("Obtained hudi table path: " + tablePath)
 
     val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
     val isBootstrappedTable = 
metaClient.getTableConfig.getBootstrapBasePath.isPresent
-    log.info("Is bootstrapped table => " + isBootstrappedTable)
-
-    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        if (isBootstrappedTable) {
-          // Snapshot query is not supported for Bootstrapped MOR tables
-          log.warn("Snapshot query is not supported for Bootstrapped 
Merge-on-Read tables." +
-            " Falling back to Read Optimized query.")
-          new HoodieBootstrapRelation(sqlContext, schema, globPaths, 
metaClient, optParams)
-        } else {
-          new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, 
globPaths, metaClient)
-        }
-      } else {
-        getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, 
isBootstrappedTable, globPaths, metaClient)
-      }
-    } else 
if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
-      getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, 
isBootstrappedTable, globPaths, metaClient)
-    } else if 
(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
-      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, 
metaClient)
-      } else {
-        new IncrementalRelation(sqlContext, optParams, schema, metaClient)
-      }
-    } else {
-      throw new HoodieException("Invalid query type :" + 
parameters(QUERY_TYPE_OPT_KEY))
+    val tableType = metaClient.getTableType
+    val queryType = parameters(QUERY_TYPE_OPT_KEY)
+    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: 
$tableType")
+
+    (tableType, queryType, isBootstrappedTable) match {

Review comment:
       this is very neat. thanks @pengzhiwei2018 ! 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -133,25 +132,47 @@ class MergeOnReadSnapshotRelation(val sqlContext: 
SQLContext,
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
-    val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
-    val fileStatuses = inMemoryFileIndex.allFiles()
-    if (fileStatuses.isEmpty) {
-      throw new HoodieException("No files found for reading in user provided 
path.")
+    val fileStatuses = if (globPaths.isDefined) {
+      // Load files from the global paths if it has defined to be compatible 
with the original mode
+      val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
+      inMemoryFileIndex.allFiles()
+    } else { // Load files by the HoodieFileIndex.
+      val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, 
metaClient,
+        Some(tableStructSchema), optParams, 
FileStatusCache.getOrCreate(sqlContext.sparkSession))
+      hoodieFileIndex.allFiles
     }
 
-    val fsView = new HoodieTableFileSystemView(metaClient,
-      metaClient.getActiveTimeline.getCommitsTimeline
-        .filterCompletedInstants, fileStatuses.toArray)
-    val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
-    val latestCommit = fsView.getLastInstant.get().getTimestamp
-    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
-    val fileSplits = fileGroup.map(kv => {
-      val baseFile = kv._1
-      val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
-      val partitionedFile = PartitionedFile(InternalRow.empty, 
baseFile.getPath, 0, baseFile.getFileLen)
-      HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, 
latestCommit,
-        metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
-    }).toList
-    fileSplits
+    if (fileStatuses.isEmpty) { // If this an empty table, return an empty 
split list.
+      List.empty[HoodieMergeOnReadFileSplit]
+    } else {
+      val fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline.getCommitsTimeline
+          .filterCompletedInstants, fileStatuses.toArray)
+      val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
+      val latestCommit = fsView.getLastInstant.get().getTimestamp
+      val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
+      val fileSplits = fileGroup.map(kv => {
+        val baseFile = kv._1
+        val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
+
+        // Here we use the Path#toUri to encode the path string, as there is a 
decode in

Review comment:
       Can we avoid repeating this long comment in two places, and move the 
shared code into a common helper?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -112,12 +112,15 @@ private[hudi] object HoodieSparkSqlWriter {
         val archiveLogFolder = parameters.getOrElse(
           HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
 
+        val partitionColumns = 
parameters.getOrElse(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, null)

Review comment:
       Understood. But what about a custom key generator which does not depend 
on `DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY`? Should we handle 
errors in that scenario, or turn off the use of the file index?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to