nsivabalan commented on a change in pull request #2283:
URL: https://github.com/apache/hudi/pull/2283#discussion_r567244821



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -377,11 +388,71 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.supportTimestamp = 
parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = 
parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
       DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
+    hiveSyncConfig.tableProperties = 
parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
+    hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, 
basePath.toString,
+      hiveSyncConfig.partitionFields.size())
     hiveSyncConfig
   }
 
-  private def metaSync(parameters: Map[String, String],
-                       basePath: Path,
+  /**
+    * Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES.
+    * @param sqlConf
+    * @param schema
+    * @param parameters
+    * @return A new parameters added the HIVE_TABLE_PROPERTIES property.
+    */
+  private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
+                                    parameters: Map[String, String]): 
Map[String, String] = {
+    val partitionSet = parameters(HIVE_PARTITION_FIELDS_OPT_KEY)
+      .split(",").map(_.trim).filter(!_.isEmpty).toSet
+    val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
+
+    val (partCols, dataCols) = schema.partition(c => 
partitionSet.contains(c.name))
+    val reOrdered = StructType(dataCols ++ partCols)
+    val parts = reOrdered.json.grouped(threshold).toSeq
+
+    var properties = Map(
+      "spark.sql.sources.provider" -> "hudi",

Review comment:
       I am new to this area, so I might be wrong, but can you please clarify 
whether this should be "org.apache.hudi"?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -377,11 +388,71 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.supportTimestamp = 
parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = 
parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
       DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
+    hiveSyncConfig.tableProperties = 
parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
+    hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, 
basePath.toString,
+      hiveSyncConfig.partitionFields.size())
     hiveSyncConfig
   }
 
-  private def metaSync(parameters: Map[String, String],
-                       basePath: Path,
+  /**
+    * Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES.
+    * @param sqlConf
+    * @param schema
+    * @param parameters
+    * @return A new parameters added the HIVE_TABLE_PROPERTIES property.
+    */
+  private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
+                                    parameters: Map[String, String]): 
Map[String, String] = {
+    val partitionSet = parameters(HIVE_PARTITION_FIELDS_OPT_KEY)
+      .split(",").map(_.trim).filter(!_.isEmpty).toSet
+    val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
+
+    val (partCols, dataCols) = schema.partition(c => 
partitionSet.contains(c.name))
+    val reOrdered = StructType(dataCols ++ partCols)
+    val parts = reOrdered.json.grouped(threshold).toSeq
+
+    var properties = Map(
+      "spark.sql.sources.provider" -> "hudi",
+      "spark.sql.sources.schema.numParts" -> parts.size.toString
+    )
+    parts.zipWithIndex.foreach { case (part, index) =>

Review comment:
       Can you please add a comment on why we split the schema into parts rather 
than setting it as a single property? Also, please link any references so that 
devs are aware of why we do this and which properties we may need to set.
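
A minimal sketch of the behaviour this comment asks to have documented, written in plain Java so it runs without Spark. The usual explanation is that the metastore limits the length of a single table property value, so a long schema JSON is stored as numbered slices plus a count; treat that rationale as an assumption to confirm against the Spark sources. `SchemaPartsSketch`, `split`, `toProperties` and `reassemble` are hypothetical names, and the tiny threshold in `main` is for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Hudi or Spark.
final class SchemaPartsSketch {

  // Split text into consecutive chunks of at most `threshold` characters.
  static List<String> split(String text, int threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    List<String> parts = new ArrayList<>();
    for (int start = 0; start < text.length(); start += threshold) {
      parts.add(text.substring(start, Math.min(text.length(), start + threshold)));
    }
    return parts;
  }

  // Store the slice count and each slice under the keys used in the diff above.
  static Map<String, String> toProperties(String schemaJson, int threshold) {
    List<String> schemaParts = split(schemaJson, threshold);
    Map<String, String> props = new LinkedHashMap<>();
    props.put("spark.sql.sources.schema.numParts", String.valueOf(schemaParts.size()));
    for (int i = 0; i < schemaParts.size(); i++) {
      props.put("spark.sql.sources.schema.part." + i, schemaParts.get(i));
    }
    return props;
  }

  // A reader can rebuild the schema by concatenating the slices in index order.
  static String reassemble(Map<String, String> props) {
    int numParts = Integer.parseInt(props.get("spark.sql.sources.schema.numParts"));
    StringBuilder schema = new StringBuilder();
    for (int i = 0; i < numParts; i++) {
      schema.append(props.get("spark.sql.sources.schema.part." + i));
    }
    return schema.toString();
  }

  public static void main(String[] args) {
    // Threshold of 10 is artificially small so that several slices are produced.
    Map<String, String> props = toProperties("{\"type\":\"struct\",\"fields\":[]}", 10);
    props.forEach((k, v) -> System.out.println(k + "=" + v));
    System.out.println(reassemble(props));
  }
}
```

The round trip through `reassemble` is the property worth stating in the code comment: the slices carry no meaning on their own and only make sense together with `numParts`.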

##########
File path: 
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
##########
@@ -138,6 +138,27 @@ public void updatePartitionsToTable(String tableName, 
List<String> changedPartit
     }
   }
 
+  /**
+   * Update the table properties to the table.
+   * @param tableProperties
+   */
+  @Override
+  public void updateTableProperties(String tableName, Map<String, String> 
tableProperties) {
+    if (tableProperties == null || tableProperties.size() == 0) {
+      return;
+    }
+    try {
+      Table table = client.getTable(syncConfig.databaseName, tableName);
+      for (Map.Entry<String, String> entry: tableProperties.entrySet()) {
+        table.putToParameters(entry.getKey(), entry.getValue());
+      }
+      client.alter_table(syncConfig.databaseName, tableName, table);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get update table properties 
for table: "

Review comment:
       Minor: this should read "Failed to update ..."; please remove the extra "get".

##########
File path: 
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -63,7 +63,8 @@ public AbstractSyncHoodieClient(String basePath, boolean 
assumeDatePartitioning,
   }
 
   public abstract void createTable(String tableName, MessageType storageSchema,
-                                   String inputFormatClass, String 
outputFormatClass, String serdeClass);
+                                   String inputFormatClass, String 
outputFormatClass,

Review comment:
       I understand it's not part of this diff, but I'm wondering if you can add 
some Javadoc to this class in general. I realized we don't have any (around 
line 41).

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -397,6 +401,46 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
     })
 
+  test("Test build sync config for spark sql") {
+    initSparkContext("test build sync config")
+    val addSqlTablePropertiesMethod =
+        
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
+          classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
+    addSqlTablePropertiesMethod.setAccessible(true)
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val basePath = "/tmp/hoodie_test"
+    val params = Map(
+      "path" -> basePath,
+      DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
+      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+    )
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
+    val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
+      spark.sessionState.conf, structType, parameters)
+      .asInstanceOf[Map[String, String]]
+
+    val buildSyncConfigMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", 
classOf[Path],
+        classOf[Map[_,_]])
+    buildSyncConfigMethod.setAccessible(true)
+
+    val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
+      new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+
+    assertResult("spark.sql.sources.provider=hudi\n" +
+      "spark.sql.sources.schema.partCol.0=partition\n" +
+      "spark.sql.sources.schema.numParts=1\n" +
+      "spark.sql.sources.schema.numPartCols=1\n" +
+      "spark.sql.sources.schema.part.0=" +
+      
"{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
 +
+      "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," 
+
+      
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
+
+    assertResult("path=/tmp/hoodie_test/*/*")(hiveSyncConfig.serdeProperties)

Review comment:
       same comment as above. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -377,11 +388,71 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.supportTimestamp = 
parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = 
parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
       DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
+    hiveSyncConfig.tableProperties = 
parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
+    hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, 
basePath.toString,
+      hiveSyncConfig.partitionFields.size())
     hiveSyncConfig
   }
 
-  private def metaSync(parameters: Map[String, String],
-                       basePath: Path,
+  /**
+    * Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES.
+    * @param sqlConf
+    * @param schema
+    * @param parameters
+    * @return A new parameters added the HIVE_TABLE_PROPERTIES property.
+    */
+  private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
+                                    parameters: Map[String, String]): 
Map[String, String] = {
+    val partitionSet = parameters(HIVE_PARTITION_FIELDS_OPT_KEY)
+      .split(",").map(_.trim).filter(!_.isEmpty).toSet
+    val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
+
+    val (partCols, dataCols) = schema.partition(c => 
partitionSet.contains(c.name))
+    val reOrdered = StructType(dataCols ++ partCols)
+    val parts = reOrdered.json.grouped(threshold).toSeq
+
+    var properties = Map(
+      "spark.sql.sources.provider" -> "hudi",
+      "spark.sql.sources.schema.numParts" -> parts.size.toString
+    )
+    parts.zipWithIndex.foreach { case (part, index) =>
+      properties += s"spark.sql.sources.schema.part.$index" -> part
+    }
+    // add partition columns
+    if (partitionSet.nonEmpty) {
+      properties += "spark.sql.sources.schema.numPartCols" -> 
partitionSet.size.toString
+      partitionSet.zipWithIndex.foreach { case (partCol, index) =>
+        properties += s"spark.sql.sources.schema.partCol.$index" -> partCol
+      }
+    }
+    var sqlPropertyText = ConfigUtils.configToString(properties)
+    sqlPropertyText = if (parameters.containsKey(HIVE_TABLE_PROPERTIES)) {
+      sqlPropertyText + "\n" + parameters(HIVE_TABLE_PROPERTIES)
+    } else {
+      sqlPropertyText
+    }
+    parameters + (HIVE_TABLE_PROPERTIES -> sqlPropertyText)
+  }
+
+  private def createSqlTableSerdeProperties(parameters: Map[String, String],
+                                            basePath: String, pathDepth: Int): 
String = {
+    assert(pathDepth >= 0, "Path Depth must great or equal to 0")
+    var pathProp = s"path=$basePath"
+    if (pathProp.endsWith("/")) {
+      pathProp = pathProp.substring(0, pathProp.length - 1)
+    }
+    for (_ <- 0 until pathDepth + 1) {
+      pathProp = s"$pathProp/*"

Review comment:
       Are we just overwriting pathProp on every iteration and ending up with 
the final value?
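
For reference, a small Java sketch of what the loop in the diff appears to compute: each iteration appends one more "/*" segment to the value built so far, so the result has `pathDepth + 1` wildcard levels. `PathGlobSketch` and `pathProperty` are hypothetical names; the logic mirrors the Scala in this hunk rather than any existing Hudi helper.

```java
// Hypothetical helper for illustration; mirrors createSqlTableSerdeProperties in the diff.
final class PathGlobSketch {

  static String pathProperty(String basePath, int pathDepth) {
    if (pathDepth < 0) {
      throw new IllegalArgumentException("pathDepth must be greater than or equal to 0");
    }
    StringBuilder prop = new StringBuilder("path=").append(basePath);
    // Drop a single trailing slash so the glob does not contain "//".
    if (prop.charAt(prop.length() - 1) == '/') {
      prop.setLength(prop.length() - 1);
    }
    // One wildcard per partition level, plus one for the files themselves.
    for (int i = 0; i < pathDepth + 1; i++) {
      prop.append("/*");
    }
    return prop.toString();
  }

  public static void main(String[] args) {
    // One partition field, as in the test in this PR.
    System.out.println(pathProperty("/tmp/hoodie_test", 1)); // prints path=/tmp/hoodie_test/*/*
  }
}
```

With one partition field this yields the same value the test in this PR asserts for `serdeProperties`.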

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -377,11 +388,71 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.supportTimestamp = 
parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = 
parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
       DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
+    hiveSyncConfig.tableProperties = 
parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
+    hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, 
basePath.toString,
+      hiveSyncConfig.partitionFields.size())
     hiveSyncConfig
   }
 
-  private def metaSync(parameters: Map[String, String],
-                       basePath: Path,
+  /**
+    * Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES.
+    * @param sqlConf
+    * @param schema
+    * @param parameters
+    * @return A new parameters added the HIVE_TABLE_PROPERTIES property.
+    */
+  private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
+                                    parameters: Map[String, String]): 
Map[String, String] = {
+    val partitionSet = parameters(HIVE_PARTITION_FIELDS_OPT_KEY)
+      .split(",").map(_.trim).filter(!_.isEmpty).toSet
+    val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
+
+    val (partCols, dataCols) = schema.partition(c => 
partitionSet.contains(c.name))
+    val reOrdered = StructType(dataCols ++ partCols)
+    val parts = reOrdered.json.grouped(threshold).toSeq

Review comment:
       Can we rename this to something like schemaParts or schemaSlices?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -397,6 +401,46 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
     })
 
+  test("Test build sync config for spark sql") {
+    initSparkContext("test build sync config")
+    val addSqlTablePropertiesMethod =
+        
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
+          classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
+    addSqlTablePropertiesMethod.setAccessible(true)
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val basePath = "/tmp/hoodie_test"
+    val params = Map(
+      "path" -> basePath,
+      DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
+      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+    )
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
+    val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
+      spark.sessionState.conf, structType, parameters)
+      .asInstanceOf[Map[String, String]]
+
+    val buildSyncConfigMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", 
classOf[Path],
+        classOf[Map[_,_]])
+    buildSyncConfigMethod.setAccessible(true)
+
+    val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
+      new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+
+    assertResult("spark.sql.sources.provider=hudi\n" +
+      "spark.sql.sources.schema.partCol.0=partition\n" +
+      "spark.sql.sources.schema.numParts=1\n" +
+      "spark.sql.sources.schema.numPartCols=1\n" +
+      "spark.sql.sources.schema.part.0=" +
+      
"{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
 +

Review comment:
       Ideally we would construct the expected output rather than hardcoding 
it. Can we at least use structType to construct the schema parts in this 
expected output?
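
One possible shape for this, sketched in Java: build the expected text from the schema JSON instead of spelling the JSON out by hand. In the Scala test the `schemaJson` argument would presumably come from `structType.json`; here it is just a string. `ExpectedPropsSketch` and `expectedTableProperties` are hypothetical names, and the line order simply copies the hardcoded string in this test, which is an assumption about how the properties are serialized.

```java
// Hypothetical test helper for illustration; not part of the PR.
final class ExpectedPropsSketch {

  // Covers the case asserted in the test: one schema part and one partition column.
  static String expectedTableProperties(String schemaJson, String partitionCol) {
    return String.join("\n",
        "spark.sql.sources.provider=hudi",
        "spark.sql.sources.schema.partCol.0=" + partitionCol,
        "spark.sql.sources.schema.numParts=1",
        "spark.sql.sources.schema.numPartCols=1",
        "spark.sql.sources.schema.part.0=" + schemaJson);
  }

  public static void main(String[] args) {
    // Placeholder schema; the real test would pass the JSON of its StructType.
    String schemaJson = "{\"type\":\"struct\",\"fields\":[]}";
    System.out.println(expectedTableProperties(schemaJson, "partition"));
  }
}
```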

##########
File path: 
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -75,6 +76,8 @@ public abstract void createTable(String tableName, 
MessageType storageSchema,
 
   public abstract void updatePartitionsToTable(String tableName, List<String> 
changedPartitions);
 
+  public abstract void updateTableProperties(String tableName, Map<String, 
String> tableProperties);

Review comment:
       Since this is an abstract class and not every implementation will need a 
concrete override, can we give this an empty default body here so that 
HoodieDLAClient does not need a no-op override?
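
A sketch of the suggested shape, using stand-in classes rather than the real Hudi ones: the base class provides an empty body, a client that supports table properties overrides it, and a client that does not simply inherits the no-op. All class names below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Small demo entry point; all classes in this sketch are hypothetical stand-ins.
final class NoOpDefaultDemo {
  public static void main(String[] args) {
    HiveLikeClientSketch hive = new HiveLikeClientSketch();
    DlaLikeClientSketch dla = new DlaLikeClientSketch();
    Map<String, String> props = Map.of("spark.sql.sources.provider", "hudi");
    hive.updateTableProperties("t", props); // stored by the override
    dla.updateTableProperties("t", props);  // inherited no-op
    System.out.println(hive.stored);
  }
}

// Stand-in for the abstract sync client.
abstract class SyncClientSketch {
  public abstract void createTable(String tableName);

  // Not abstract: implementations without table properties inherit this empty body.
  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
    // Intentionally empty.
  }
}

// Stand-in for a client that supports table properties.
final class HiveLikeClientSketch extends SyncClientSketch {
  final Map<String, String> stored = new HashMap<>();

  @Override
  public void createTable(String tableName) {
    // Omitted in this sketch.
  }

  @Override
  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
    if (tableProperties != null) {
      stored.putAll(tableProperties);
    }
  }
}

// Stand-in for a client that has nothing to do; no override is required.
final class DlaLikeClientSketch extends SyncClientSketch {
  @Override
  public void createTable(String tableName) {
    // Omitted in this sketch.
  }
}
```

The trade-off is that a new implementation can forget to override the method without the compiler noticing, so the Javadoc on the base method should say that the default does nothing.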

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -397,6 +401,46 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
     })
 
+  test("Test build sync config for spark sql") {
+    initSparkContext("test build sync config")
+    val addSqlTablePropertiesMethod =
+        
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
+          classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
+    addSqlTablePropertiesMethod.setAccessible(true)
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val basePath = "/tmp/hoodie_test"
+    val params = Map(
+      "path" -> basePath,
+      DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
+      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+    )
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
+    val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
+      spark.sessionState.conf, structType, parameters)
+      .asInstanceOf[Map[String, String]]
+
+    val buildSyncConfigMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", 
classOf[Path],
+        classOf[Map[_,_]])
+    buildSyncConfigMethod.setAccessible(true)
+
+    val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
+      new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+
+    assertResult("spark.sql.sources.provider=hudi\n" +
+      "spark.sql.sources.schema.partCol.0=partition\n" +
+      "spark.sql.sources.schema.numParts=1\n" +
+      "spark.sql.sources.schema.numPartCols=1\n" +
+      "spark.sql.sources.schema.part.0=" +
+      
"{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
 +
+      "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," 
+
+      
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)

Review comment:
       Do you think we need to assert HIVE_TABLE_PROPERTIES as well?

##########
File path: 
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
##########
@@ -249,6 +255,54 @@ public void testBasicSync(boolean useJdbc, boolean 
useSchemaFromCommitMetadata)
         "The last commit that was sycned should be 100");
   }
 
+  @ParameterizedTest
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
+  public void testSyncWithProperties(boolean useJdbc, boolean 
useSchemaFromCommitMetadata) throws Exception {

Review comment:
       It would be nice if you wrote a test for the actual problem you faced, as 
described in the title/description, and ensured that the test fails without 
this patch and succeeds with it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

