nsivabalan commented on a change in pull request #2431:
URL: https://github.com/apache/hudi/pull/2431#discussion_r567464581
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,37 @@ object DataSourceWriteOptions {
@Deprecated
val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
- def translateStorageTypeToTableType(optParams: Map[String, String]) :
Map[String, String] = {
+ def translateOptParams(optParams: Map[String, String]): Map[String, String]
= {
Review comment:
My 2 cents: maybe we can introduce a new method here which should be
invoked directly. As of now, this is called within
parametersWithWriteDefaults(), which does not sound right. So, maybe make an
explicit call after parametersWithWriteDefaults() returns — something like
translateSqlOptions() — which will take care of doing this
translation. Because the existing method only takes care of some deprecated options,
but here we are trying to translate a SQL option to a Hudi option/config param.
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,37 @@ object DataSourceWriteOptions {
@Deprecated
val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
- def translateStorageTypeToTableType(optParams: Map[String, String]) :
Map[String, String] = {
+ def translateOptParams(optParams: Map[String, String]): Map[String, String]
= {
+ // translate StorageType to TableType
+ var newOptParams = optParams
if (optParams.contains(STORAGE_TYPE_OPT_KEY) &&
!optParams.contains(TABLE_TYPE_OPT_KEY)) {
log.warn(STORAGE_TYPE_OPT_KEY + " is deprecated and will be removed in a
later release; Please use " + TABLE_TYPE_OPT_KEY)
- optParams ++ Map(TABLE_TYPE_OPT_KEY -> optParams(STORAGE_TYPE_OPT_KEY))
- } else {
- optParams
+ newOptParams = optParams ++ Map(TABLE_TYPE_OPT_KEY ->
optParams(STORAGE_TYPE_OPT_KEY))
}
+ // translate the api partitionBy of spark DataFrameWriter to
PARTITIONPATH_FIELD_OPT_KEY
+ if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY) &&
!optParams.contains(PARTITIONPATH_FIELD_OPT_KEY)) {
+ val partitionColumns =
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+ .map(SparkDataSourceUtils.decodePartitioningColumns)
+ .getOrElse(Nil)
+ val keyGeneratorClass =
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
+ DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+ val partitionPathField =
+ keyGeneratorClass match {
+ case "org.apache.hudi.keygen.CustomKeyGenerator" =>
Review comment:
Can we do classOf[CustomKeyGenerator].getName rather than hardcoding the
fully qualified class name?
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
+
+ @Test def testPartitionByTranslateToPartitionPath() {
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val noPartitionPathOpts = commonOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+ // partitionBy takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ var recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("current_date").cast("string")).count() == 0)
+
+ // PARTITIONPATH_FIELD_OPT_KEY takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("timestamp").cast("string")).count() == 0)
+
+ // CustomKeyGenerator with SIMPLE
Review comment:
Also, can we have one failure test, verifying that an invalid format results in
failure? TestCustomKeyGenerator has tests you can use for reference.
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,33 @@ object DataSourceWriteOptions {
@Deprecated
val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
- def translateStorageTypeToTableType(optParams: Map[String, String]) :
Map[String, String] = {
+ def translateOptParams(optParams: Map[String, String]): Map[String, String]
= {
+ // translate StorageType to TableType
+ var newOptParams = optParams
if (optParams.contains(STORAGE_TYPE_OPT_KEY) &&
!optParams.contains(TABLE_TYPE_OPT_KEY)) {
log.warn(STORAGE_TYPE_OPT_KEY + " is deprecated and will be removed in a
later release; Please use " + TABLE_TYPE_OPT_KEY)
- optParams ++ Map(TABLE_TYPE_OPT_KEY -> optParams(STORAGE_TYPE_OPT_KEY))
- } else {
- optParams
+ newOptParams = optParams ++ Map(TABLE_TYPE_OPT_KEY ->
optParams(STORAGE_TYPE_OPT_KEY))
}
+ // translate the api partitionBy of spark DataFrameWriter to
PARTITIONPATH_FIELD_OPT_KEY
+ if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY) &&
!optParams.contains(PARTITIONPATH_FIELD_OPT_KEY)) {
+ val partitionColumns =
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+ .map(SparkDataSourceUtils.decodePartitioningColumns)
+ .getOrElse(Nil)
+
+ val keyGeneratorClass =
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
+ DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+ val partitionPathField =
+ keyGeneratorClass match {
+ case "org.apache.hudi.keygen.CustomKeyGenerator" =>
Review comment:
Can you please add a Javadoc comment here describing the format expected by
CustomKeyGenerator?
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -46,6 +46,11 @@ class DefaultSource extends RelationProvider
with StreamSinkProvider
with Serializable {
+ SparkSession.getActiveSession.foreach { spark =>
+ // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
+ spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions",
"true")
Review comment:
From what I understand, this is not required in Spark 3. If that's the
case, can you fetch the Spark version and set this only for Spark 2 or lower?
Because even today one can run Hudi with Spark 3.
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
+
+ @Test def testPartitionByTranslateToPartitionPath() {
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val noPartitionPathOpts = commonOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+ // partitionBy takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ var recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("current_date").cast("string")).count() == 0)
+
+ // PARTITIONPATH_FIELD_OPT_KEY takes effect
+ inputDF.write.format("hudi")
Review comment:
Can we create separate tests for the different key generators? Also, can we please
extract a private method and re-use the code in every test if possible.
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
+
+ @Test def testPartitionByTranslateToPartitionPath() {
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val noPartitionPathOpts = commonOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+ // partitionBy takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ var recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("current_date").cast("string")).count() == 0)
+
+ // PARTITIONPATH_FIELD_OPT_KEY takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("timestamp").cast("string")).count() == 0)
+
+ // CustomKeyGenerator with SIMPLE
+ inputDF.write.format("hudi")
+ .partitionBy("current_ts")
+ .options(noPartitionPathOpts)
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
"org.apache.hudi.keygen.CustomKeyGenerator")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("current_ts").cast("string")).count() == 0)
+
+ // CustomKeyGenerator with TIMESTAMP
+ inputDF.write.format("hudi")
+ .partitionBy("current_ts")
+ .options(noPartitionPathOpts)
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
"org.apache.hudi.keygen.CustomKeyGenerator")
+ .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
+ .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ val udf_date_format = udf((data: Long) => new
DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))
Review comment:
Can you please add more tests covering all key generators:
ComplexKeyGenerator, TimestampBasedKeyGenerator.
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +351,65 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
+
+ @Test def testPartitionByTranslateToPartitionPath() {
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val noPartitionPathOpts = commonOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+
+ // partitionBy takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ var recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("current_date").cast("string")).count() == 0)
+
+ // PARTITIONPATH_FIELD_OPT_KEY takes effect
+ inputDF.write.format("hudi")
+ .partitionBy("current_date")
+ .options(noPartitionPathOpts)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ recordsReadDF = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*")
+
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
col("timestamp").cast("string")).count() == 0)
+
+ // CustomKeyGenerator with SIMPLE
Review comment:
Can you please add one more test for CustomKeyGenerator covering the
format "field1:simple,field2:timestamp"? Because this is the only one that has
special handling, it would be nice to have more tests around it.
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -181,16 +183,37 @@ object DataSourceWriteOptions {
@Deprecated
val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL
- def translateStorageTypeToTableType(optParams: Map[String, String]) :
Map[String, String] = {
+ def translateOptParams(optParams: Map[String, String]): Map[String, String]
= {
+ // translate StorageType to TableType
+ var newOptParams = optParams
Review comment:
Maybe rename this to "translatedOptParams"?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]