vinothchandar commented on code in PR #10352:
URL: https://github.com/apache/hudi/pull/10352#discussion_r1486955424


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -346,6 +352,12 @@ case class HoodieFileIndex(spark: SparkSession,
       Option.empty
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), 
recordKeys))
+    } else if (recordKeys.nonEmpty && partitionStatsIndex.isIndexAvailable && 
!queryFilters.isEmpty) {
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      val shouldReadInMemory = partitionStatsIndex.shouldReadInMemory(this, 
queryReferencedColumns)

Review Comment:
   Can you please point me to the piece of code/line numbers where we identify 
a referenced column as a partitioned column? (on read side) 
   and how `.partitionBy("a,b,c")` turns into partition stats on the metadata 
write side?
   
   That part is still unclear to me.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -346,6 +352,12 @@ case class HoodieFileIndex(spark: SparkSession,
       Option.empty
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), 
recordKeys))
+    } else if (recordKeys.nonEmpty && partitionStatsIndex.isIndexAvailable && 
!queryFilters.isEmpty) {

Review Comment:
   Why is `recordKeys` being non-empty coupled with the partition stats index?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, 
HoodieMetadataRecord}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.hash.ColumnIndexID
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, 
HoodieTableMetadataUtil}
+import org.apache.hudi.util.JFunction
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+class PartitionStatsIndexSupport(spark: SparkSession,
+                                 tableSchema: StructType,
+                                 @transient metadataConfig: 
HoodieMetadataConfig,
+                                 @transient metaClient: HoodieTableMetaClient,
+                                 allowCaching: Boolean = false)
+  extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, 
metaClient, allowCaching) {
+
+  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new 
JavaSparkContext(spark.sparkContext))
+  @transient private lazy val metadataTable: HoodieTableMetadata =
+    HoodieTableMetadata.create(engineCtx, metadataConfig, 
metaClient.getBasePathV2.toString)
+
+  override def isIndexAvailable: Boolean = {
+    checkState(metadataConfig.enabled, "Metadata Table support has to be 
enabled")

Review Comment:
   Should this also check that partition stats is enabled?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, 
HoodieMetadataRecord}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.hash.ColumnIndexID
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, 
HoodieTableMetadataUtil}
+import org.apache.hudi.util.JFunction
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+class PartitionStatsIndexSupport(spark: SparkSession,
+                                 tableSchema: StructType,
+                                 @transient metadataConfig: 
HoodieMetadataConfig,
+                                 @transient metaClient: HoodieTableMetaClient,
+                                 allowCaching: Boolean = false)
+  extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, 
metaClient, allowCaching) {
+
+  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new 
JavaSparkContext(spark.sparkContext))
+  @transient private lazy val metadataTable: HoodieTableMetadata =

Review Comment:
   Shouldn't we be sharing an instance and passing it down to all the 
different skipping/pruning classes?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex}
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions._
+
+/**
+ * Test cases on partition stats index with Spark datasource.
+ */
+@Tag("functional")
+class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
+
+  val sqlTempTable = "hudi_tbl"
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexInitialization(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name())
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexWithUpsert(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name())
+    doWriteAndValidateDataAndPartitionStats(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexWithUpsertNonPartitioned(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key + 
(DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexUpsertAndRollback(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name())
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    rollbackLastInstant(hudiOpts)
+    validateDataAndPartitionStats()
+  }
+
+  @Test
+  def testPartitionStatsIndexWithSQL(): Unit = {
+    var hudiOpts = commonOpts
+    hudiOpts = hudiOpts + (
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      validate = false)
+
+    createTempTable(hudiOpts)
+    verifyQueryPredicate(hudiOpts)
+  }
+
+  @Test
+  def testPartitionStatsWithPartitionBy(): Unit = {
+    val hudiOpts = commonOpts.-(PARTITIONPATH_FIELD.key)
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    inputDF.write.partitionBy("partition").format("hudi")

Review Comment:
   I found the directory structure to be as follows 
   
   ```
   partition=2015
                          |- 03
                                |- 15
                                |- 16
   ```
   
   and not something like `partition=2015-03-16` - is this expected? Did we just not 
escape `\` here? The resulting partition structure is neither hive-style nor 
hudi-style?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to