alexeykudinkin commented on code in PR #6213:
URL: https://github.com/apache/hudi/pull/6213#discussion_r929392564
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,100 +77,111 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig {
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession,
- table: CatalogTable,
- query: LogicalPlan,
- insertPartitions: Map[String, Option[String]],
- overwrite: Boolean,
- refreshTable: Boolean = true,
- extraOptions: Map[String, String] = Map.empty): Boolean = {
-
- val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
- val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession,
overwrite, insertPartitions, extraOptions)
-
- val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
+ table: CatalogTable,
+ query: LogicalPlan,
+ partitionSpec: Map[String, Option[String]],
+ overwrite: Boolean,
+ refreshTable: Boolean = true,
+ extraOptions: Map[String, String] = Map.empty): Boolean = {
+ val catalogTable = new HoodieCatalogTable(sparkSession, table)
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession,
overwrite, partitionSpec, extraOptions)
+
+ // NOTE: In case of partitioned table we override specified "overwrite"
parameter
+ // to instead append to the dataset
+ val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
Review Comment:
Frankly, I don't have the full context as to why -- I'm preserving the existing
semantics.
@leesf can you elaborate on why this was the way we handled `INSERT
OVERWRITE`?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,100 +77,111 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig {
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession,
- table: CatalogTable,
- query: LogicalPlan,
- insertPartitions: Map[String, Option[String]],
- overwrite: Boolean,
- refreshTable: Boolean = true,
- extraOptions: Map[String, String] = Map.empty): Boolean = {
-
- val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
- val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession,
overwrite, insertPartitions, extraOptions)
-
- val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
+ table: CatalogTable,
+ query: LogicalPlan,
+ partitionSpec: Map[String, Option[String]],
+ overwrite: Boolean,
+ refreshTable: Boolean = true,
+ extraOptions: Map[String, String] = Map.empty): Boolean = {
+ val catalogTable = new HoodieCatalogTable(sparkSession, table)
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession,
overwrite, partitionSpec, extraOptions)
+
+ // NOTE: In case of partitioned table we override specified "overwrite"
parameter
+ // to instead append to the dataset
+ val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
SaveMode.Overwrite
} else {
- // for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
- val conf = sparkSession.sessionState.conf
- val alignedQuery = alignOutputFields(query, hoodieCatalogTable,
insertPartitions, conf)
- // If we create dataframe using the Dataset.ofRows(sparkSession,
alignedQuery),
- // The nullable attribute of fields will lost.
- // In order to pass the nullable attribute to the inputDF, we specify the
schema
- // of the rdd.
Review Comment:
We don't actually need this conversion at all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]