nsivabalan commented on a change in pull request #2431:
URL: https://github.com/apache/hudi/pull/2431#discussion_r567464581



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,37 @@ object DataSourceWriteOptions {
   @Deprecated
   val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
 
-  def translateStorageTypeToTableType(optParams: Map[String, String]) : 
Map[String, String] = {
+  def translateOptParams(optParams: Map[String, String]): Map[String, String] 
= {

Review comment:
       My 2 cents: maybe we can introduce a new method here that is invoked 
directly. As of now, this is called within parametersWithWriteDefaults(), 
which does not sound right. So, maybe make an explicit call after 
parametersWithWriteDefaults() returns, to something like 
translateSqlOptions(), which will take care of doing these translations. 
This is because the existing method only takes care of some deprecated 
options, but here we are trying to translate a SQL option to a Hudi 
option/config param. 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,37 @@ object DataSourceWriteOptions {
   @Deprecated
   val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
 
-  def translateStorageTypeToTableType(optParams: Map[String, String]) : 
Map[String, String] = {
+  def translateOptParams(optParams: Map[String, String]): Map[String, String] 
= {
+    // translate StorageType to TableType
+    var newOptParams = optParams
     if (optParams.contains(STORAGE_TYPE_OPT_KEY) && 
!optParams.contains(TABLE_TYPE_OPT_KEY)) {
       log.warn(STORAGE_TYPE_OPT_KEY + " is deprecated and will be removed in a 
later release; Please use " + TABLE_TYPE_OPT_KEY)
-      optParams ++ Map(TABLE_TYPE_OPT_KEY -> optParams(STORAGE_TYPE_OPT_KEY))
-    } else {
-      optParams
+      newOptParams = optParams ++ Map(TABLE_TYPE_OPT_KEY -> 
optParams(STORAGE_TYPE_OPT_KEY))
     }
+    // translate the api partitionBy of spark DataFrameWriter to 
PARTITIONPATH_FIELD_OPT_KEY
+    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY) && 
!optParams.contains(PARTITIONPATH_FIELD_OPT_KEY)) {
+      val partitionColumns = 
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+        .map(SparkDataSourceUtils.decodePartitioningColumns)
+        .getOrElse(Nil)
+      val keyGeneratorClass = 
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
+        DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+      val partitionPathField =
+        keyGeneratorClass match {
+          case "org.apache.hudi.keygen.CustomKeyGenerator" =>

Review comment:
       Can we use classOf[CustomKeyGenerator].getName rather than hardcoding 
the fully qualified class name? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
+
+  @Test def testPartitionByTranslateToPartitionPath() {
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val noPartitionPathOpts = commonOpts - 
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+    // partitionBy takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("current_date").cast("string")).count() == 0)
+
+    // PARTITIONPATH_FIELD_OPT_KEY takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("timestamp").cast("string")).count() == 0)
+
+    // CustomKeyGenerator with SIMPLE

Review comment:
       Also, can we have one failure test, verifying that an invalid format 
results in a failure? TestCustomKeyGenerator should have tests for your 
reference. 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,33 @@ object DataSourceWriteOptions {
   @Deprecated
   val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
 
-  def translateStorageTypeToTableType(optParams: Map[String, String]) : 
Map[String, String] = {
+  def translateOptParams(optParams: Map[String, String]): Map[String, String] 
= {
+    // translate StorageType to TableType
+    var newOptParams = optParams
     if (optParams.contains(STORAGE_TYPE_OPT_KEY) && 
!optParams.contains(TABLE_TYPE_OPT_KEY)) {
       log.warn(STORAGE_TYPE_OPT_KEY + " is deprecated and will be removed in a 
later release; Please use " + TABLE_TYPE_OPT_KEY)
-      optParams ++ Map(TABLE_TYPE_OPT_KEY -> optParams(STORAGE_TYPE_OPT_KEY))
-    } else {
-      optParams
+      newOptParams = optParams ++ Map(TABLE_TYPE_OPT_KEY -> 
optParams(STORAGE_TYPE_OPT_KEY))
     }
+    // translate the api partitionBy of spark DataFrameWriter to 
PARTITIONPATH_FIELD_OPT_KEY
+    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY) && 
!optParams.contains(PARTITIONPATH_FIELD_OPT_KEY)) {
+      val partitionColumns = 
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+        .map(SparkDataSourceUtils.decodePartitioningColumns)
+        .getOrElse(Nil)
+
+      val keyGeneratorClass = 
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
+        DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+      val partitionPathField =
+        keyGeneratorClass match {
+          case "org.apache.hudi.keygen.CustomKeyGenerator" =>

Review comment:
       Can you please add a java doc here describing the format expected by 
CustomKeyGenerator? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -46,6 +46,11 @@ class DefaultSource extends RelationProvider
   with StreamSinkProvider
   with Serializable {
 
+  SparkSession.getActiveSession.foreach { spark =>
+    // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
+    spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", 
"true")

Review comment:
       From what I understand, this is not required in Spark 3. If that's the 
case, can you fetch the Spark version and set this only for Spark 2 or lower? 
Because even today one can run Hudi with Spark 3. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
+
+  @Test def testPartitionByTranslateToPartitionPath() {
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val noPartitionPathOpts = commonOpts - 
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+    // partitionBy takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("current_date").cast("string")).count() == 0)
+
+    // PARTITIONPATH_FIELD_OPT_KEY takes effect
+    inputDF.write.format("hudi")

Review comment:
       Can we create separate tests for the different key generators? Also, 
can we please extract a private method and reuse the code in every test, if 
possible? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
+
+  @Test def testPartitionByTranslateToPartitionPath() {
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val noPartitionPathOpts = commonOpts - 
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+    // partitionBy takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("current_date").cast("string")).count() == 0)
+
+    // PARTITIONPATH_FIELD_OPT_KEY takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("timestamp").cast("string")).count() == 0)
+
+    // CustomKeyGenerator with SIMPLE
+    inputDF.write.format("hudi")
+      .partitionBy("current_ts")
+      .options(noPartitionPathOpts)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
"org.apache.hudi.keygen.CustomKeyGenerator")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("current_ts").cast("string")).count() == 0)
+
+    // CustomKeyGenerator with TIMESTAMP
+    inputDF.write.format("hudi")
+      .partitionBy("current_ts")
+      .options(noPartitionPathOpts)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
"org.apache.hudi.keygen.CustomKeyGenerator")
+      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
+      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    val udf_date_format = udf((data: Long) => new 
DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))

Review comment:
       Can you please add more tests covering all the key generators, e.g. 
ComplexKeygen and TimestampBasedKeyGen? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
+
+  @Test def testPartitionByTranslateToPartitionPath() {
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val noPartitionPathOpts = commonOpts - 
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+    // partitionBy takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("current_date").cast("string")).count() == 0)
+
+    // PARTITIONPATH_FIELD_OPT_KEY takes effect
+    inputDF.write.format("hudi")
+      .partitionBy("current_date")
+      .options(noPartitionPathOpts)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
col("timestamp").cast("string")).count() == 0)
+
+    // CustomKeyGenerator with SIMPLE

Review comment:
       Can you please add one more test for CustomKeyGenerator covering the 
format "field1:simple,field2:timestamp"? This is the only key generator that 
has special handling, so it would be nice to have more tests around it. 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,37 @@ object DataSourceWriteOptions {
   @Deprecated
   val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
 
-  def translateStorageTypeToTableType(optParams: Map[String, String]) : 
Map[String, String] = {
+  def translateOptParams(optParams: Map[String, String]): Map[String, String] 
= {
+    // translate StorageType to TableType
+    var newOptParams = optParams

Review comment:
       Maybe name this "translatedOptParams"?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to