viirya commented on code in PR #109:
URL: https://github.com/apache/arrow-datafusion-comet/pull/109#discussion_r1505483447
##########
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala:
##########
@@ -913,407 +822,162 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
test("Columnar shuffle for large shuffle partition number") {
Seq(10, 200, 201).foreach { numPartitions =>
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "false",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl")
-
- val shuffled = df.repartitionByRange(numPartitions, $"_2")
-
- val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
-        // `CometSerializedShuffleHandle` is used for large shuffle partition number,
- // i.e., sort-based shuffle writer
- cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
- .contains("CometSerializedShuffleHandle")
-
- checkSparkAnswer(shuffled)
- }
- }
- }
- }
-
- test("grouped aggregate: Comet shuffle") {
- withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
- val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
- checkCometExchange(df, 1, true)
- checkSparkAnswerAndOperator(df)
- }
- }
- }
-
- test("hash shuffle: Comet shuffle") {
- // Disable CometExec to explicit test Comet Arrow shuffle path
- Seq(true, false).foreach { execEnabled =>
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
- val shuffled1 = df.repartition(10, $"_1")
-
- // If Comet execution is disabled, `Sort` operator is Spark operator
- // and jvm arrow shuffle is applied.
- checkCometExchange(shuffled1, 1, execEnabled)
- checkSparkAnswer(shuffled1)
-
- val shuffled2 = df.repartition(10, $"_1", $"_2")
-
- checkCometExchange(shuffled2, 1, execEnabled)
- checkSparkAnswer(shuffled2)
+ withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+ val df = sql("SELECT * FROM tbl")
- val shuffled3 = df.repartition(10, $"_2", $"_1")
+ val shuffled = df.repartitionByRange(numPartitions, $"_2")
- checkCometExchange(shuffled3, 1, execEnabled)
- checkSparkAnswer(shuffled3)
- }
- }
- }
- }
+ val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+        // `CometSerializedShuffleHandle` is used for large shuffle partition number,
+ // i.e., sort-based shuffle writer
+ cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+ .contains("CometSerializedShuffleHandle")
- test("Comet shuffle: different data type") {
- // Disable CometExec to explicit test Comet native shuffle path
- Seq(true, false).foreach { execEnabled =>
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
- val all_types = if (isSpark34Plus) {
- Seq(
- $"_1",
- $"_2",
- $"_3",
- $"_4",
- $"_5",
- $"_6",
- $"_7",
- $"_8",
- $"_9",
- $"_10",
- $"_11",
- $"_13",
- $"_14",
- $"_15",
- $"_16",
- $"_17",
- $"_18",
- $"_19",
- $"_20")
- } else {
- Seq(
- $"_1",
- $"_2",
- $"_3",
- $"_4",
- $"_5",
- $"_6",
- $"_7",
- $"_8",
- $"_9",
- $"_10",
- $"_11",
- $"_13",
- $"_15",
- $"_16",
- $"_18",
- $"_19",
- $"_20")
- }
- all_types.foreach { col =>
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
- readParquetFile(path.toString) { df =>
- val shuffled = df
- .select($"_1")
- .repartition(10, col)
- checkCometExchange(shuffled, 1, true)
- if (execEnabled) {
- checkSparkAnswerAndOperator(shuffled)
- } else {
- checkSparkAnswer(shuffled)
- }
- }
- }
- }
- }
+ checkSparkAnswer(shuffled)
}
}
}
- test("hash shuffle: Comet columnar shuffle") {
+ test("hash-based columnar shuffle") {
Seq(10, 200, 201).foreach { numPartitions =>
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "false",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl")
+ withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+ val df = sql("SELECT * FROM tbl")
- val shuffled1 =
-          df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1")
+ val shuffled1 =
+          df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1")
-        // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle
- checkCometExchange(shuffled1, 3, false)
- checkSparkAnswer(shuffled1)
+        // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle
+ checkCometExchange(shuffled1, 3, false)
+ checkSparkAnswer(shuffled1)
- val shuffled2 = df
- .repartitionByRange(numPartitions, $"_2")
- .limit(2)
- .repartition(numPartitions, $"_1", $"_2")
+ val shuffled2 = df
+ .repartitionByRange(numPartitions, $"_2")
+ .limit(2)
+ .repartition(numPartitions, $"_1", $"_2")
- checkCometExchange(shuffled2, 3, false)
- checkSparkAnswer(shuffled2)
+ checkCometExchange(shuffled2, 3, false)
+ checkSparkAnswer(shuffled2)
- val shuffled3 = df
- .repartitionByRange(numPartitions, $"_2")
- .limit(2)
- .repartition(numPartitions, $"_2", $"_1")
+ val shuffled3 = df
+ .repartitionByRange(numPartitions, $"_2")
+ .limit(2)
+ .repartition(numPartitions, $"_2", $"_1")
- checkCometExchange(shuffled3, 3, false)
- checkSparkAnswer(shuffled3)
- }
+ checkCometExchange(shuffled3, 3, false)
+ checkSparkAnswer(shuffled3)
}
}
}
- test("Comet columnar shuffle shuffle: different data type") {
- Seq(10, 200, 201).foreach { numPartitions =>
- Seq(true, false).foreach { dictionaryEnabled =>
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-
- Seq(
- $"_1",
- $"_2",
- $"_3",
- $"_4",
- $"_5",
- $"_6",
- $"_7",
- $"_8",
- $"_9",
- $"_10",
- $"_11",
- $"_13",
- $"_14",
- $"_15",
- $"_16",
- $"_17",
- $"_18",
- $"_19",
- $"_20").foreach { col =>
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "false",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
- readParquetFile(path.toString) { df =>
- val shuffled = df
- .select($"_1")
- .repartition(numPartitions, col)
- val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
- if (numPartitions > 200) {
- // For sort-based shuffle writer
-              cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
- .contains("CometSerializedShuffleHandle")
- }
- checkSparkAnswer(shuffled)
+ test("columnar shuffle: different data type") {
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+
+ Seq(10, 201).foreach { numPartitions =>
+ (1 to 20).map(i => s"_$i").foreach { c =>
+ readParquetFile(path.toString) { df =>
+ val shuffled = df
+ .select($"_1")
+ .repartition(numPartitions, col(c))
+ val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+ if (numPartitions > 200) {
+ // For sort-based shuffle writer
+              cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+ .contains("CometSerializedShuffleHandle")
}
+ checkSparkAnswer(shuffled)
}
}
}
}
}
}
- test("Comet native operator after Comet shuffle") {
- Seq(true, false).foreach { columnarShuffle =>
- withSQLConf(
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl")
-
- val shuffled1 = df
- .repartition(10, $"_2")
- .select($"_1", $"_1" + 1, $"_2" + 2)
- .repartition(10, $"_1")
- .filter($"_1" > 1)
-
- // 2 Comet shuffle exchanges are expected
- checkCometExchange(shuffled1, 2, !columnarShuffle)
- checkSparkAnswer(shuffled1)
-
- val shuffled2 = df
- .repartitionByRange(10, $"_2")
- .select($"_1", $"_1" + 1, $"_2" + 2)
- .repartition(10, $"_1")
- .filter($"_1" > 1)
-
-        // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled
- if (columnarShuffle) {
- checkCometExchange(shuffled2, 2, !columnarShuffle)
- } else {
-          // Because the first exchange from the bottom is range exchange which native shuffle
-          // doesn't support. So Comet exec operators stop before the first exchange and thus
- // there is no Comet exchange.
- checkCometExchange(shuffled2, 0, true)
- }
- checkSparkAnswer(shuffled2)
- }
- }
+ test("native operator after columnar shuffle") {
+ withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+ val df = sql("SELECT * FROM tbl")
+
+ val shuffled1 = df
+ .repartition(10, $"_2")
+ .select($"_1", $"_1" + 1, $"_2" + 2)
+ .repartition(10, $"_1")
+ .filter($"_1" > 1)
+
+ // 2 Comet shuffle exchanges are expected
+ checkCometExchange(shuffled1, 2, false)
+ checkSparkAnswer(shuffled1)
+
+ val shuffled2 = df
+ .repartitionByRange(10, $"_2")
+ .select($"_1", $"_1" + 1, $"_2" + 2)
+ .repartition(10, $"_1")
+ .filter($"_1" > 1)
+
+ // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled
+ checkCometExchange(shuffled2, 2, false)
+ checkSparkAnswer(shuffled2)
}
}
- test("Comet shuffle: single partition") {
- Seq(true, false).foreach { execEnabled =>
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+ test("columnar shuffle: single partition") {
+ withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+ val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
- val shuffled = df.repartition(1)
+ val shuffled = df.repartition(1)
- checkCometExchange(shuffled, 1, execEnabled)
- checkSparkAnswer(shuffled)
- }
- }
+ checkCometExchange(shuffled, 1, false)
+ checkSparkAnswer(shuffled)
}
}
- test("fix: comet native shuffle with binary data") {
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2
FROM tbl")
-
- val shuffled = df.repartition(1, $"binary")
+ test("sort-based columnar shuffle metrics") {
+ withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+ val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+ val shuffled = df.repartition(201, $"_1")
- checkCometExchange(shuffled, 1, true)
- checkSparkAnswer(shuffled)
- }
- }
- }
+ checkCometExchange(shuffled, 1, false)
+ checkSparkAnswer(shuffled)
- test("Comet shuffle metrics") {
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
- val shuffled = df.repartition(10, $"_1")
+ // Materialize the shuffled data
+ shuffled.collect()
+ val metrics = find(shuffled.queryExecution.executedPlan) {
+ case _: CometShuffleExchangeExec => true
+ case _ => false
+ }.map(_.metrics).get
- checkCometExchange(shuffled, 1, true)
- checkSparkAnswer(shuffled)
+ assert(metrics.contains("shuffleRecordsWritten"))
+ assert(metrics("shuffleRecordsWritten").value == 5L)
- // Materialize the shuffled data
- shuffled.collect()
- val metrics = find(shuffled.queryExecution.executedPlan) {
- case _: CometShuffleExchangeExec => true
- case _ => false
- }.map(_.metrics).get
+ assert(metrics.contains("shuffleBytesWritten"))
+ assert(metrics("shuffleBytesWritten").value > 0)
- assert(metrics.contains("shuffleRecordsWritten"))
- assert(metrics("shuffleRecordsWritten").value == 5L)
- }
- }
- }
-
- test("sort-based shuffle metrics") {
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "false",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
- withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
- val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
- val shuffled = df.repartition(201, $"_1")
-
- checkCometExchange(shuffled, 1, false)
- checkSparkAnswer(shuffled)
-
- // Materialize the shuffled data
- shuffled.collect()
- val metrics = find(shuffled.queryExecution.executedPlan) {
- case _: CometShuffleExchangeExec => true
- case _ => false
- }.map(_.metrics).get
-
- assert(metrics.contains("shuffleRecordsWritten"))
- assert(metrics("shuffleRecordsWritten").value == 5L)
-
- assert(metrics.contains("shuffleBytesWritten"))
- assert(metrics("shuffleBytesWritten").value > 0)
-
- assert(metrics.contains("shuffleWriteTime"))
- assert(metrics("shuffleWriteTime").value > 0)
- }
+ assert(metrics.contains("shuffleWriteTime"))
+ assert(metrics("shuffleWriteTime").value > 0)
}
}
}
-class CometAsyncShuffleSuite extends CometShuffleSuiteBase {
+class CometAsyncShuffleSuite extends CometColumnarShuffleSuite {
override protected val asyncShuffleEnable: Boolean = true
protected val adaptiveExecutionEnabled: Boolean = true
}
-class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase {
- override protected val fastMergeEnabled: Boolean = false
-
- protected val adaptiveExecutionEnabled: Boolean = true
-
- protected val asyncShuffleEnable: Boolean = true
-}
-
-class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase {
- override protected val fastMergeEnabled: Boolean = false
-
- protected val adaptiveExecutionEnabled: Boolean = true
-
- protected val asyncShuffleEnable: Boolean = false
-}
-
-class CometShuffleSuite extends CometShuffleSuiteBase {
+class CometShuffleSuite extends CometColumnarShuffleSuite {
override protected val asyncShuffleEnable: Boolean = false
protected val adaptiveExecutionEnabled: Boolean = true
-
- import testImplicits._
-
- // TODO: this test takes ~5mins to run, we should reduce the test time.
- // Because this test takes too long, we only have it in `CometShuffleSuite`.
- test("fix: Too many task completion listener of ArrowReaderIterator causes
OOM") {
- withSQLConf(
- CometConf.COMET_EXEC_ENABLED.key -> "true",
- CometConf.COMET_BATCH_SIZE.key -> "1",
- CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
- CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") {
Review Comment:
I remember I set a big number because the bug only happens with many rows. We can probably reduce the number, I'm just not sure which value would be a proper one.
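
If it helps, here is a minimal sketch of one way to narrow the row count down, assuming the listener build-up scales with the number of batches (with `COMET_BATCH_SIZE` set to "1", each row becomes its own batch, so the listener count should track `numRows` rather than data volume). The candidate row counts and the loop are placeholder suggestions, not validated values:

```scala
// Hedged sketch: try progressively smaller tables against the pre-fix
// revision to find the smallest count that still reproduces the OOM.
// The row counts below are guesses, not validated values.
Seq(100000, 500000, 1000000).foreach { numRows =>
  withSQLConf(
    CometConf.COMET_EXEC_ENABLED.key -> "true",
    CometConf.COMET_BATCH_SIZE.key -> "1",
    CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
    CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
    withParquetTable((0 until numRows).map(i => (1, (i + 1).toLong)), "tbl") {
      // ... same test body as the existing test ...
    }
  }
}
```

The smallest `numRows` that still fails before the fix would then be a safe lower bound for the test.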
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]