Repository: spark
Updated Branches:
  refs/heads/master 17d83e1ee -> 1a45d2b2c
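The commit below moves the bucketed read/write test suites out of `sql/hive` so that the same test logic can run against both catalog implementations. Condensed from the diff that follows (the class and trait names are exactly the ones the commit adds; test bodies are trimmed to placeholders), the resulting layout looks roughly like this:

```scala
package org.apache.spark.sql.sources

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

// Catalog-agnostic test logic now lives in abstract suites under sql/core.
abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
  // read-side bucketing tests (bucket pruning, shuffle/sort avoidance, ...) go here
}

// sql/core binds the suite to the in-memory catalog ...
class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
  protected override def beforeAll(): Unit = {
    super.beforeAll()
    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
  }
}

// ... while sql/hive re-runs the identical suite against the Hive catalog:
//
//   class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
//     protected override def beforeAll(): Unit = {
//       super.beforeAll()
//       assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
//     }
//   }
```

`BucketedWriteSuite` follows the same pattern (see `BucketedWriteWithoutHiveSupportSuite` and `BucketedWriteWithHiveSupportSuite` in the diff), so any new bucketing test written once in `sql/core` is exercised against both the in-memory and the Hive catalog.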
[SPARK-19670][SQL][TEST] Enable Bucketed Table Reading and Writing Testing Without Hive Support

### What changes were proposed in this pull request?
Bucketed table reading and writing does not need Hive support, so the test cases can be moved from `sql/hive` to `sql/core`. This improves test coverage: bucketed table reading and writing can now be tested both with and without Hive support.

### How was this patch tested?
N/A

Author: Xiao Li <[email protected]>

Closes #17004 from gatorsmile/mvTestCaseForBuckets.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a45d2b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a45d2b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a45d2b2

Branch: refs/heads/master
Commit: 1a45d2b2cc6466841fb73da21a61b61f14a5d5fb
Parents: 17d83e1
Author: Xiao Li <[email protected]>
Authored: Tue Feb 21 19:30:36 2017 -0800
Committer: gatorsmile <[email protected]>
Committed: Tue Feb 21 19:30:36 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/sources/BucketedReadSuite.scala  | 572 +++++++++++++++++++
 .../spark/sql/sources/BucketedWriteSuite.scala | 249 ++++
 .../spark/sql/sources/BucketedReadSuite.scala  | 562 ------
 .../BucketedReadWithHiveSupportSuite.scala     |  28 +
 .../spark/sql/sources/BucketedWriteSuite.scala | 237 --------
 .../BucketedWriteWithHiveSupportSuite.scala    |  30 +
 6 files changed, 879 insertions(+), 799 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1a45d2b2/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala new file mode 100644 index 0000000..9b65419 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -0,0 +1,572 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.sources + +import java.io.File +import java.net.URI + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.util.Utils +import org.apache.spark.util.collection.BitSet + +class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext { + protected override def beforeAll(): Unit = { + super.beforeAll() + assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") + } +} + + +abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { + import testImplicits._ + + private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + private lazy val nullDF = (for { + i <- 0 to 50 + s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g") + } yield (i % 5, s, i % 13)).toDF("i", "j", "k") + + test("read bucketed data") { + withTable("bucketed_table") { + df.write + .format("parquet") + .partitionBy("i") + .bucketBy(8, "j", "k") + .saveAsTable("bucketed_table") + + for (i <- 0 until 5) { + val table = spark.table("bucketed_table").filter($"i" === i) + val query = table.queryExecution + val output = query.analyzed.output + val rdd = query.toRdd + + assert(rdd.partitions.length == 8) + + val attrs = table.select("j", "k").queryExecution.analyzed.output + val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => { + val getBucketId = UnsafeProjection.create( + HashPartitioning(attrs, 8).partitionIdExpression :: Nil, + output) + rows.map(row => getBucketId(row).getInt(0) -> index) + }) + checkBucketId.collect().foreach(r => assert(r._1 == r._2)) + } + } + } + + // To verify if the bucket pruning works, this function checks two conditions: + // 1) Check if the pruned buckets (before filtering) are empty. + // 2) Verify the final result is the same as the expected one + private def checkPrunedAnswers( + bucketSpec: BucketSpec, + bucketValues: Seq[Integer], + filterCondition: Column, + originalDataFrame: DataFrame): Unit = { + // This test verifies parts of the plan. Disable whole stage codegen. + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k") + val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec + // Limit: bucket pruning only works when the bucket column has one and only one column + assert(bucketColumnNames.length == 1) + val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) + val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) + val matchedBuckets = new BitSet(numBuckets) + bucketValues.foreach { value => + matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) + } + + // Filter could hide the bug in bucket pruning. 
Thus, skipping all the filters + val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan + val rdd = plan.find(_.isInstanceOf[DataSourceScanExec]) + assert(rdd.isDefined, plan) + + val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => + if (matchedBuckets.get(index % numBuckets) && iter.nonEmpty) Iterator(index) else Iterator() + } + // TODO: These tests are not testing the right columns. +// // checking if all the pruned buckets are empty +// val invalidBuckets = checkedResult.collect().toList +// if (invalidBuckets.nonEmpty) { +// fail(s"Buckets $invalidBuckets should have been pruned from:\n$plan") +// } + + checkAnswer( + bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), + originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) + } + } + + test("read partitioning bucketed tables with bucket pruning filters") { + withTable("bucketed_table") { + val numBuckets = 8 + val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) + // json does not support predicate push-down, and thus json is used here + df.write + .format("json") + .partitionBy("i") + .bucketBy(numBuckets, "j") + .saveAsTable("bucketed_table") + + for (j <- 0 until 13) { + // Case 1: EqualTo + checkPrunedAnswers( + bucketSpec, + bucketValues = j :: Nil, + filterCondition = $"j" === j, + df) + + // Case 2: EqualNullSafe + checkPrunedAnswers( + bucketSpec, + bucketValues = j :: Nil, + filterCondition = $"j" <=> j, + df) + + // Case 3: In + checkPrunedAnswers( + bucketSpec, + bucketValues = Seq(j, j + 1, j + 2, j + 3), + filterCondition = $"j".isin(j, j + 1, j + 2, j + 3), + df) + } + } + } + + test("read non-partitioning bucketed tables with bucket pruning filters") { + withTable("bucketed_table") { + val numBuckets = 8 + val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) + // json does not support predicate push-down, and thus json is used here + df.write + .format("json") + .bucketBy(numBuckets, "j") + .saveAsTable("bucketed_table") + + for (j <- 0 until 13) { + checkPrunedAnswers( + bucketSpec, + bucketValues = j :: Nil, + filterCondition = $"j" === j, + df) + } + } + } + + test("read partitioning bucketed tables having null in bucketing key") { + withTable("bucketed_table") { + val numBuckets = 8 + val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) + // json does not support predicate push-down, and thus json is used here + nullDF.write + .format("json") + .partitionBy("i") + .bucketBy(numBuckets, "j") + .saveAsTable("bucketed_table") + + // Case 1: isNull + checkPrunedAnswers( + bucketSpec, + bucketValues = null :: Nil, + filterCondition = $"j".isNull, + nullDF) + + // Case 2: <=> null + checkPrunedAnswers( + bucketSpec, + bucketValues = null :: Nil, + filterCondition = $"j" <=> null, + nullDF) + } + } + + test("read partitioning bucketed tables having composite filters") { + withTable("bucketed_table") { + val numBuckets = 8 + val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) + // json does not support predicate push-down, and thus json is used here + df.write + .format("json") + .partitionBy("i") + .bucketBy(numBuckets, "j") + .saveAsTable("bucketed_table") + + for (j <- 0 until 13) { + checkPrunedAnswers( + bucketSpec, + bucketValues = j :: Nil, + filterCondition = $"j" === j && $"k" > $"j", + df) + + checkPrunedAnswers( + bucketSpec, + bucketValues = j :: Nil, + filterCondition = $"j" === j && $"i" > j % 5, + df) + } + } + } + + private lazy val df1 = + (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", 
"k").as("df1") + private lazy val df2 = + (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2") + + case class BucketedTableTestSpec( + bucketSpec: Option[BucketSpec], + numPartitions: Int = 10, + expectedShuffle: Boolean = true, + expectedSort: Boolean = true) + + /** + * A helper method to test the bucket read functionality using join. It will save `df1` and `df2` + * to hive tables, bucketed or not, according to the given bucket specifics. Next we will join + * these 2 tables, and firstly make sure the answer is corrected, and then check if the shuffle + * exists as user expected according to the `shuffleLeft` and `shuffleRight`. + */ + private def testBucketing( + bucketedTableTestSpecLeft: BucketedTableTestSpec, + bucketedTableTestSpecRight: BucketedTableTestSpec, + joinType: String = "inner", + joinCondition: (DataFrame, DataFrame) => Column): Unit = { + val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = + bucketedTableTestSpecLeft + val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = + bucketedTableTestSpecRight + + withTable("bucketed_table1", "bucketed_table2") { + def withBucket( + writer: DataFrameWriter[Row], + bucketSpec: Option[BucketSpec]): DataFrameWriter[Row] = { + bucketSpec.map { spec => + writer.bucketBy( + spec.numBuckets, + spec.bucketColumnNames.head, + spec.bucketColumnNames.tail: _*) + + if (spec.sortColumnNames.nonEmpty) { + writer.sortBy( + spec.sortColumnNames.head, + spec.sortColumnNames.tail: _* + ) + } else { + writer + } + }.getOrElse(writer) + } + + withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft) + .saveAsTable("bucketed_table1") + withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight) + .saveAsTable("bucketed_table2") + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val t1 = spark.table("bucketed_table1") + val t2 = spark.table("bucketed_table2") + val joined = t1.join(t2, joinCondition(t1, t2), joinType) + + // First check the result is corrected. 
+ checkAnswer( + joined.sort("bucketed_table1.k", "bucketed_table2.k"), + df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k")) + + assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec]) + val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec] + + // check existence of shuffle + assert( + joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft, + s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}") + assert( + joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight, + s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}") + + // check existence of sort + assert( + joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft, + s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}") + assert( + joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight, + s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") + } + } + } + + private def joinCondition(joinCols: Seq[String]) (left: DataFrame, right: DataFrame): Column = { + joinCols.map(col => left(col) === right(col)).reduce(_ && _) + } + + test("avoid shuffle when join 2 bucketed tables") { + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 + ignore("avoid shuffle when join keys are a super-set of bucket keys") { + val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + test("only shuffle one side when join bucketed table and non-bucketed table") { + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + test("only shuffle one side when 2 bucketed tables have different bucket number") { + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + test("only shuffle one side when 2 bucketed tables have different bucket keys") { + val bucketSpecLeft = 
Some(BucketSpec(8, Seq("i"), Nil)) + val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i")) + ) + } + + test("shuffle when join keys are not equal to bucket keys") { + val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("j")) + ) + } + + test("shuffle when join 2 bucketed tables with bucketing disabled") { + val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + } + + test("check sort and shuffle when bucket and sort columns are join keys") { + // In case of bucketing, its possible to have multiple files belonging to the + // same bucket in a given relation. Each of these files are locally sorted + // but those files combined together are not globally sorted. Given that, + // the RDD partition will not be sorted even if the relation has sort columns set + // Therefore, we still need to keep the Sort in both sides. 
+ val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + + val bucketedTableTestSpecLeft1 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + val bucketedTableTestSpecRight1 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1, + bucketedTableTestSpecRight = bucketedTableTestSpecRight1, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketedTableTestSpecLeft2 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight2 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2, + bucketedTableTestSpecRight = bucketedTableTestSpecRight2, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketedTableTestSpecLeft3 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + val bucketedTableTestSpecRight3 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3, + bucketedTableTestSpecRight = bucketedTableTestSpecRight3, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketedTableTestSpecLeft4 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight4 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4, + bucketedTableTestSpecRight = bucketedTableTestSpecRight4, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + test("avoid shuffle and sort when sort columns are a super set of join keys") { + val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i")) + ) + } + + test("only sort one side when sort columns are different") { + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + test("only sort one side when sort columns are same but their ordering is different") { + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) + val 
bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) + ) + } + + test("avoid shuffle when grouping keys are equal to bucket keys") { + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table") + val tbl = spark.table("bucketed_table") + val agged = tbl.groupBy("i", "j").agg(max("k")) + + checkAnswer( + agged.sort("i", "j"), + df1.groupBy("i", "j").agg(max("k")).sort("i", "j")) + + assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty) + } + } + + test("avoid shuffle when grouping keys are a super-set of bucket keys") { + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") + val tbl = spark.table("bucketed_table") + val agged = tbl.groupBy("i", "j").agg(max("k")) + + checkAnswer( + agged.sort("i", "j"), + df1.groupBy("i", "j").agg(max("k")).sort("i", "j")) + + assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty) + } + } + + test("SPARK-17698 Join predicates should not contain filter clauses") { + val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinType = "fullouter", + joinCondition = (left: DataFrame, right: DataFrame) => { + val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _) + val filterLeft = left("i") === Literal("1") + val filterRight = right("i") === Literal("1") + joinPredicates && filterLeft && filterRight + } + ) + } + + test("error if there exists any malformed bucket files") { + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") + val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath + val tableDir = new File(warehouseFilePath, "bucketed_table") + Utils.deleteRecursively(tableDir) + df1.write.parquet(tableDir.getAbsolutePath) + + val agged = spark.table("bucketed_table").groupBy("i").count() + val error = intercept[Exception] { + agged.count() + } + + assert(error.getCause().toString contains "Invalid bucket file") + } + } + + test("disable bucketing when the output doesn't contain all bucketing columns") { + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") + + checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j")) + + checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")), + df1.groupBy("j").agg(max("k"))) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1a45d2b2/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala new file mode 100644 index 0000000..9082261 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File +import java.net.URI + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.execution.datasources.BucketingUtils +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} + +class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext { + protected override def beforeAll(): Unit = { + super.beforeAll() + assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") + } + + override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json") +} + +abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { + import testImplicits._ + + protected def fileFormatsToTest: Seq[String] + + test("bucketed by non-existing column") { + val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") + intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt")) + } + + test("numBuckets be greater than 0 but less than 100000") { + val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") + + Seq(-1, 0, 100000).foreach(numBuckets => { + val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt")) + assert( + e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000")) + }) + } + + test("specify sorting columns without bucketing columns") { + val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") + intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt")) + } + + test("sorting by non-orderable column") { + val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j") + intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt")) + } + + test("write bucketed data using save()") { + val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") + + val e = intercept[AnalysisException] { + df.write.bucketBy(2, "i").parquet("/tmp/path") + } + assert(e.getMessage == "'save' does not support bucketing right now;") + } + + test("write bucketed data using insertInto()") { + val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") + + val e = intercept[AnalysisException] { + 
df.write.bucketBy(2, "i").insertInto("tt") + } + assert(e.getMessage == "'insertInto' does not support bucketing right now;") + } + + private lazy val df = { + (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + } + + def tableDir: File = { + val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") + new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier))) + } + + /** + * A helper method to check the bucket write functionality in low level, i.e. check the written + * bucket files to see if the data are correct. User should pass in a data dir that these bucket + * files are written to, and the format of data(parquet, json, etc.), and the bucketing + * information. + */ + private def testBucketing( + dataDir: File, + source: String, + numBuckets: Int, + bucketCols: Seq[String], + sortCols: Seq[String] = Nil): Unit = { + val allBucketFiles = dataDir.listFiles().filterNot(f => + f.getName.startsWith(".") || f.getName.startsWith("_") + ) + + for (bucketFile <- allBucketFiles) { + val bucketId = BucketingUtils.getBucketId(bucketFile.getName).getOrElse { + fail(s"Unable to find the related bucket files.") + } + + // Remove the duplicate columns in bucketCols and sortCols; + // Otherwise, we got analysis errors due to duplicate names + val selectedColumns = (bucketCols ++ sortCols).distinct + // We may lose the type information after write(e.g. json format doesn't keep schema + // information), here we get the types from the original dataframe. + val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType) + val columns = selectedColumns.zip(types).map { + case (colName, dt) => col(colName).cast(dt) + } + + // Read the bucket file into a dataframe, so that it's easier to test. + val readBack = spark.read.format(source) + .load(bucketFile.getAbsolutePath) + .select(columns: _*) + + // If we specified sort columns while writing bucket table, make sure the data in this + // bucket file is already sorted. + if (sortCols.nonEmpty) { + checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect()) + } + + // Go through all rows in this bucket file, calculate bucket id according to bucket column + // values, and make sure it equals to the expected bucket id that inferred from file name. 
+ val qe = readBack.select(bucketCols.map(col): _*).queryExecution + val rows = qe.toRdd.map(_.copy()).collect() + val getBucketId = UnsafeProjection.create( + HashPartitioning(qe.analyzed.output, numBuckets).partitionIdExpression :: Nil, + qe.analyzed.output) + + for (row <- rows) { + val actualBucketId = getBucketId(row).getInt(0) + assert(actualBucketId == bucketId) + } + } + } + + test("write bucketed data") { + for (source <- fileFormatsToTest) { + withTable("bucketed_table") { + df.write + .format(source) + .partitionBy("i") + .bucketBy(8, "j", "k") + .saveAsTable("bucketed_table") + + for (i <- 0 until 5) { + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k")) + } + } + } + } + + test("write bucketed data with sortBy") { + for (source <- fileFormatsToTest) { + withTable("bucketed_table") { + df.write + .format(source) + .partitionBy("i") + .bucketBy(8, "j") + .sortBy("k") + .saveAsTable("bucketed_table") + + for (i <- 0 until 5) { + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j"), Seq("k")) + } + } + } + } + + test("write bucketed data with the overlapping bucketBy/sortBy and partitionBy columns") { + val e1 = intercept[AnalysisException](df.write + .partitionBy("i", "j") + .bucketBy(8, "j", "k") + .sortBy("k") + .saveAsTable("bucketed_table")) + assert(e1.message.contains("bucketing column 'j' should not be part of partition columns")) + + val e2 = intercept[AnalysisException](df.write + .partitionBy("i", "j") + .bucketBy(8, "k") + .sortBy("i") + .saveAsTable("bucketed_table")) + assert(e2.message.contains("bucket sorting column 'i' should not be part of partition columns")) + } + + test("write bucketed data without partitionBy") { + for (source <- fileFormatsToTest) { + withTable("bucketed_table") { + df.write + .format(source) + .bucketBy(8, "i", "j") + .saveAsTable("bucketed_table") + + testBucketing(tableDir, source, 8, Seq("i", "j")) + } + } + } + + test("write bucketed data without partitionBy with sortBy") { + for (source <- fileFormatsToTest) { + withTable("bucketed_table") { + df.write + .format(source) + .bucketBy(8, "i", "j") + .sortBy("k") + .saveAsTable("bucketed_table") + + testBucketing(tableDir, source, 8, Seq("i", "j"), Seq("k")) + } + } + } + + test("write bucketed data with bucketing disabled") { + // The configuration BUCKETING_ENABLED does not affect the writing path + withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { + for (source <- fileFormatsToTest) { + withTable("bucketed_table") { + df.write + .format(source) + .partitionBy("i") + .bucketBy(8, "j", "k") + .saveAsTable("bucketed_table") + + for (i <- 0 until 5) { + testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k")) + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1a45d2b2/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala deleted file mode 100644 index 4fc72b9..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ /dev/null @@ -1,562 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import java.io.File -import java.net.URI - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec} -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.execution.exchange.ShuffleExchange -import org.apache.spark.sql.execution.joins.SortMergeJoinExec -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.Utils -import org.apache.spark.util.collection.BitSet - -class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - import testImplicits._ - - private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - private val nullDF = (for { - i <- 0 to 50 - s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g") - } yield (i % 5, s, i % 13)).toDF("i", "j", "k") - - test("read bucketed data") { - withTable("bucketed_table") { - df.write - .format("parquet") - .partitionBy("i") - .bucketBy(8, "j", "k") - .saveAsTable("bucketed_table") - - for (i <- 0 until 5) { - val table = spark.table("bucketed_table").filter($"i" === i) - val query = table.queryExecution - val output = query.analyzed.output - val rdd = query.toRdd - - assert(rdd.partitions.length == 8) - - val attrs = table.select("j", "k").queryExecution.analyzed.output - val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => { - val getBucketId = UnsafeProjection.create( - HashPartitioning(attrs, 8).partitionIdExpression :: Nil, - output) - rows.map(row => getBucketId(row).getInt(0) -> index) - }) - checkBucketId.collect().foreach(r => assert(r._1 == r._2)) - } - } - } - - // To verify if the bucket pruning works, this function checks two conditions: - // 1) Check if the pruned buckets (before filtering) are empty. - // 2) Verify the final result is the same as the expected one - private def checkPrunedAnswers( - bucketSpec: BucketSpec, - bucketValues: Seq[Integer], - filterCondition: Column, - originalDataFrame: DataFrame): Unit = { - // This test verifies parts of the plan. Disable whole stage codegen. 
- withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { - val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k") - val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec - // Limit: bucket pruning only works when the bucket column has one and only one column - assert(bucketColumnNames.length == 1) - val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) - val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) - val matchedBuckets = new BitSet(numBuckets) - bucketValues.foreach { value => - matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) - } - - // Filter could hide the bug in bucket pruning. Thus, skipping all the filters - val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - val rdd = plan.find(_.isInstanceOf[DataSourceScanExec]) - assert(rdd.isDefined, plan) - - val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => - if (matchedBuckets.get(index % numBuckets) && iter.nonEmpty) Iterator(index) else Iterator() - } - // TODO: These tests are not testing the right columns. -// // checking if all the pruned buckets are empty -// val invalidBuckets = checkedResult.collect().toList -// if (invalidBuckets.nonEmpty) { -// fail(s"Buckets $invalidBuckets should have been pruned from:\n$plan") -// } - - checkAnswer( - bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), - originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) - } - } - - test("read partitioning bucketed tables with bucket pruning filters") { - withTable("bucketed_table") { - val numBuckets = 8 - val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) - // json does not support predicate push-down, and thus json is used here - df.write - .format("json") - .partitionBy("i") - .bucketBy(numBuckets, "j") - .saveAsTable("bucketed_table") - - for (j <- 0 until 13) { - // Case 1: EqualTo - checkPrunedAnswers( - bucketSpec, - bucketValues = j :: Nil, - filterCondition = $"j" === j, - df) - - // Case 2: EqualNullSafe - checkPrunedAnswers( - bucketSpec, - bucketValues = j :: Nil, - filterCondition = $"j" <=> j, - df) - - // Case 3: In - checkPrunedAnswers( - bucketSpec, - bucketValues = Seq(j, j + 1, j + 2, j + 3), - filterCondition = $"j".isin(j, j + 1, j + 2, j + 3), - df) - } - } - } - - test("read non-partitioning bucketed tables with bucket pruning filters") { - withTable("bucketed_table") { - val numBuckets = 8 - val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) - // json does not support predicate push-down, and thus json is used here - df.write - .format("json") - .bucketBy(numBuckets, "j") - .saveAsTable("bucketed_table") - - for (j <- 0 until 13) { - checkPrunedAnswers( - bucketSpec, - bucketValues = j :: Nil, - filterCondition = $"j" === j, - df) - } - } - } - - test("read partitioning bucketed tables having null in bucketing key") { - withTable("bucketed_table") { - val numBuckets = 8 - val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) - // json does not support predicate push-down, and thus json is used here - nullDF.write - .format("json") - .partitionBy("i") - .bucketBy(numBuckets, "j") - .saveAsTable("bucketed_table") - - // Case 1: isNull - checkPrunedAnswers( - bucketSpec, - bucketValues = null :: Nil, - filterCondition = $"j".isNull, - nullDF) - - // Case 2: <=> null - checkPrunedAnswers( - bucketSpec, - bucketValues = null :: Nil, - filterCondition = $"j" <=> null, - nullDF) - } - } - - test("read 
partitioning bucketed tables having composite filters") { - withTable("bucketed_table") { - val numBuckets = 8 - val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil) - // json does not support predicate push-down, and thus json is used here - df.write - .format("json") - .partitionBy("i") - .bucketBy(numBuckets, "j") - .saveAsTable("bucketed_table") - - for (j <- 0 until 13) { - checkPrunedAnswers( - bucketSpec, - bucketValues = j :: Nil, - filterCondition = $"j" === j && $"k" > $"j", - df) - - checkPrunedAnswers( - bucketSpec, - bucketValues = j :: Nil, - filterCondition = $"j" === j && $"i" > j % 5, - df) - } - } - } - - private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1") - private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2") - - case class BucketedTableTestSpec( - bucketSpec: Option[BucketSpec], - numPartitions: Int = 10, - expectedShuffle: Boolean = true, - expectedSort: Boolean = true) - - /** - * A helper method to test the bucket read functionality using join. It will save `df1` and `df2` - * to hive tables, bucketed or not, according to the given bucket specifics. Next we will join - * these 2 tables, and firstly make sure the answer is corrected, and then check if the shuffle - * exists as user expected according to the `shuffleLeft` and `shuffleRight`. - */ - private def testBucketing( - bucketedTableTestSpecLeft: BucketedTableTestSpec, - bucketedTableTestSpecRight: BucketedTableTestSpec, - joinType: String = "inner", - joinCondition: (DataFrame, DataFrame) => Column): Unit = { - val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = - bucketedTableTestSpecLeft - val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = - bucketedTableTestSpecRight - - withTable("bucketed_table1", "bucketed_table2") { - def withBucket( - writer: DataFrameWriter[Row], - bucketSpec: Option[BucketSpec]): DataFrameWriter[Row] = { - bucketSpec.map { spec => - writer.bucketBy( - spec.numBuckets, - spec.bucketColumnNames.head, - spec.bucketColumnNames.tail: _*) - - if (spec.sortColumnNames.nonEmpty) { - writer.sortBy( - spec.sortColumnNames.head, - spec.sortColumnNames.tail: _* - ) - } else { - writer - } - }.getOrElse(writer) - } - - withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft) - .saveAsTable("bucketed_table1") - withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight) - .saveAsTable("bucketed_table2") - - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { - val t1 = spark.table("bucketed_table1") - val t2 = spark.table("bucketed_table2") - val joined = t1.join(t2, joinCondition(t1, t2), joinType) - - // First check the result is corrected. 
- checkAnswer( - joined.sort("bucketed_table1.k", "bucketed_table2.k"), - df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k")) - - assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec]) - val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec] - - // check existence of shuffle - assert( - joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft, - s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}") - assert( - joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight, - s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}") - - // check existence of sort - assert( - joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft, - s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}") - assert( - joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight, - s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") - } - } - } - - private def joinCondition(joinCols: Seq[String]) (left: DataFrame, right: DataFrame): Column = { - joinCols.map(col => left(col) === right(col)).reduce(_ && _) - } - - test("avoid shuffle when join 2 bucketed tables") { - val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 - ignore("avoid shuffle when join keys are a super-set of bucket keys") { - val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - test("only shuffle one side when join bucketed table and non-bucketed table") { - val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - test("only shuffle one side when 2 bucketed tables have different bucket number") { - val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - test("only shuffle one side when 2 bucketed tables have different bucket keys") { - val bucketSpecLeft = 
Some(BucketSpec(8, Seq("i"), Nil)) - val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i")) - ) - } - - test("shuffle when join keys are not equal to bucket keys") { - val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) - val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("j")) - ) - } - - test("shuffle when join 2 bucketed tables with bucketing disabled") { - val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) - val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) - withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - } - - test("check sort and shuffle when bucket and sort columns are join keys") { - // In case of bucketing, its possible to have multiple files belonging to the - // same bucket in a given relation. Each of these files are locally sorted - // but those files combined together are not globally sorted. Given that, - // the RDD partition will not be sorted even if the relation has sort columns set - // Therefore, we still need to keep the Sort in both sides. 
- val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - - val bucketedTableTestSpecLeft1 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - val bucketedTableTestSpecRight1 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1, - bucketedTableTestSpecRight = bucketedTableTestSpecRight1, - joinCondition = joinCondition(Seq("i", "j")) - ) - - val bucketedTableTestSpecLeft2 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight2 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2, - bucketedTableTestSpecRight = bucketedTableTestSpecRight2, - joinCondition = joinCondition(Seq("i", "j")) - ) - - val bucketedTableTestSpecLeft3 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - val bucketedTableTestSpecRight3 = BucketedTableTestSpec( - bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3, - bucketedTableTestSpecRight = bucketedTableTestSpecRight3, - joinCondition = joinCondition(Seq("i", "j")) - ) - - val bucketedTableTestSpecLeft4 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight4 = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4, - bucketedTableTestSpecRight = bucketedTableTestSpecRight4, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - test("avoid shuffle and sort when sort columns are a super set of join keys") { - val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) - val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) - val bucketedTableTestSpecLeft = BucketedTableTestSpec( - bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec( - bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i")) - ) - } - - test("only sort one side when sort columns are different") { - val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) - val bucketedTableTestSpecLeft = BucketedTableTestSpec( - bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec( - bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - test("only sort one side when sort columns are same but their ordering is different") { - val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) - val 
bucketedTableTestSpecLeft = BucketedTableTestSpec( - bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec( - bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinCondition = joinCondition(Seq("i", "j")) - ) - } - - test("avoid shuffle when grouping keys are equal to bucket keys") { - withTable("bucketed_table") { - df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table") - val tbl = spark.table("bucketed_table") - val agged = tbl.groupBy("i", "j").agg(max("k")) - - checkAnswer( - agged.sort("i", "j"), - df1.groupBy("i", "j").agg(max("k")).sort("i", "j")) - - assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty) - } - } - - test("avoid shuffle when grouping keys are a super-set of bucket keys") { - withTable("bucketed_table") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - val tbl = spark.table("bucketed_table") - val agged = tbl.groupBy("i", "j").agg(max("k")) - - checkAnswer( - agged.sort("i", "j"), - df1.groupBy("i", "j").agg(max("k")).sort("i", "j")) - - assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty) - } - } - - test("SPARK-17698 Join predicates should not contain filter clauses") { - val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i"))) - val bucketedTableTestSpecLeft = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - val bucketedTableTestSpecRight = BucketedTableTestSpec( - bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) - testBucketing( - bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, - bucketedTableTestSpecRight = bucketedTableTestSpecRight, - joinType = "fullouter", - joinCondition = (left: DataFrame, right: DataFrame) => { - val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _) - val filterLeft = left("i") === Literal("1") - val filterRight = right("i") === Literal("1") - joinPredicates && filterLeft && filterRight - } - ) - } - - test("error if there exists any malformed bucket files") { - withTable("bucketed_table") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath - val tableDir = new File(warehouseFilePath, "bucketed_table") - Utils.deleteRecursively(tableDir) - df1.write.parquet(tableDir.getAbsolutePath) - - val agged = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[Exception] { - agged.count() - } - - assert(error.getCause().toString contains "Invalid bucket file") - } - } - - test("disable bucketing when the output doesn't contain all bucketing columns") { - withTable("bucketed_table") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - - checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j")) - - checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")), - df1.groupBy("j").agg(max("k"))) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/1a45d2b2/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala new file mode 100644 index 0000000..f277f99 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION + +class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton { + protected override def beforeAll(): Unit = { + super.beforeAll() + assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1a45d2b2/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala deleted file mode 100644 index 61cef2a..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.sql.sources
-
-import java.io.File
-import java.net.URI
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, QueryTest}
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.datasources.BucketingUtils
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  import testImplicits._
-
-  test("bucketed by non-existing column") {
-    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
-  }
-
-  test("numBuckets be greater than 0 but less than 100000") {
-    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-
-    Seq(-1, 0, 100000).foreach(numBuckets => {
-      val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt"))
-      assert(
-        e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000"))
-    })
-  }
-
-  test("specify sorting columns without bucketing columns") {
-    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
-  }
-
-  test("sorting by non-orderable column") {
-    val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j")
-    intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
-  }
-
-  test("write bucketed data using save()") {
-    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-
-    val e = intercept[AnalysisException] {
-      df.write.bucketBy(2, "i").parquet("/tmp/path")
-    }
-    assert(e.getMessage == "'save' does not support bucketing right now;")
-  }
-
-  test("write bucketed data using insertInto()") {
-    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-
-    val e = intercept[AnalysisException] {
-      df.write.bucketBy(2, "i").insertInto("tt")
-    }
-    assert(e.getMessage == "'insertInto' does not support bucketing right now;")
-  }
-
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
-
-  def tableDir: File = {
-    val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
-  }
-
-  /**
-   * A helper method to check the bucket write functionality in low level, i.e. check the written
-   * bucket files to see if the data are correct. User should pass in a data dir that these bucket
-   * files are written to, and the format of data(parquet, json, etc.), and the bucketing
-   * information.
-   */
-  private def testBucketing(
-      dataDir: File,
-      source: String,
-      numBuckets: Int,
-      bucketCols: Seq[String],
-      sortCols: Seq[String] = Nil): Unit = {
-    val allBucketFiles = dataDir.listFiles().filterNot(f =>
-      f.getName.startsWith(".") || f.getName.startsWith("_")
-    )
-
-    for (bucketFile <- allBucketFiles) {
-      val bucketId = BucketingUtils.getBucketId(bucketFile.getName).getOrElse {
-        fail(s"Unable to find the related bucket files.")
-      }
-
-      // Remove the duplicate columns in bucketCols and sortCols;
-      // Otherwise, we got analysis errors due to duplicate names
-      val selectedColumns = (bucketCols ++ sortCols).distinct
-      // We may lose the type information after write(e.g. json format doesn't keep schema
-      // information), here we get the types from the original dataframe.
-      val types = df.select(selectedColumns.map(col): _*).schema.map(_.dataType)
-      val columns = selectedColumns.zip(types).map {
-        case (colName, dt) => col(colName).cast(dt)
-      }
-
-      // Read the bucket file into a dataframe, so that it's easier to test.
-      val readBack = spark.read.format(source)
-        .load(bucketFile.getAbsolutePath)
-        .select(columns: _*)
-
-      // If we specified sort columns while writing bucket table, make sure the data in this
-      // bucket file is already sorted.
-      if (sortCols.nonEmpty) {
-        checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect())
-      }
-
-      // Go through all rows in this bucket file, calculate bucket id according to bucket column
-      // values, and make sure it equals to the expected bucket id that inferred from file name.
-      val qe = readBack.select(bucketCols.map(col): _*).queryExecution
-      val rows = qe.toRdd.map(_.copy()).collect()
-      val getBucketId = UnsafeProjection.create(
-        HashPartitioning(qe.analyzed.output, numBuckets).partitionIdExpression :: Nil,
-        qe.analyzed.output)
-
-      for (row <- rows) {
-        val actualBucketId = getBucketId(row).getInt(0)
-        assert(actualBucketId == bucketId)
-      }
-    }
-  }
-
-  test("write bucketed data") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        df.write
-          .format(source)
-          .partitionBy("i")
-          .bucketBy(8, "j", "k")
-          .saveAsTable("bucketed_table")
-
-        for (i <- 0 until 5) {
-          testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k"))
-        }
-      }
-    }
-  }
-
-  test("write bucketed data with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        df.write
-          .format(source)
-          .partitionBy("i")
-          .bucketBy(8, "j")
-          .sortBy("k")
-          .saveAsTable("bucketed_table")
-
-        for (i <- 0 until 5) {
-          testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j"), Seq("k"))
-        }
-      }
-    }
-  }
-
-  test("write bucketed data with the overlapping bucketBy/sortBy and partitionBy columns") {
-    val e1 = intercept[AnalysisException](df.write
-      .partitionBy("i", "j")
-      .bucketBy(8, "j", "k")
-      .sortBy("k")
-      .saveAsTable("bucketed_table"))
-    assert(e1.message.contains("bucketing column 'j' should not be part of partition columns"))
-
-    val e2 = intercept[AnalysisException](df.write
-      .partitionBy("i", "j")
-      .bucketBy(8, "k")
-      .sortBy("i")
-      .saveAsTable("bucketed_table"))
-    assert(e2.message.contains("bucket sorting column 'i' should not be part of partition columns"))
-  }
-
-  test("write bucketed data without partitionBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        df.write
-          .format(source)
-          .bucketBy(8, "i", "j")
-          .saveAsTable("bucketed_table")
-
-        testBucketing(tableDir, source, 8, Seq("i", "j"))
-      }
-    }
-  }
-
-  test("write bucketed data without partitionBy with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
-      withTable("bucketed_table") {
-        df.write
-          .format(source)
-          .bucketBy(8, "i", "j")
-          .sortBy("k")
-          .saveAsTable("bucketed_table")
-
-        testBucketing(tableDir, source, 8, Seq("i", "j"), Seq("k"))
-      }
    }
-  }
-
-  test("write bucketed data with bucketing disabled") {
-    // The configuration BUCKETING_ENABLED does not affect the writing path
-    withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      for (source <- Seq("parquet", "json", "orc")) {
-        withTable("bucketed_table") {
-          df.write
-            .format(source)
-            .partitionBy("i")
-            .bucketBy(8, "j", "k")
-            .saveAsTable("bucketed_table")
-
-          for (i <- 0 until 5) {
-            testBucketing(new File(tableDir, s"i=$i"), source, 8, Seq("j", "k"))
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1a45d2b2/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
new file mode 100644
index 0000000..454e2f6
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
