yihua commented on code in PR #10153:
URL: https://github.com/apache/hudi/pull/10153#discussion_r1419477488
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -140,6 +147,75 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}
+ test("Test Functional Index With Hive Sync Non Partitioned Table") {
+ // There is a big difference between Java class loader architecture of
versions 1.8 and 17.
+ // Hive 2.3.7 is compiled with Java 1.8, and the class loader used there
throws error when Hive APIs are run on Java 17.
+ // So we special case this test only for Java 8.
+ if (HoodieSparkUtils.gteqSpark3_2 && HoodieTestUtils.getJavaVersion == 8) {
+ withTempDir { tmp =>
+ Seq("mor").foreach { tableType =>
+ val databaseName = "default"
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
+ """.stripMargin)
+ // ts=1000 and from_unixtime(ts, 'yyyy-MM-dd') = '1970-01-01'
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ // ts=100000 and from_unixtime(ts, 'yyyy-MM-dd') = '1970-01-02'
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 100000)")
+ // ts=10000000 and from_unixtime(ts, 'yyyy-MM-dd') = '1970-04-26'
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 10000000)")
+
+ val createIndexSql = s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
+ spark.sql(createIndexSql)
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
+ val functionalIndexMetadata =
metaClient.getFunctionalIndexMetadata.get()
+ assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+ assertEquals("func_index_idx_datestr",
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+ // sync to hive without partition metadata
+ val hiveSyncProps = new TypedProperties()
Review Comment:
Based on the logic, the user needs to unset the partition field for meta
sync, correct? Could such a change by the user not take effect in the meta-sync
logic in the write client (e.g., we automatically prepare the hive sync configs
based on the partition field)? Should we introduce a new config like
`hoodie.meta.sync.no_partition_metadata` to turn the new feature on or off?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -140,6 +147,75 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}
+ test("Test Functional Index With Hive Sync Non Partitioned Table") {
+ // There is a big difference between Java class loader architecture of
versions 1.8 and 17.
+ // Hive 2.3.7 is compiled with Java 1.8, and the class loader used there
throws error when Hive APIs are run on Java 17.
+ // So we special case this test only for Java 8.
+ if (HoodieSparkUtils.gteqSpark3_2 && HoodieTestUtils.getJavaVersion == 8) {
+ withTempDir { tmp =>
+ Seq("mor").foreach { tableType =>
+ val databaseName = "default"
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
+ """.stripMargin)
+ // ts=1000 and from_unixtime(ts, 'yyyy-MM-dd') = '1970-01-01'
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ // ts=100000 and from_unixtime(ts, 'yyyy-MM-dd') = '1970-01-02'
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 100000)")
+ // ts=10000000 and from_unixtime(ts, 'yyyy-MM-dd') = '1970-04-26'
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 10000000)")
+
+ val createIndexSql = s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
+ spark.sql(createIndexSql)
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
+ val functionalIndexMetadata =
metaClient.getFunctionalIndexMetadata.get()
+ assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+ assertEquals("func_index_idx_datestr",
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+ // sync to hive without partition metadata
+ val hiveSyncProps = new TypedProperties()
+ hiveSyncProps.setProperty(HIVE_USER.key, "")
+ hiveSyncProps.setProperty(HIVE_PASS.key, "")
+ hiveSyncProps.setProperty(META_SYNC_DATABASE_NAME.key, databaseName)
+ hiveSyncProps.setProperty(META_SYNC_TABLE_NAME.key, tableName)
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key, basePath)
+ hiveSyncProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key,
"false")
+ HiveTestUtil.setUp(Option.of(hiveSyncProps), false)
+ val tool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf)
+ tool.syncHoodieTable()
Review Comment:
Can we add a validation here that the HMS does not have partition information
for the table?
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala:
##########
@@ -369,4 +378,28 @@ object HoodieCatalog {
(identityCols, bucketSpec)
}
+
+ def buildPartitionTransforms(spark: SparkSession,
+ basePath: String): Array[Transform] = {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(spark.sessionState.newHadoopConf())
+ .setBasePath(basePath)
+ .build()
Review Comment:
Do you know under what conditions the stage change methods are called? This
method has non-trivial latency. If often called, this may not scale.
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala:
##########
@@ -369,4 +378,28 @@ object HoodieCatalog {
(identityCols, bucketSpec)
}
+
+ def buildPartitionTransforms(spark: SparkSession,
+ basePath: String): Array[Transform] = {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(spark.sessionState.newHadoopConf())
+ .setBasePath(basePath)
+ .build()
+ val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+ val metadataFileSystemView =
FileSystemViewManager.createInMemoryFileSystemView(
+ new HoodieSparkEngineContext(spark.sparkContext), metaClient,
metadataConfig)
Review Comment:
Can this fail for tables without the metadata table (MDT)?
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala:
##########
@@ -54,7 +59,8 @@ class HoodieCatalog extends DelegatingCatalogExtension
override def stageCreate(ident: Identifier, schema: StructType, partitions:
Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
val locUriAndTableType = deduceTableLocationURIAndTableType(ident,
properties)
- HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
+ val partitionTransforms = if (partitions.isEmpty)
buildPartitionTransforms(spark, locUriAndTableType._1.getPath) else partitions
Review Comment:
When the table is non-partitioned, is `partitions` still empty, and can we
add a short circuit to skip listing the table partition if non-partitioned?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]