This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf65269 [HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
bf65269 is described below
commit bf65269f66075069b421dfa25f90b105bc4ec662
Author: Udit Mehrotra <[email protected]>
AuthorDate: Thu Sep 17 20:03:35 2020 -0700
[HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 10 +-
.../functional/HoodieSparkSqlWriterSuite.scala | 138 ++++++++++++++++-----
2 files changed, 110 insertions(+), 38 deletions(-)
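
Gist of the change (a reading of the diff below, not text from the commit itself): only callers that register an asyncCompactionTriggerFn, such as the streaming sink, can actually run async compaction, so isAsyncCompactionEnabled now also requires that such a callback was supplied. Without that guard, a MOR write issued through a plain spark-submit datasource job treated async compaction as enabled, left the write client open for a compactor that was never registered, and hung. A minimal, self-contained sketch of the guarded predicate, with illustrative parameter names rather than the real Hudi signature:

object AsyncCompactionGuardSketch {
  // Async compaction is only considered enabled when a trigger callback was registered;
  // spark-submit datasource writes register none, the streaming sink does.
  def isAsyncCompactionEnabled(triggerFnDefined: Boolean,
                               inlineCompaction: Boolean,
                               asyncCompactRequested: Boolean,
                               mergeOnRead: Boolean): Boolean =
    triggerFnDefined && !inlineCompaction && asyncCompactRequested && mergeOnRead

  def main(args: Array[String]): Unit = {
    // spark-submit path: no callback registered, so the writer closes its client
    // instead of handing it off to a compactor that never runs.
    assert(!isAsyncCompactionEnabled(triggerFnDefined = false, inlineCompaction = false,
      asyncCompactRequested = true, mergeOnRead = true))
    // streaming sink: callback registered, async compaction may proceed.
    assert(isAsyncCompactionEnabled(triggerFnDefined = true, inlineCompaction = false,
      asyncCompactRequested = true, mergeOnRead = true))
  }
}
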
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 569ed34..450bd73 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,6 +52,7 @@ private[hudi] object HoodieSparkSqlWriter {
private val log = LogManager.getLogger(getClass)
private var tableExists: Boolean = false
+ private var asyncCompactionTriggerFnDefined: Boolean = false
def write(sqlContext: SQLContext,
mode: SaveMode,
@@ -67,6 +68,7 @@ private[hudi] object HoodieSparkSqlWriter {
val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+ asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
if (path.isEmpty || tblNameOp.isEmpty) {
throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
}
@@ -147,8 +149,7 @@ private[hudi] object HoodieSparkSqlWriter {
tblName, mapAsJavaMap(parameters)
)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
- if (asyncCompactionTriggerFn.isDefined &&
- isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+ if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
@@ -187,8 +188,7 @@ private[hudi] object HoodieSparkSqlWriter {
Schema.create(Schema.Type.NULL).toString, path.get, tblName,
mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
- if (asyncCompactionTriggerFn.isDefined &&
- isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+ if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
@@ -441,7 +441,7 @@ private[hudi] object HoodieSparkSqlWriter {
tableConfig: HoodieTableConfig,
parameters: Map[String, String],
configuration: Configuration) : Boolean = {
log.info(s"Config.isInlineCompaction ?
${client.getConfig.isInlineCompaction}")
- if (!client.getConfig.isInlineCompaction
+ if (asyncCompactionTriggerFnDefined && !client.getConfig.isInlineCompaction
&& parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY).exists(r =>
r.toBoolean)) {
tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
} else {
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 8995b7c..bcd83db 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -22,17 +22,29 @@ import java.util.{Date, UUID}
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.client.HoodieWriteClient
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FunSuite, Matchers}
+import scala.collection.JavaConversions._
+
class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
+ var spark: SparkSession = _
+ var sc: SparkContext = _
+ var sqlContext: SQLContext = _
+
test("Parameters With Write Defaults") {
val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
val rhsKey = "hoodie.right.hand.side.key"
@@ -65,15 +77,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
test("throw hoodie exception when there already exist a table with different name with Append Save mode") {
- val session = SparkSession.builder()
- .appName("test_append_mode")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate()
+ initSparkContext("test_append_mode")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
- val sqlContext = session.sqlContext
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
@@ -82,7 +89,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
- val dataFrame = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+ val dataFrame = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
//on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
@@ -91,7 +98,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4")
val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
- val dataFrame2 = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+ val dataFrame2 = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
@@ -100,22 +107,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
} finally {
- session.stop()
+ spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
test("test bulk insert dataset with datasource impl") {
- val session = SparkSession.builder()
- .appName("test_bulk_insert_datasource")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate()
+ initSparkContext("test_bulk_insert_datasource")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
- val sqlContext = session.sqlContext
- val sc = session.sparkContext
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
@@ -134,7 +135,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
- val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+ val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
@@ -148,7 +149,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
// fetch all records from parquet files generated from write to hudi
- val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+ val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -157,22 +158,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
assert(df.except(trimmedDf).count() == 0)
} finally {
- session.stop()
+ spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
test("test bulk insert dataset with datasource impl multiple rounds") {
- val session = SparkSession.builder()
- .appName("test_bulk_insert_datasource")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate()
+ initSparkContext("test_bulk_insert_datasource")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
- val sqlContext = session.sqlContext
- val sc = session.sparkContext
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
@@ -194,18 +189,18 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
- var totalExpectedDf = session.createDataFrame(sc.emptyRDD[Row], structType)
+ var totalExpectedDf = spark.createDataFrame(sc.emptyRDD[Row], structType)
for (_ <- 0 to 2) {
// generate the inserts
val records = DataSourceTestUtils.generateRandomRows(200)
val recordsSeq = convertRowListToSeq(records)
- val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+ val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
// Fetch records from entire dataset
- val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+ val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -218,11 +213,78 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
assert(totalExpectedDf.except(trimmedDf).count() == 0)
}
} finally {
- session.stop()
+ spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
+ List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .foreach(tableType => {
+ test("test basic HoodieSparkSqlWriter functionality with datasource
insert for " + tableType) {
+ initSparkContext("test_insert_datasource")
+ val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+ try {
+
+ val hoodieFooTableName = "hoodie_foo_tbl"
+
+ //create a new table
+ val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+ DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+ HoodieWriteConfig.INSERT_PARALLELISM -> "4",
+ DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getCanonicalName)
+ val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+ // generate the inserts
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val records = DataSourceTestUtils.generateRandomRows(100)
+ val recordsSeq = convertRowListToSeq(records)
+ val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+ val client = spy(DataSourceUtils.createHoodieClient(
+ new JavaSparkContext(sc),
+ schema.toString,
+ path.toAbsolutePath.toString,
+ hoodieFooTableName,
+ mapAsJavaMap(fooTableParams)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]])
+
+ // write to Hudi
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty,
+ Option(client))
+ // Verify that asynchronous compaction is not scheduled
+ verify(client, times(0)).scheduleCompaction(any())
+ // Verify that HoodieWriteClient is closed correctly
+ verify(client, times(1)).close()
+
+ // collect all partition paths to issue read of parquet files
+ val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+ HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+ // Check the entire dataset has all records still
+ val fullPartitionPaths = new Array[String](3)
+ for (i <- fullPartitionPaths.indices) {
+ fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+ }
+
+ // fetch all records from parquet files generated from write to hudi
+ val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+ // remove metadata columns so that expected and actual DFs can be compared as is
+ val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+ assert(df.except(trimmedDf).count() == 0)
+ } finally {
+ spark.stop()
+ FileUtils.deleteDirectory(path.toFile)
+ }
+ }
+ })
+
case class Test(uuid: String, ts: Long)
import scala.collection.JavaConverters
@@ -230,4 +292,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+ def initSparkContext(appName: String): Unit = {
+ spark = SparkSession.builder()
+ .appName(appName)
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .getOrCreate()
+ sc = spark.sparkContext
+ sc.setLogLevel("ERROR")
+ sqlContext = spark.sqlContext
+ }
}
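
The added test relies on Mockito's spy/verify to assert that the spark-submit path neither schedules compaction nor leaks the write client. A stripped-down sketch of that verification pattern, assuming only mockito-core on the classpath (the Compactor class below is hypothetical, standing in for HoodieWriteClient):

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}

// Hypothetical stand-in for HoodieWriteClient, only to demonstrate the verification mechanics.
class Compactor {
  def scheduleCompaction(extra: Option[java.util.Map[String, String]]): Unit = ()
  def close(): Unit = ()
}

object SpyVerifySketch {
  def main(args: Array[String]): Unit = {
    val client = spy(new Compactor)
    // The code under test would receive `client` and decide whether to schedule compaction;
    // here we only exercise the expected spark-submit behaviour: close, never schedule.
    client.close()
    verify(client, times(0)).scheduleCompaction(any()) // compaction never scheduled
    verify(client, times(1)).close()                   // client closed exactly once
  }
}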