nsivabalan commented on code in PR #8875:
URL: https://github.com/apache/hudi/pull/8875#discussion_r1214879586
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -89,6 +89,62 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
+ /**
+ * Get the insert operation.
+ * See if we are able to set bulk insert, else use deduceOperation
+ */
+ private def getOperation(isPartitionedTable: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
+ insertModeSet: Boolean,
+ dropDuplicate: Option[String],
+ enableBulkInsert: Option[String],
+ isInsertInto: Boolean,
+ isNonStrictMode: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ val notSetToNonStrict = !insertModeSet || isNonStrictMode
+ //if options are not set, we assume they are configs to do bulk insert
+ (isInsertInto, notSetToNonStrict, enableBulkInsert.getOrElse("true"),
Review Comment:
we should call this only in case of INSERT INTO. So, why do we have an
explicit argument for "isInsertInto"? Why can't the caller avoid calling this
for the other cases?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -89,6 +89,62 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
+ /**
+ * Get the insert operation.
+ * See if we are able to set bulk insert, else use deduceOperation
+ */
+ private def getOperation(isPartitionedTable: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
+ insertModeSet: Boolean,
+ dropDuplicate: Option[String],
+ enableBulkInsert: Option[String],
+ isInsertInto: Boolean,
+ isNonStrictMode: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ val notSetToNonStrict = !insertModeSet || isNonStrictMode
+ //if options are not set, we assume they are configs to do bulk insert
+ (isInsertInto, notSetToNonStrict, enableBulkInsert.getOrElse("true"),
+ dropDuplicate.getOrElse("false"), isOverwritePartition,
isPartitionedTable) match {
+ case (true, true, "true", "false", false, _) =>
BULK_INSERT_OPERATION_OPT_VAL
Review Comment:
for now, can we focus on INSERT_INTO cases without bringing in the overwrite
partition flows?
we don't need to use the same switch case either.
we can split this into two sections:
fetch minimal configs and determine if bulk_insert can be enabled;
if not, go with the master code path.
fetch minimal configs and determine if bulk_insert can be enabled:
```
if InsertInto, and insert mode is not set, bulk insert is not explicitly
enabled, and dropDuplicate is not set.
```
In the else section, we can pretty much re-use what's in master, because for
the other bulk_insert use-cases there are some guard rails. So, in the
optimization we are adding above, let's focus on enabling bulk_insert only
when no configs are explicitly set.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestInsertIntoOperation.scala:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.spark.sql
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.TimelineUtils
+
+import scala.collection.JavaConversions._
+
+@DisableCI
+class TestInsertIntoOperation extends HoodieSparkSqlTestBase {
+
+ /**
+ * asserts if number of commits = count
+ * returns true if last commit is bulk insert
+ */
+ def assertCommitCountAndIsLastBulkInsert(basePath: String, count: Int):
Boolean = {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ val timeline = metaClient.getActiveTimeline.getAllCommitsTimeline
+ assert(timeline.countInstants() == count)
+ val latestCommit = timeline.lastInstant()
+ assert(latestCommit.isPresent)
+ assert(latestCommit.get().isCompleted)
+ val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(),
timeline)
+ metadata.getOperationType.equals(WriteOperationType.BULK_INSERT)
+ }
+
+ def createTable(tableName: String, writeOptions: String, tableBasePath:
String): Unit = {
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | timestamp long,
+ | _row_key string,
+ | rider string,
+ | driver string,
+ | begin_lat double,
+ | begin_lon double,
+ | end_lat double,
+ | end_lon double,
+ | fare STRUCT<
+ | amount: double,
+ | currency: string >,
+ | _hoodie_is_deleted boolean,
+ | partition_path string
+ |) using hudi
+ | partitioned by (partition_path)
+ | $writeOptions
+ | location '$tableBasePath'
+ |
+ """.stripMargin)
+ }
+
+ def generateInserts(dataGen: HoodieTestDataGenerator, instantTime: String,
n: Int): sql.DataFrame = {
+ val recs = dataGen.generateInsertsNestedExample(instantTime, n)
+ spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2))
+ }
+
+ def doInsert(dataGen: HoodieTestDataGenerator, tableName: String,
instantTime: String): Unit = {
+ generateInserts(dataGen, instantTime, 100).select("timestamp", "_row_key",
"rider", "driver",
+ "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted", "partition_path").
+ createOrReplaceTempView("insert_temp_table")
+ spark.sql(s"insert into $tableName select * from insert_temp_table")
+ }
+
+ test("No configs set") {
Review Comment:
can we also test a pk-less (primary-key-less) table,
both table types,
and a table with no preCombine column?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -89,6 +89,62 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
+ /**
+ * Get the insert operation.
+ * See if we are able to set bulk insert, else use deduceOperation
+ */
+ private def getOperation(isPartitionedTable: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
+ insertModeSet: Boolean,
+ dropDuplicate: Option[String],
+ enableBulkInsert: Option[String],
+ isInsertInto: Boolean,
+ isNonStrictMode: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ val notSetToNonStrict = !insertModeSet || isNonStrictMode
Review Comment:
if we pass in the incoming params, would that make things easier?
e.g.
```
val insertModeOverridden = params.containsKey("hoodie.sql.insert.mode")
```
something like this
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -89,6 +89,62 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
+ /**
+ * Get the insert operation.
+ * See if we are able to set bulk insert, else use deduceOperation
+ */
+ private def getOperation(isPartitionedTable: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
+ insertModeSet: Boolean,
+ dropDuplicate: Option[String],
+ enableBulkInsert: Option[String],
+ isInsertInto: Boolean,
+ isNonStrictMode: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ val notSetToNonStrict = !insertModeSet || isNonStrictMode
+ //if options are not set, we assume they are configs to do bulk insert
+ (isInsertInto, notSetToNonStrict, enableBulkInsert.getOrElse("true"),
+ dropDuplicate.getOrElse("false"), isOverwritePartition,
isPartitionedTable) match {
+ case (true, true, "true", "false", false, _) =>
BULK_INSERT_OPERATION_OPT_VAL
+ case (true, true, "true", "false", true, false) =>
BULK_INSERT_OPERATION_OPT_VAL
+
+ //if config is set such that we cant make it bulk insert, we need to use
defaults for unset configs
+ case _ =>
deduceOperation(enableBulkInsert.getOrElse(SQL_ENABLE_BULK_INSERT.defaultValue).toBoolean,
+ isOverwritePartition, isOverwriteTable,
dropDuplicate.getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean,
+ isNonStrictMode, isPartitionedTable, hasPrecombineColumn)
+ }
+ }
+
+ /**
+ * Deduce the insert operation
+ */
+ private def deduceOperation(enableBulkInsert: Boolean, isOverwritePartition:
Boolean, isOverwriteTable: Boolean,
+ dropDuplicate: Boolean, isNonStrictMode:
Boolean, isPartitionedTable: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
isNonStrictMode, isPartitionedTable) match {
Review Comment:
Are there any code changes here, or did we just move the code around?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -127,43 +184,21 @@ trait ProvidesHoodieConfig extends Logging {
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
- val enableBulkInsert =
combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
- DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
- val dropDuplicate = sparkSession.conf
-
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
+ val enableBulkInsert = combinedOpts.get(SQL_ENABLE_BULK_INSERT.key)
Review Comment:
we may not need these changes if we follow the suggestions above.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -89,6 +89,62 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
+ /**
+ * Get the insert operation.
+ * See if we are able to set bulk insert, else use deduceOperation
+ */
+ private def getOperation(isPartitionedTable: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
+ insertModeSet: Boolean,
+ dropDuplicate: Option[String],
+ enableBulkInsert: Option[String],
+ isInsertInto: Boolean,
+ isNonStrictMode: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ val notSetToNonStrict = !insertModeSet || isNonStrictMode
+ //if options are not set, we assume they are configs to do bulk insert
+ (isInsertInto, notSetToNonStrict, enableBulkInsert.getOrElse("true"),
+ dropDuplicate.getOrElse("false"), isOverwritePartition,
isPartitionedTable) match {
+ case (true, true, "true", "false", false, _) =>
BULK_INSERT_OPERATION_OPT_VAL
+ case (true, true, "true", "false", true, false) =>
BULK_INSERT_OPERATION_OPT_VAL
+
+ //if config is set such that we cant make it bulk insert, we need to use
defaults for unset configs
+ case _ =>
deduceOperation(enableBulkInsert.getOrElse(SQL_ENABLE_BULK_INSERT.defaultValue).toBoolean,
+ isOverwritePartition, isOverwriteTable,
dropDuplicate.getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean,
+ isNonStrictMode, isPartitionedTable, hasPrecombineColumn)
+ }
+ }
+
+ /**
+ * Deduce the insert operation
+ */
+ private def deduceOperation(enableBulkInsert: Boolean, isOverwritePartition:
Boolean, isOverwriteTable: Boolean,
Review Comment:
deduceWriteOperationForInsertInto
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -89,6 +89,62 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
+ /**
+ * Get the insert operation.
+ * See if we are able to set bulk insert, else use deduceOperation
+ */
+ private def getOperation(isPartitionedTable: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
+ insertModeSet: Boolean,
+ dropDuplicate: Option[String],
+ enableBulkInsert: Option[String],
+ isInsertInto: Boolean,
+ isNonStrictMode: Boolean,
+ hasPrecombineColumn: Boolean): String = {
+ val notSetToNonStrict = !insertModeSet || isNonStrictMode
+ //if options are not set, we assume they are configs to do bulk insert
Review Comment:
minor: "they" — I guess there is a typo in this comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]