This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 21110cd [CARBONDATA-3721][CARBONDATA-3590] Optimize Bucket Table 21110cd is described below commit 21110cd2e053e6da2a9e4341eb2fc870b3b75563 Author: Zhangshunyu <zhangshunyu1...@126.com> AuthorDate: Sun Feb 9 22:04:48 2020 +0800 [CARBONDATA-3721][CARBONDATA-3590] Optimize Bucket Table Why is this PR needed? Support Bucket Table consistent with Spark to improve the join performance by avoiding shuffle for the bucket column. At the same time, fix bugs in load/compact/query of bucket tables. What changes were proposed in this PR? Support Bucket Table, consistent with Spark, to improve the join performance by avoiding shuffle for the bucket column. Fix bugs also. 1. For create table, ddl support both tblproperties and clustered by like hive. 2. For loading, fix some problems in loading when bucket column specified, make it clustered into diff files based on bucket column. 3. For query, the hash impl should either keep the same for a given value or keep same with parquet table, so that the join result of diff bucket tables give correct result. By the way, the hash impl is configurable. 4. For compaction, group the block files based on bucket id, the data should hash into diff carbondata files also, otherwise the query flow will group the files based on bucket num, all data compacted will come into 1 partition and the join result will mismatch, and the performance will be very slow. 5. For tests, add 19 test cases in TableBucketingTestCase Does this PR introduce any user interface change? No Is any new testcase added? 
Yes This closes #3637 --- .../core/constants/CarbonCommonConstants.java | 14 + .../core/metadata/schema/table/CarbonTable.java | 14 + .../core/util/AbstractDataFileFooterConverter.java | 3 +- docs/ddl-of-carbondata.md | 10 +- .../org/apache/carbon/flink/TestCarbonWriter.scala | 118 ++- .../cluster/sdv/generated/BucketingTestCase.scala | 167 ---- .../org/apache/carbondata/spark/CarbonOption.scala | 8 +- .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 88 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 7 +- .../scala/org/apache/spark/sql/CarbonSource.scala | 22 +- .../command/carbonTableSchemaCommon.scala | 14 +- .../management/CarbonInsertFromStageCommand.scala | 19 +- ...nAlterTableColRenameDataTypeChangeCommand.scala | 14 + .../schema/CarbonAlterTableDropColumnCommand.scala | 13 + .../table/CarbonDescribeFormattedCommand.scala | 15 + .../strategy/CarbonLateDecodeStrategy.scala | 1 + .../sql/parser/CarbonSparkSqlParserUtil.scala | 22 +- .../testsuite/binary/TestBinaryDataType.scala | 45 +- .../badrecordloger/BadRecordActionTest.scala | 8 +- .../createTable/TestCreateTableLike.scala | 2 +- .../spark/carbondata/CarbonDataSourceSuite.scala | 11 +- .../carbondata/DataLoadFailAllTypeSortTest.scala | 4 +- .../bucketing/TableBucketingTestCase.scala | 886 ++++++++++++++++++++- pom.xml | 3 +- processing/pom.xml | 16 + .../loading/CarbonDataLoadConfiguration.java | 10 + .../processing/loading/DataLoadProcessBuilder.java | 18 +- .../processing/loading/model/CarbonLoadModel.java | 13 + .../partition/impl/HashPartitionerImpl.java | 10 +- .../impl/SparkHashExpressionPartitionerImpl.java | 182 +++++ .../steps/DataConverterProcessorStepImpl.java | 22 +- .../loading/steps/DataWriterProcessorStepImpl.java | 1 + .../InputProcessorStepWithNoConverterImpl.java | 71 +- .../merger/CompactionResultSortProcessor.java | 1 + .../merger/RowResultMergerProcessor.java | 1 + .../sortdata/SingleThreadFinalSortFilesMerger.java | 2 +- 
.../store/CarbonFactDataHandlerModel.java | 2 +- 38 files changed, 1531 insertions(+), 328 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 7565555..333c7b6 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -2379,4 +2379,18 @@ public final class CarbonCommonConstants { */ public static final String CARBON_SI_SEGMENT_MERGE_DEFAULT = "false"; + /** + * Hash method of bucket table + */ + public static final String BUCKET_HASH_METHOD = "bucket_hash_method"; + public static final String BUCKET_HASH_METHOD_DEFAULT = "spark_hash_expression"; + public static final String BUCKET_HASH_METHOD_SPARK_EXPRESSION = "spark_hash_expression"; + public static final String BUCKET_HASH_METHOD_NATIVE = "native"; + + /** + * bucket properties + */ + public static final String BUCKET_COLUMNS = "bucket_columns"; + public static final String BUCKET_NUMBER = "bucket_number"; + } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 68bdd47..e852f5f 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -717,6 +717,20 @@ public class CarbonTable implements Serializable, Writable { } } + public String getBucketHashMethod() { + String configuredMethod = tableInfo.getFactTable().getTableProperties() + .get(CarbonCommonConstants.BUCKET_HASH_METHOD); + if (configuredMethod == null) { + return CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT; + } else { + if (CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals(configuredMethod)) { + return 
CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE; + } + // by default we use spark_hash_expression hash method + return CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT; + } + } + /** * to get the normal dimension or the primitive dimension of the complex type * diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java index 91cc456..e49aacc 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java @@ -137,7 +137,8 @@ public abstract class AbstractDataFileFooterConverter { boolean isTransactionalTable) throws IOException { CarbonIndexFileReader indexReader = new CarbonIndexFileReader(configuration); List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>(); - String parentPath = filePath.substring(0, filePath.lastIndexOf("/")); + String formattedPath = filePath.replace("\\", "/"); + String parentPath = formattedPath.substring(0, formattedPath.lastIndexOf("/")); try { // open the reader if (fileData != null) { diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md index 2f4cba1..3416426 100644 --- a/docs/ddl-of-carbondata.md +++ b/docs/ddl-of-carbondata.md @@ -104,8 +104,8 @@ CarbonData DDL statements are documented here,which includes: | [CACHE_LEVEL](#caching-at-block-or-blocklet-level) | Column metadata caching level. 
Whether to cache column metadata of block or blocklet | | [FLAT_FOLDER](#support-flat-folder-same-as-hiveparquet) | Whether to write all the carbondata files in a single folder.Not writing segments folder during incremental load | | [LONG_STRING_COLUMNS](#string-longer-than-32000-characters) | Columns which are greater than 32K characters | -| [BUCKETNUMBER](#bucketing) | Number of buckets to be created | -| [BUCKETCOLUMNS](#bucketing) | Columns which are to be placed in buckets | +| [BUCKET_NUMBER](#bucketing) | Number of buckets to be created | +| [BUCKET_COLUMNS](#bucketing) | Columns which are to be placed in buckets | | [LOAD_MIN_SIZE_INMB](#load-minimum-data-size) | Minimum input data size per node for data loading | | [Range Column](#range-column) | partition input data by range | @@ -991,8 +991,8 @@ Users can specify which columns to include and exclude for local dictionary gene CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type, ...)] STORED AS carbondata - TBLPROPERTIES('BUCKETNUMBER'='noOfBuckets', - 'BUCKETCOLUMNS'='columnname') + TBLPROPERTIES('BUCKET_NUMBER'='noOfBuckets', + 'BUCKET_COLUMNS'='columnname') ``` **NOTE:** @@ -1011,7 +1011,7 @@ Users can specify which columns to include and exclude for local dictionary gene productBatch STRING, revenue INT) STORED AS carbondata - TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='productName') + TBLPROPERTIES ('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='productName') ``` ## CACHE diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala index 9edf7b9..1d82a75 100644 --- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala @@ -20,20 +20,23 @@ package org.apache.carbon.flink import java.util.Properties import 
org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink -import org.apache.spark.sql.Row +import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest - import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.spark.sql.execution.exchange.Exchange class TestCarbonWriter extends QueryTest { val tableName = "test_flink" + val bucketTableName = "insert_bucket_table" + test("Writing flink data to local carbon table") { sql(s"DROP TABLE IF EXISTS $tableName").collect() @@ -193,6 +196,117 @@ class TestCarbonWriter extends QueryTest { } } + test("test carbon writer of bucket table") { + sql(s"DROP TABLE IF EXISTS $tableName").collect() + sql(s"DROP TABLE IF EXISTS $bucketTableName").collect() + sql( + s""" + | CREATE TABLE $tableName (stringField string, intField int, shortField short) + | STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField') + """.stripMargin + ).collect() + sql( + s""" + | CREATE TABLE $bucketTableName (stringField string, intField int, shortField short) + | STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField') + """.stripMargin + ).collect() + + val rootPath = System.getProperty("user.dir") + "/target/test-classes" + + val dataTempPath = rootPath + "/data/temp/" + + try { + val flinkTablePath = storeLocation + "/" + tableName + "/" + + val writerProperties = newWriterProperties(dataTempPath, storeLocation) + val carbonProperties = newCarbonProperties(storeLocation) 
+ + writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100") + + val environment = StreamExecutionEnvironment.getExecutionEnvironment + environment.setParallelism(1) + environment.setRestartStrategy(RestartStrategies.noRestart) + + val dataCount = 1000 + val source = new TestSource(dataCount) { + @throws[InterruptedException] + override def get(index: Int): Array[AnyRef] = { + val data = new Array[AnyRef](3) + data(0) = "test" + index + data(1) = index.asInstanceOf[AnyRef] + data(2) = 12345.asInstanceOf[AnyRef] + data + } + + @throws[InterruptedException] + override def onFinish(): Unit = { + Thread.sleep(5000L) + } + } + val stream = environment.addSource(source) + val factory = CarbonWriterFactory.builder("Local").build( + "default", + tableName, + flinkTablePath, + new Properties, + writerProperties, + carbonProperties + ) + val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build + + stream.addSink(streamSink) + + try environment.execute + catch { + case exception: Exception => + throw new UnsupportedOperationException(exception) + } + sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')") + val table = CarbonEnv.getCarbonTable(Option("default"), s"$tableName")(sqlContext.sparkSession) + val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0") + val dataFiles = segmentDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles.length == 10) + checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500))) + sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')") + val segmentDir2 = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_1") + val dataFiles2 = segmentDir2.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles2.length == 10) + 
checkAnswer(sql(s"SELECT count(*) FROM $tableName where stringField != 'AAA'"), Seq(Row(1000))) + sql(s"insert into $bucketTableName select * from $tableName").collect() + + val plan = sql( + s""" + |select t1.*, t2.* + |from $tableName t1, $bucketTableName t2 + |where t1.stringField = t2.stringField + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + + checkAnswer(sql( + s"""select count(*) from + |(select t1.*, t2.* + |from $tableName t1, $bucketTableName t2 + |where t1.stringField = t2.stringField) temp + """.stripMargin), Row(1000)) + } finally { + sql(s"DROP TABLE IF EXISTS $tableName").collect() + sql(s"DROP TABLE IF EXISTS $bucketTableName").collect() + } + } + private def newWriterProperties( dataTempPath: String, storeLocation: String) = { diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala deleted file mode 100644 index e42b008..0000000 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.cluster.sdv.generated - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.spark.sql.common.util._ -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.scalatest.BeforeAndAfterAll - -class BucketingTestCase extends QueryTest with BeforeAndAfterAll { - - var threshold: Int = _ - var timeformat = CarbonProperties.getInstance() - .getProperty("carbon.timestamp.format", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - - override def beforeAll { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") - threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt - sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1") - sql("DROP TABLE IF EXISTS bucket_table") - } - - test("test exception if bucketcolumns be measure column") { - intercept[Exception] { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='ID')") - } - } - - test("test exception if bucketcolumns be complex data type column") { - intercept[Exception] { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE 
bucket_table (Id int, number double, name string, " + - "gamePoint array<double>, mac struct<num:double>) STORED AS carbondata TBLPROPERTIES" + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='gamePoint')") - } - } - - test("test multi columns as bucketcolumns") { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name,phonetype')") - sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") - val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_bucket_table") - if (table != null && table.getBucketingInfo != null) { - assert(true) - } else { - assert(false, "Bucketing info does not exist") - } - } - - test("test multi columns as bucketcolumns with bucket join") { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country,name')") - sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") - - val plan = sql( - """ - |select t1.*, t2.* - |from bucket_table t1, bucket_table t2 - |where t1.country = t2.country and t1.name = t2.name - """.stripMargin).queryExecution.executedPlan - var shuffleExists = false - plan.collect { - case s: ShuffleExchangeExec => shuffleExists = true - } - assert(!shuffleExists, "shuffle should not exist on bucket column join") - } - - test("test non bucket column join") { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country')") - sql(s"LOAD DATA INPATH 
'$resourcesPath/source.csv' INTO TABLE bucket_table") - - val plan = sql( - """ - |select t1.*, t2.* - |from bucket_table t1, bucket_table t2 - |where t1.name = t2.name - """.stripMargin).queryExecution.executedPlan - var shuffleExists = false - - plan.collect { - case s: ShuffleExchangeExec => shuffleExists = true - } - assert(shuffleExists, "shuffle should exist on non-bucket column join") - } - - test("test bucketcolumns through multi data loading plus compaction") { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") - val numOfLoad = 10 - for (j <- 0 until numOfLoad) { - sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") - } - sql("ALTER TABLE bucket_table COMPACT 'MAJOR'") - - val plan = sql( - """ - |select t1.*, t2.* - |from bucket_table t1, bucket_table t2 - |where t1.name = t2.name - """.stripMargin).queryExecution.executedPlan - var shuffleExists = false - plan.collect { - case s: ShuffleExchangeExec => shuffleExists = true - } - assert(!shuffleExists, "shuffle should not exist on bucket tables") - } - - test("drop non-bucket column, test bucket column join") { - sql("DROP TABLE IF EXISTS bucket_table") - sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") - sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE bucket_table") - - sql("ALTER TABLE bucket_table DROP COLUMNS (ID,country)") - - val plan = sql( - """ - |select t1.*, t2.* - |from bucket_table t1, bucket_table t2 - |where t1.name = t2.name - """.stripMargin).queryExecution.executedPlan - var shuffleExists = false - plan.collect { - case s: ShuffleExchangeExec => 
shuffleExists = true - } - assert(!shuffleExists, "shuffle should not exist on bucket tables") - } - - override def afterAll { - sql("DROP TABLE IF EXISTS bucket_table") - sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeformat) - } -} diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index 5a41b50..06b2130 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -58,12 +58,12 @@ class CarbonOption(options: Map[String, String]) { lazy val tablePageSizeInMb: Option[String] = options.get("table_page_size_inmb") - lazy val bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt + lazy val bucketNumber: Int = options.getOrElse("bucket_number", "0").toInt - lazy val bucketColumns: String = options.getOrElse("bucketcolumns", "") + lazy val bucketColumns: String = options.getOrElse("bucket_columns", "") - lazy val isBucketingEnabled: Boolean = options.contains("bucketcolumns") && - options.contains("bucketnumber") + lazy val isBucketingEnabled: Boolean = options.contains("bucket_columns") && + options.contains("bucket_number") lazy val isStreaming: Boolean = { var stream = options.getOrElse("streaming", "false") diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index adb347e..a7603e2 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -990,7 +990,7 @@ object CarbonDataRDDFactory { * @param 
carbonLoadModel load model * @return Return an array that contains all of the elements in NewDataFrameLoaderRDD. */ - private def loadDataFrame( + def loadDataFrame( sqlContext: SQLContext, dataFrame: Option[DataFrame], scanResultRDD: Option[RDD[InternalRow]], diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index ccff4c1..4278ade 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -48,6 +48,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.blocklet.DataFileFooter import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.schema.BucketingInfo import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema} import org.apache.carbondata.core.mutate.UpdateVO @@ -91,6 +92,7 @@ class CarbonMergerRDD[K, V]( var singleRange = false var expressionMapForRangeCol: util.Map[Integer, Expression] = null var broadCastSplits: Broadcast[CarbonInputSplitWrapper] = null + val bucketInfo = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getBucketingInfo def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = { broadCastSplits = sparkContext.broadcast(new CarbonInputSplitWrapper(splits)) @@ -108,6 +110,12 @@ class CarbonMergerRDD[K, V]( } else { null } + val bucketId: Int = if (bucketInfo != null) { + carbonSparkPartition.idx + } else { + 0 + } + carbonLoadModel.setBucketId(bucketId); var mergeStatus = false var mergeNumber = "" @@ -584,36 +592,40 @@ class CarbonMergerRDD[K, V]( 
logInfo("no.of.nodes where data present=" + nodeBlockMap.size()) defaultParallelism = sparkContext.defaultParallelism - // Create Spark Partition for each task and assign blocks - nodeBlockMap.asScala.foreach { case (nodeName, splitList) => - val taskSplitList = new java.util.ArrayList[NodeInfo](0) - nodeTaskBlocksMap.put(nodeName, taskSplitList) - var blockletCount = 0 - splitList.asScala.foreach { splitInfo => - val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo] - blockletCount = blockletCount + splitsPerNode.getCarbonInputSplitList.size() - taskSplitList.add( - NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size())) - - if (blockletCount != 0) { - val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo] - val multiBlockSplit = if (null == rangeColumn || singleRange) { - new CarbonMultiBlockSplit( - taskInfo.getCarbonInputSplitList, - Array(nodeName)) - } else { - var splitListForRange = new util.ArrayList[CarbonInputSplit]() - new CarbonMultiBlockSplit( - splitListForRange, - Array(nodeName)) + if (bucketInfo != null) { + distributeBucketPartitions(result, splits, bucketInfo) + } else { + // Create Spark Partition for each task and assign blocks + nodeBlockMap.asScala.foreach { case (nodeName, splitList) => + val taskSplitList = new java.util.ArrayList[NodeInfo](0) + nodeTaskBlocksMap.put(nodeName, taskSplitList) + var blockletCount = 0 + splitList.asScala.foreach { splitInfo => + val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo] + blockletCount = blockletCount + splitsPerNode.getCarbonInputSplitList.size() + taskSplitList.add( + NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size())) + + if (blockletCount != 0) { + val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo] + val multiBlockSplit = if (null == rangeColumn || singleRange) { + new CarbonMultiBlockSplit( + taskInfo.getCarbonInputSplitList, + Array(nodeName)) + } else { + var splitListForRange = new 
util.ArrayList[CarbonInputSplit]() + new CarbonMultiBlockSplit( + splitListForRange, + Array(nodeName)) + } + result.add( + new CarbonSparkPartition( + id, + taskPartitionNo, + multiBlockSplit, + getPartitionNamesFromTask(taskInfo.getTaskId, partitionTaskMap))) + taskPartitionNo += 1 } - result.add( - new CarbonSparkPartition( - id, - taskPartitionNo, - multiBlockSplit, - getPartitionNamesFromTask(taskInfo.getTaskId, partitionTaskMap))) - taskPartitionNo += 1 } } } @@ -642,6 +654,24 @@ class CarbonMergerRDD[K, V]( result.toArray(new Array[Partition](result.size)) } + private def distributeBucketPartitions(result: util.ArrayList[Partition], + splits: util.List[InputSplit], bucketInfo: BucketingInfo): Unit = { + // distribute the files based on bucket id + var i = 0 + val bucketed = + splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId) + (0 until bucketInfo.getNumOfRanges).map { bucketId => + val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil) + val multiBlockSplit = + new CarbonMultiBlockSplit( + bucketPartitions.asJava, + bucketPartitions.flatMap(_.getLocations).toArray) + val partition = new CarbonSparkPartition(id, i, multiBlockSplit) + i += 1 + result.add(partition) + } + } + private def getRangesFromRDD(rangeColumn: CarbonColumn, carbonTable: CarbonTable, defaultParallelism: Int, diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index ef5c9b1..ab97624 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -96,8 +96,6 @@ class CarbonScanRDD[T: ClassTag]( private var directFill = false - private val bucketedTable = tableInfo.getFactTable.getBucketingInfo - private var segmentsToAccess: Array[Segment] = _ @transient val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName) @@ -262,11 +260,12 @@ class CarbonScanRDD[T: ClassTag]( CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) } // If bucketing is enabled on table then partitions should be grouped based on buckets. - if (bucketedTable != null) { + val bucketInfo = tableInfo.getFactTable.getBucketingInfo + if (bucketInfo != null) { var i = 0 val bucketed = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId) - (0 until bucketedTable.getNumOfRanges).map { bucketId => + (0 until bucketInfo.getNumOfRanges).map { bucketId => val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil) val multiBlockSplit = new CarbonMultiBlockSplit( diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 8ca96ab..4e8a00f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import scala.language.implicitConversions import org.apache.commons.lang.StringUtils -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType} import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.hive.CarbonMetaStore import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil @@ -364,13 +364,24 @@ object CarbonSource { table: CatalogTable, metaStore: CarbonMetaStore ): (TableInfo, CatalogTable) = { - val properties = CarbonSparkSqlParserUtil.getProperties(table) + val updatedProperties = new java.util.HashMap[String, String]() + CarbonSparkSqlParserUtil + .normalizeProperties(table.properties) + .foreach(e => updatedProperties.put(e._1, e._2)) + if (table.bucketSpec.isDefined) { + val bucketSpec = table.bucketSpec.get + 
updatedProperties.put("bucket_columns", bucketSpec.bucketColumnNames.mkString) + updatedProperties.put("bucket_number", bucketSpec.numBuckets.toString) + } + val updateTable: CatalogTable = table.copy( + properties = updatedProperties.asScala.toMap, bucketSpec = None) + val properties = CarbonSparkSqlParserUtil.getProperties(updateTable) if (isCreatedByCarbonExtension(properties)) { // Table is created by SparkSession with CarbonExtension, // There is no TableInfo yet, so create it from CatalogTable - val tableInfo = createTableInfo(sparkSession, table) + val tableInfo = createTableInfo(sparkSession, updateTable) val catalogTable = createCatalogTableForCarbonExtension( - table, tableInfo, properties, metaStore) + updateTable, tableInfo, properties, metaStore) (tableInfo, catalogTable) } else { // Legacy code path (table is created by CarbonSession) @@ -378,7 +389,8 @@ object CarbonSource { val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava) val isTransactionalTable = properties.getOrElse("isTransactional", "true").contains("true") tableInfo.setTransactionalTable(isTransactionalTable) - val catalogTable = createCatalogTableForCarbonSession(table, tableInfo, properties, metaStore) + val catalogTable = + createCatalogTableForCarbonSession(updateTable, tableInfo, properties, metaStore) (tableInfo, catalogTable) } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 5ee10e6..63044ab 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -806,16 +806,14 @@ class TableNewProcessor(cm: TableModel) { val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b)) col match { case Some(colSchema: ColumnSchema) => - if 
(colSchema.isDimensionColumn && !colSchema.getDataType.isComplexType) { + if (!colSchema.getDataType.isComplexType && + !DataTypes.isDecimal(colSchema.getDataType)) { colSchema } else { - LOGGER.error(s"Bucket field must be dimension column and " + - s"should not be measure or complex column: ${ colSchema.getColumnName }") - CarbonException.analysisException(s"Bucket field must be dimension column and " + - s"should not be measure or complex column: ${ - colSchema - .getColumnName - }") + LOGGER.error(s"Bucket field should not be complex column or decimal" + + s"data type: ${colSchema.getColumnName}") + CarbonException.analysisException(s"Bucket field should not be complex column or" + + s" decimal data type: ${colSchema.getColumnName}") } case _ => LOGGER.error(s"Bucket field is not present in table columns") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala index 64f802f..d63ec24 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala @@ -47,6 +47,7 @@ import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory /** * Collect stage input files and trigger a loading into carbon table. 
@@ -269,13 +270,17 @@ case class CarbonInsertFromStageCommand( s"${table.getDatabaseName}.${table.getTableName}") val start = System.currentTimeMillis() val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits) - DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( - spark, - Option(dataFrame), - loadModel, - SparkSQLUtil.sessionState(spark).newHadoopConf() - ).map { row => - (row._1, FailureCauses.NONE == row._2._2.failureCauses) + if (table.getBucketingInfo == null) { + DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( + spark, + Option(dataFrame), + loadModel, + SparkSQLUtil.sessionState(spark).newHadoopConf() + ).map { row => + (row._1, FailureCauses.NONE == row._2._2.failureCauses) + } + } else { + CarbonDataRDDFactory.loadDataFrame(spark.sqlContext, Option(dataFrame), None, loadModel) } LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala index 190b776..299dfba 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala @@ -71,6 +71,20 @@ abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newCol } } + // if column rename operation is on bucket column, then fail the rename operation + if (null != carbonTable.getBucketingInfo) { + val bucketColumns = carbonTable.getBucketingInfo.getListOfColumns + bucketColumns.asScala.foreach { + col => + if (col.getColumnName.equalsIgnoreCase(oldColumnName)) { + throw new MalformedCarbonCommandException( + s"Column Rename Operation failed. 
Renaming " + + s"the bucket column $oldColumnName is not " + + s"allowed") + } + } + } + } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index 254e766..de0b0d3 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -88,6 +88,19 @@ private[sql] case class CarbonAlterTableDropColumnCommand( "partition columns") } } + val bucketInfo = carbonTable.getBucketingInfo + if (bucketInfo != null) { + val bucketColumnSchemaList = bucketInfo.getListOfColumns.asScala + .map(_.getColumnName) + // check each column existence in the table + val bucketColumns = alterTableDropColumnModel.columns.filter { + tableColumn => bucketColumnSchemaList.contains(tableColumn) + } + if (bucketColumns.nonEmpty) { + throwMetadataException(dbName, tableName, "Bucket columns cannot be dropped: " + + s"$bucketColumns") + } + } var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column .ColumnSchema]() diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index ce240c7..85b6f72 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -242,6 +242,21 @@ private[sql] case class CarbonDescribeFormattedCommand( } ////////////////////////////////////////////////////////////////////////////// + // 
Bucket Information + ////////////////////////////////////////////////////////////////////////////// + val bucketInfo = carbonTable.getBucketingInfo() + if (bucketInfo != null) { + results ++= Seq( + ("", "", ""), + ("## Bucket Information", "", ""), + ("Bucket Columns", + bucketInfo.getListOfColumns.asScala.map { + col => s"${col.getColumnName}:${col.getDataType.getName}"}.mkString(", "), ""), + ("Number of Buckets", bucketInfo.getNumOfRanges.toString, "") + ) + } + + ////////////////////////////////////////////////////////////////////////////// // Dynamic Information ////////////////////////////////////////////////////////////////////////////// diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index ae7ab11..e65c65d 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -633,6 +633,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } } if (bucketColumns.size == cols.size) { + // use HashPartitioning will not shuffle HashPartitioning(bucketColumns, numBuckets) } else { UnknownPartitioning(0) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala index 2d128a4..5bfd68a 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala @@ -33,9 +33,7 @@ import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterT import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import 
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand} import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} -import org.apache.spark.sql.hive.CarbonMVRules import org.apache.spark.sql.types.StructField -import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.exceptions.DeprecatedFeatureException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ -79,6 +77,24 @@ object CarbonSparkSqlParserUtil { if (streaming != null && streaming.equalsIgnoreCase("true") && tablePath.startsWith("s3")) { throw new UnsupportedOperationException("streaming is not supported with s3 store") } + // bucket table should not set sort scope because the data can only sort inside bucket itself + val bucketColumnsProp = tableInfo.getFactTable.getTableProperties.get("bucket_columns") + val sortScopeProp = tableInfo.getFactTable.getTableProperties.get("sort_scope") + if (bucketColumnsProp != null) { + if (sortScopeProp != null) { + // bucket table can only sort inside bucket, can not set sort_scope for table. 
+ throw new ProcessMetaDataException(tableInfo.getDatabaseName, + tableInfo.getFactTable.getTableName, "Bucket table only sort inside buckets," + + " can not set sort scope but can set sort columns."); + } else { + tableInfo.getFactTable.getListOfColumns.asScala.foreach(column => + if (column.getDataType == DataTypes.BINARY) { + throw new ProcessMetaDataException(tableInfo.getDatabaseName, + tableInfo.getFactTable.getTableName, "bucket table do not support binary."); + } + ) + } + } // Add validation for sort scope when create table val sortScope = tableInfo.getFactTable.getTableProperties.asScala .getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) @@ -311,7 +327,7 @@ object CarbonSparkSqlParserUtil { sparkSession: SparkSession, selectQuery: Option[LogicalPlan] = None): TableInfo = { val tableProperties = normalizeProperties(getProperties(table)) - val options = new CarbonOption(tableProperties) + val options = new CarbonOption(Map(tableProperties.toSeq: _*)) // validate streaming property validateStreamingProperty(options) val parser = new CarbonSpark2SqlParser() diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala index f80316d..4cd2b66 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala @@ -17,13 +17,16 @@ package org.apache.carbondata.integration.spark.testsuite.binary import java.util.Arrays + import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException import org.apache.commons.codec.binary.{Base64, Hex} import org.apache.spark.SparkException +import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil @@ -398,34 +401,26 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where binaryField =cast('hello' as binary)"), Seq(Row(1))) } - test("Test create table with buckets unsafe") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true") + test("Test create table with binary datatype in bucket table") { sql("DROP TABLE IF EXISTS binaryTable") + sql("DROP TABLE IF EXISTS binaryTable2") + val exception = intercept[ProcessMetaDataException] { sql( - s""" - | CREATE TABLE IF NOT EXISTS binaryTable ( - | id INT, - | label boolean, - | name STRING, - | binaryField BINARY, - | autoLabel boolean) - | STORED AS carbondata - | TBLPROPERTIES('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='binaryField') - """.stripMargin) - sql( - s""" - | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' - | INTO TABLE binaryTable - | OPTIONS('header'='false') - """.stripMargin) - - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false") - val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "binaryTable") - if (table != null && table.getBucketingInfo() != null) { - assert(true) - } else { - assert(false, "Bucketing info does not exist") + s""" + | CREATE TABLE IF NOT EXISTS binaryTable ( + | id INT, + | label boolean, + | name STRING, + | binaryField BINARY, + | autoLabel boolean) + | STORED AS carbondata + | TBLPROPERTIES('BUCKET_NUMBER'='4', 
'BUCKET_COLUMNS'='binaryField') + """.stripMargin) } + assert(exception.getMessage.contains("bucket table do not support binary")) + + sql("DROP TABLE IF EXISTS binaryTable") + sql("DROP TABLE IF EXISTS binaryTable2") } test("insert into for hive and carbon") { diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala index 7285101..27abdc6 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala @@ -233,10 +233,10 @@ class BadRecordActionTest extends QueryTest { " the detail reason")) } - test("test bad record with IGNORE option and sort scope as NO_SORT for bucketed table") { + test("test bad record with IGNORE option for bucketed table") { sql("drop table if exists sales_bucket") sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, country String," + - "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country','sort_scope'='NO_SORT')") + "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='2', 'BUCKET_COLUMNS'='country')") sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket OPTIONS" + "('bad_records_action'='IGNORE', 'DELIMITER'=" + " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')") @@ -244,10 +244,10 @@ class BadRecordActionTest extends QueryTest { Seq(Row(2))) } - test("test bad record with REDIRECT option and sort scope as NO_SORT for bucketed table") { + test("test bad record with REDIRECT option for bucketed table") { sql("drop table if exists sales_bucket") sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID 
BigInt, date Timestamp, country String," + - "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country', 'sort_scope'='NO_SORT')") + "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='2', 'BUCKET_COLUMNS'='country')") sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket OPTIONS" + "('bad_records_action'='REDIRECT', 'DELIMITER'=" + " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } + diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala index a0d4d58..e830b0f 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala @@ -169,7 +169,7 @@ class TestCreateTableLike extends QueryTest with BeforeAndAfterEach with BeforeA | CREATE TABLE IF NOT EXISTS bkt_tbl ( | a int, b string | ) STORED AS carbondata - | TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='b') + | TBLPROPERTIES ('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='b') | """.stripMargin) sql("create table targetTable like bkt_tbl") diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala index cbb8109..508d8eb 100644 --- a/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala @@ -227,24 +227,17 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS 
create_source_test2") } - test("test to create bucket columns with int field") { - sql("drop table if exists create_source") - intercept[Exception] { - sql("create table create_source(intField int, stringField string, complexField array<string>) USING carbondata OPTIONS('bucketnumber'='1', 'bucketcolumns'='intField')") - } - } - test("test to create bucket columns with complex data type field") { sql("drop table if exists create_source") intercept[Exception] { - sql("create table create_source(intField int, stringField string, complexField array<string>) USING carbondata OPTIONS('bucketnumber'='1', 'bucketcolumns'='complexField')") + sql("create table create_source(intField int, stringField string, complexField array<string>) USING carbondata OPTIONS('bucket_number'='1', 'bucket_columns'='complexField')") } } test("test check results of table with complex data type and bucketing") { sql("drop table if exists create_source") sql("create table create_source(intField int, stringField string, complexField array<int>) " + - "USING carbondata OPTIONS('bucketnumber'='1', 'bucketcolumns'='stringField')") + "USING carbondata OPTIONS('bucket_number'='1', 'BUCKET_COLUMNS'='stringField')") sql("insert into create_source values(1,'source',array(1,2,3))") checkAnswer(sql("select * from create_source"), Row(1,"source", mutable.WrappedArray.newBuilder[Int].+=(1,2,3))) sql("drop table if exists create_source") diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala index 61c48d4..f8c1beb 100644 --- a/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala @@ -192,8 +192,8 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() 
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL") sql("create table data_tbm(name String, dob long, weight int) " + - "STORED AS carbondata tblproperties('bucketnumber'='4', " + - "'bucketcolumns'='name', 'tableName'='data_tbm')") + "STORED AS carbondata tblproperties('bucket_number'='4', " + + "'BUCKET_COLUMNS'='name', 'tableName'='data_tbm')") val testData = s"$resourcesPath/badrecords/dummy.csv" sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_tbm""") } catch { diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala index 9b106ba..1ab3b38 100644 --- a/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala @@ -17,16 +17,21 @@ package org.apache.spark.carbondata.bucketing -import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.exchange.Exchange +import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll - import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException class 
TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { @@ -41,19 +46,170 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS t4") sql("DROP TABLE IF EXISTS t5") sql("DROP TABLE IF EXISTS t6") + sql("DROP TABLE IF EXISTS t6_") sql("DROP TABLE IF EXISTS t7") sql("DROP TABLE IF EXISTS t8") sql("DROP TABLE IF EXISTS t9") sql("DROP TABLE IF EXISTS t10") sql("DROP TABLE IF EXISTS t11") + sql("DROP TABLE IF EXISTS t12") + sql("DROP TABLE IF EXISTS t13") + sql("DROP TABLE IF EXISTS t14") + sql("DROP TABLE IF EXISTS t15") + sql("DROP TABLE IF EXISTS t16") + sql("DROP TABLE IF EXISTS t17") + sql("DROP TABLE IF EXISTS t18") + sql("DROP TABLE IF EXISTS t19") + sql("DROP TABLE IF EXISTS t20") + sql("DROP TABLE IF EXISTS t21") + sql("DROP TABLE IF EXISTS t22") + sql("DROP TABLE IF EXISTS t23") + sql("DROP TABLE IF EXISTS t24") + sql("DROP TABLE IF EXISTS t25") + sql("DROP TABLE IF EXISTS t26") + sql("DROP TABLE IF EXISTS t27") + sql("DROP TABLE IF EXISTS t28") + sql("DROP TABLE IF EXISTS t40") + sql("DROP TABLE IF EXISTS t41") + sql("DROP TABLE IF EXISTS t42") + sql("DROP TABLE IF EXISTS t43") + sql("DROP TABLE IF EXISTS t44") + sql("DROP TABLE IF EXISTS t45") + sql("DROP TABLE IF EXISTS t46") + sql("DROP TABLE IF EXISTS t47") + sql("DROP TABLE IF EXISTS t48") + sql("DROP TABLE IF EXISTS t49") + sql("DROP TABLE IF EXISTS t50") + sql("DROP TABLE IF EXISTS bucketed_parquet_table") + sql("DROP TABLE IF EXISTS parquet_table") } - test("test create table with buckets") { + test("test create table with buckets using table properties and loaded data will" + + " store into different files") { sql("CREATE TABLE t4 (ID Int, date Timestamp, country String, name String, phonetype String," + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t4") val 
table = CarbonEnv.getCarbonTable(Option("default"), "t4")(sqlContext.sparkSession) + val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0") + val dataFiles = segmentDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles.length == 4) + checkAnswer(sql("select count(*) from t4"), Row(100)) + checkAnswer(sql("select count(*) from t4 where name='aaa99'"), Row(1)) + if (table != null && table.getBucketingInfo() != null) { + assert(true) + } else { + assert(false, "Bucketing info does not exist") + } + } + + test("test IUD of bucket table") { + sql("CREATE TABLE t40 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t40") + sql("CREATE TABLE t41 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t41") + + // insert + sql(s"insert into t40 select 101,'2015/10/16','china','aaa101','phone2569','ASD16163',15100") + checkAnswer(sql( + """select count(*) from t40 + """.stripMargin), Row(101)) + // update + sql(s"update t40 set (name) = ('aaa100') where name='aaa101'") + checkAnswer(sql( + """select count(*) from t40 + """.stripMargin), Row(101)) + checkAnswer(sql( + """select count(*) from t40 where name='aaa100' + """.stripMargin), Row(2)) + // delete + sql(s"delete from t40 where name='aaa100'") + checkAnswer(sql( + """select count(*) from t40 + """.stripMargin), Row(99)) + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t40 t1, t41 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(99)) + // 
insert again + sql(s"insert into t40 select 1011,'2015/10/16','china','aaa1011','phone2569','ASD16163',15100") + sql(s"insert into t40 select 1012,'2015/10/16','china','aaa1012','phone2569','ASD16163',15100") + sql(s"insert into t40 select 1013,'2015/10/16','china','aaa1013','phone2569','ASD16163',15100") + sql(s"insert into t40 select 1014,'2015/10/16','china','aaa1014','phone2569','ASD16163',15100") + checkAnswer(sql( + """select count(*) from t40 + """.stripMargin), Row(103)) + + // join after IUD + val plan = sql( + """ + |select t1.*, t2.* + |from t40 t1, t41 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t40 t1, t41 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(99)) + + // insert into t41 + sql(s"insert into t41 select 1014,'2015/10/16','china','aaa1014','phone2569','ASD16163',15100") + + // join after 2 tables both IUD + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t40 t1, t41 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + val plan2 = sql( + """ + |select t1.*, t2.* + |from t40 t1, t41 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists2 = false + plan2.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists2 = true + } + assert(!shuffleExists2, "shuffle should not exist on bucket tables") + } + + test("test create carbon 
table with buckets like hive sql") { + sql("CREATE TABLE t13 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t13") + val table = CarbonEnv.getCarbonTable(Option("default"), "t13")(sqlContext.sparkSession) + val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0") + val dataFiles = segmentDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles.length == 4) + checkAnswer(sql("select count(*) from t13"), Row(100)) + checkAnswer(sql("select count(*) from t13 where name='aaa99'"), Row(1)) if (table != null && table.getBucketingInfo() != null) { assert(true) } else { @@ -65,7 +221,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true") sql("CREATE TABLE t10 (ID Int, date Timestamp, country String, name String, phonetype String," + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t10") CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false") val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "t10") @@ -84,7 +240,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Int) USING carbondata - OPTIONS("bucketnumber"="-1", "bucketcolumns"="name") + OPTIONS("bucket_number"="-1", "bucket_columns"="name") """) assert(false) } @@ -106,7 +262,7 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll { | serialname String, | salary Int) | STORED AS carbondata - | TBLPROPERTIES('bucketnumber'='0', 'bucketcolumns'='name') + | TBLPROPERTIES('bucket_number'='0', 'bucket_columns'='name') """.stripMargin ) assert(false) @@ -116,7 +272,17 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { } } - test("test create table with no bucket join of carbon tables") { + test("Bucket table only sort inside buckets, can not set sort scope but can set sort columns.") { + val ex = intercept[ProcessMetaDataException] { + sql("CREATE TABLE t44 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'sort_columns'='name', 'sort_scope'='global_sort')") + } + assert(ex.getMessage.contains("Bucket table only sort inside buckets," + + " can not set sort scope but can set sort columns.")) + } + + test("test create table with both no bucket join of carbon tables") { sql("CREATE TABLE t5 (ID Int, date Timestamp, country String, name String, phonetype String," + "serialname String, salary Int) STORED AS carbondata") sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t5") @@ -126,6 +292,12 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { |from t5 t1, t5 t2 |where t1.name = t2.name """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t5 t1, t5 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) var shuffleExists = false plan.collect { case s: Exchange if (s.getClass.getName.equals @@ -137,17 +309,94 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { assert(shuffleExists, "shuffle should exist on non bucket tables") } - test("test create table with bucket join of carbon tables") { + test("test join of carbon bucket table and non bucket parquet table") { + sql("CREATE TABLE t8 
(ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t8") + + sql("DROP TABLE IF EXISTS parquet_table") + sql("select * from t8").write + .format("parquet") + .saveAsTable("parquet_table") + + val plan = sql( + """ + |select t1.*, t2.* + |from t8 t1, parquet_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t8 t1, parquet_table t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(shuffleExists, "shuffle should exist on non bucket tables") + sql("DROP TABLE parquet_table") + } + + test("test no shuffle when using bucket tables") { + sql("CREATE TABLE t12 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t12") + + sql("DROP TABLE IF EXISTS bucketed_parquet_table") + sql("select * from t12").write + .format("parquet") + .bucketBy(4, "name") + .saveAsTable("bucketed_parquet_table") + + checkAnswer(sql("select count(*) from t12"), Row(100)) + checkAnswer(sql("select count(*) from bucketed_parquet_table"), Row(100)) + + val plan = sql( + """ + |select t1.*, t2.* + |from t12 t1, bucketed_parquet_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + 
("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + sql("DROP TABLE bucketed_parquet_table") + } + + test("test join of carbon bucket tables") { sql("CREATE TABLE t6 (ID Int, date Timestamp, country String, name String, phonetype String," + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6") + sql("CREATE TABLE t6_ (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6_") val plan = sql( """ |select t1.*, t2.* - |from t6 t1, t6 t2 + |from t6 t1, t6_ t2 |where t1.name = t2.name """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t6 t1, t6_ t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) var shuffleExists = false plan.collect { case s: Exchange if (s.getClass.getName.equals @@ -159,24 +408,31 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { assert(!shuffleExists, "shuffle should not exist on bucket tables") } - test("test create table with bucket join of carbon table and parquet table") { + test("test join of carbon bucket table and parquet bucket table") { sql("CREATE TABLE t7 (ID Int, date Timestamp, country String, name String, phonetype String," + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") + "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='name')") sql(s"LOAD DATA INPATH 
'$resourcesPath/source.csv' INTO TABLE t7") sql("DROP TABLE IF EXISTS bucketed_parquet_table") sql("select * from t7").write .format("parquet") - .bucketBy(4, "name") + .bucketBy(9, "name") .saveAsTable("bucketed_parquet_table") - + // carbon join parquet, both bucket tables. val plan = sql( """ |select t1.*, t2.* |from t7 t1, bucketed_parquet_table t2 |where t1.name = t2.name """.stripMargin).queryExecution.executedPlan + // parquet join parquet, both bucket tables. + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t7 t1, bucketed_parquet_table t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) var shuffleExists = false plan.collect { case s: Exchange if (s.getClass.getName.equals @@ -189,23 +445,256 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE bucketed_parquet_table") } - test("test create table with bucket join of carbon table and non bucket parquet table") { - sql("CREATE TABLE t8 (ID Int, date Timestamp, country String, name String, phonetype String," + - "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + - "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')") - sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t8") + test("test join of carbon bucket tables using hive sql") { + sql("CREATE TABLE t14 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t14") + sql("CREATE TABLE t15 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) INTO 4 BUCKETS") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t15") + val plan = sql( + """ + |select t1.*, t2.* + |from t14 t1, t15 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + 
"""select count(*) from + |(select t1.*, t2.* + |from t14 t1, t15 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } - sql("DROP TABLE IF EXISTS parquet_table") - sql("select * from t8").write + test("test join of diff data types as bucket column for carbon tables") { + sql("CREATE TABLE t16 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='ID')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t16") + sql("CREATE TABLE t17 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t17") + val plan = sql( + """ + |select t1.*, t2.* + |from t16 t1, t17 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t16 t1, t17 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + } + + test("timestamp as bucket column, test join of carbon bucket tables") { + sql("CREATE TABLE t18 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='date')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t18") + sql("CREATE TABLE t19 (ID Int, date Timestamp, country String, name String, phonetype String," + + 
"serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='date')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t19") + val plan = sql( + """ + |select t1.*, t2.* + |from t18 t1, t19 t2 + |where t1.date = t2.date + """.stripMargin).queryExecution.executedPlan + // here the time column in source.csv has some duplicate values + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t18 t1, t19 t2 + |where t1.date = t2.date) temp + """.stripMargin), Row(120)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } + + test("timestamp as bucket column, test join of carbon bucket table and parquet table") { + sql("CREATE TABLE t20 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='date')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t20") + // parquet 1 + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20") + sql("select * from t20").write .format("parquet") - .saveAsTable("parquet_table") + .bucketBy(9, "date") + .saveAsTable("bucketed_parquet_table_t20") + // parquet 2 + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20_") + sql("select * from t20").write + .format("parquet") + .bucketBy(9, "date") + .saveAsTable("bucketed_parquet_table_t20_") + // parquet join with parquet + val plan2 = sql( + """ + |select t1.*, t2.* + |from bucketed_parquet_table_t20_ t1, bucketed_parquet_table_t20 t2 + |where t1.date = t2.date + """.stripMargin).queryExecution.executedPlan + var shuffleExists2 = false + plan2.collect { + case s: 
Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists2 = true + } + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from bucketed_parquet_table_t20_ t1, bucketed_parquet_table_t20 t2 + |where t1.date = t2.date) temp + """.stripMargin), Row(120)) + + // carbon join with parquet val plan = sql( """ |select t1.*, t2.* - |from t8 t1, parquet_table t2 + |from t20 t1, bucketed_parquet_table_t20 t2 + |where t1.date = t2.date + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t20 t1, bucketed_parquet_table_t20 t2 + |where t1.date = t2.date) temp + """.stripMargin), Row(120)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + + assert(shuffleExists == shuffleExists2, "for no string bucket column, shuffle should " + + "keep the same behavior as parquet") + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20") + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20_") + + } + + test("long as bucket column, test join of carbon bucket table and parquet table") { + sql("CREATE TABLE t21 (ID long, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='ID')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t21") + // parquet 1 + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t21") + sql("select * from t21").write + .format("parquet") + .bucketBy(9, "ID") + .saveAsTable("bucketed_parquet_table_t21") + + // carbon join with parquet + val plan = sql( + """ + 
|select t1.*, t2.* + |from t21 t1, bucketed_parquet_table_t21 t2 + |where t1.ID = t2.ID + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist in bucket table join") + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t21 t1, bucketed_parquet_table_t21 t2 + |where t1.ID = t2.ID) temp + """.stripMargin), Row(100)) + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t21") + } + + test("int as bucket column, test join of carbon bucket table and parquet table") { + sql("CREATE TABLE t22 (ID int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='ID')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t22") + // parquet 1 + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t22") + sql("select * from t22").write + .format("parquet") + .bucketBy(9, "ID") + .saveAsTable("bucketed_parquet_table_t22") + + // carbon join with parquet + val plan = sql( + """ + |select t1.*, t2.* + |from t22 t1, bucketed_parquet_table_t22 t2 + |where t1.ID = t2.ID + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist in bucket table join") + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t22 t1, bucketed_parquet_table_t22 t2 + |where t1.ID = t2.ID) temp + 
""".stripMargin), Row(100)) + sql("DROP TABLE IF EXISTS bucketed_parquet_table_t22") + } + + test("test bucket hash method config") { + sql("CREATE TABLE t23 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'bucket_hash_method'='NATIVE' )") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t23") + sql("CREATE TABLE t24 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'bucket_hash_method'='NATIVE')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t24") + val plan = sql( + """ + |select t1.*, t2.* + |from t23 t1, t24 t2 |where t1.name = t2.name """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t23 t1, t24 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) var shuffleExists = false plan.collect { case s: Exchange if (s.getClass.getName.equals @@ -214,24 +703,319 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) => shuffleExists = true } - assert(shuffleExists, "shuffle should exist on non bucket tables") - sql("DROP TABLE parquet_table") + assert(!shuffleExists, "shuffle should not exist on bucket tables") } - // TODO: make pluggable CarbonOptimizerUtil.transformForScalarSubQuery - ignore("test scalar subquery with equal") { + test("only shuffle 1 side whose bucket num larger when join of carbon bucket" + + " tables with diff bucket num") { + sql("CREATE TABLE t25 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='3', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD 
DATA INPATH '$resourcesPath/source.csv' INTO TABLE t25") + sql("CREATE TABLE t26 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='7', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t26") + + val plan = sql( + """ + |select t1.*, t2.* + |from t25 t1, t26 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t25 t1, t26 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + + var shuffleLeftExists = false + var shuffleRightExists = false + plan.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[SortMergeJoinExec].left.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleLeftExists = true + } + + plan.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[SortMergeJoinExec].right.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleRightExists = true + } + assert(shuffleLeftExists && !shuffleRightExists, "only shuffle 1 side whose bucket num larger") + } + + test("test compaction of bucket tables") { + sql("CREATE TABLE t27 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27") + sql(s"LOAD DATA 
INPATH '$resourcesPath/source.csv' INTO TABLE t27") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27") + sql(s"alter table t27 compact 'minor'") + + val table = CarbonEnv.getCarbonTable(Option("default"), "t27")(sqlContext.sparkSession) + // data should store into diff files bases on bucket id in compaction + val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0.1") + val dataFiles = segmentDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles.length == 10) + + sql("CREATE TABLE t28 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t28") + val plan = sql( + """ + |select t1.*, t2.* + |from t27 t1, t28 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t27 t1, t28 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(500)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } + + test("test alter column of bucket table") { + sql("CREATE TABLE t42 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t42") + + sql("CREATE TABLE t43 (ID Int, date Timestamp, country String, name String, phonetype 
String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t43") + + // bucket columns not allowed to change include rename change data type and drop. + val ex = intercept[MalformedCarbonCommandException] { + sql(s"alter table t42 change name name222 string") + } + assert(ex.getMessage.contains("Column Rename Operation failed." + + " Renaming the bucket column name is not allowed")) + val ex2 = intercept[ProcessMetaDataException] { + sql(s"alter table t42 drop columns(name)") + } + assert(ex2.getMessage.contains("Bucket columns cannot be dropped: List(name)")) + + // alter table column + sql(s"alter table t42 change salary slong long") + checkAnswer(sql( + """select count(*) from t42 where slong=15000 + """.stripMargin), Row(1)) + // join after alter table + val plan = sql( + """ + |select t1.*, t2.* + |from t42 t1, t43 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t42 t1, t43 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + + // test desc formatted + val descPar = sql("desc formatted t42").collect + descPar.find(_.get(0).toString.contains("Bucket Columns")) match { + case Some(row) => assert(row.get(1).toString.contains("name")) + case None => fail("Bucket Columns: not found in describe formatted") + } + descPar.find(_.get(0).toString.contains("Number of Buckets")) match { + case Some(row) => assert(row.get(1).toString.contains("10")) + case None => 
fail("Number of Buckets: not found in describe formatted") + } + } + + test("test insert into bucket table old insert flow") { + sql("CREATE TABLE t45 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t45") + sql("CREATE TABLE t46 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + // use old flow + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true") + sql(s"INSERT INTO t46 SELECT * FROM t45") + + val table = CarbonEnv.getCarbonTable(Option("default"), "t46")(sqlContext.sparkSession) + val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0") + val dataFiles = segmentDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles.length == 4) + checkAnswer(sql( + """select count(*) from t46 + """.stripMargin), Row(100)) sql( - """select sum(salary) from t4 t1 - |where ID = (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin) - .count() + """select * from t46 + """.stripMargin).show(100, false) + + val plan = sql( + """ + |select t1.*, t2.* + |from t45 t1, t46 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + checkAnswer(sql( + 
"""select count(*) from t46 where name='aaa1' + """.stripMargin), Row(1)) + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t45 t1, t46 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false") } - // TODO: make pluggable CarbonOptimizerUtil.transformForScalarSubQuery - ignore("test scalar subquery with lessthan") { + test("test insert into bucket table new insert flow") { + sql("CREATE TABLE t47 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t47") + sql("CREATE TABLE t48 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')") + + // use new flow + sql(s"INSERT INTO t48 SELECT * FROM t47") + + val table = CarbonEnv.getCarbonTable(Option("default"), "t48")(sqlContext.sparkSession) + val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0") + val dataFiles = segmentDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata") + }) + assert(dataFiles.length == 4) + checkAnswer(sql( + """select count(*) from t48 + """.stripMargin), Row(100)) sql( - """select sum(salary) from t4 t1 - |where ID < (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin) - .count() + """select * from t48 + """.stripMargin).show(100, false) + + val plan = sql( + """ + |select t1.*, t2.* + |from t47 t1, t48 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + 
("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + checkAnswer(sql( + """select count(*) from t48 where name='aaa1' + """.stripMargin), Row(1)) + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t47 t1, t48 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + } + + test("test multi bucket columns") { + sql("CREATE TABLE t49 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name,date')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t49") + sql("CREATE TABLE t50 (ID Int, date Timestamp, country String, name String, phonetype String," + + "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " + + "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name,date')") + sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t50") + val plan = sql( + """ + |select t1.*, t2.* + |from t49 t1, t50 t2 + |where t1.name = t2.name and t1.date = t2.date + """.stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t49 t1, t50 t2 + |where t1.name = t2.name and t1.date = t2.date) temp + """.stripMargin), Row(100)) + var shuffleExists = false + plan.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist when all bucket columns" + + "in query filter") + + val plan2 = sql( + """ + |select t1.*, t2.* + |from t49 t1, t50 t2 + |where t1.name = t2.name + 
""".stripMargin).queryExecution.executedPlan + checkAnswer(sql( + """select count(*) from + |(select t1.*, t2.* + |from t49 t1, t50 t2 + |where t1.name = t2.name) temp + """.stripMargin), Row(100)) + var shuffleExists2 = false + plan2.collect { + case s: Exchange if (s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchange") || + s.getClass.getName.equals + ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec")) + => shuffleExists2 = true + } + assert(shuffleExists2, "shuffle should exist when some bucket columns not exist in filter") } override def afterAll { @@ -241,10 +1025,40 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS t4") sql("DROP TABLE IF EXISTS t5") sql("DROP TABLE IF EXISTS t6") + sql("DROP TABLE IF EXISTS t6_") sql("DROP TABLE IF EXISTS t7") sql("DROP TABLE IF EXISTS t8") sql("DROP TABLE IF EXISTS t9") sql("DROP TABLE IF EXISTS t10") + sql("DROP TABLE IF EXISTS t11") + sql("DROP TABLE IF EXISTS t12") + sql("DROP TABLE IF EXISTS t13") + sql("DROP TABLE IF EXISTS t14") + sql("DROP TABLE IF EXISTS t15") + sql("DROP TABLE IF EXISTS t16") + sql("DROP TABLE IF EXISTS t17") + sql("DROP TABLE IF EXISTS t18") + sql("DROP TABLE IF EXISTS t19") + sql("DROP TABLE IF EXISTS t20") + sql("DROP TABLE IF EXISTS t21") + sql("DROP TABLE IF EXISTS t22") + sql("DROP TABLE IF EXISTS t23") + sql("DROP TABLE IF EXISTS t24") + sql("DROP TABLE IF EXISTS t25") + sql("DROP TABLE IF EXISTS t26") + sql("DROP TABLE IF EXISTS t27") + sql("DROP TABLE IF EXISTS t28") + sql("DROP TABLE IF EXISTS t40") + sql("DROP TABLE IF EXISTS t41") + sql("DROP TABLE IF EXISTS t42") + sql("DROP TABLE IF EXISTS t43") + sql("DROP TABLE IF EXISTS t44") + sql("DROP TABLE IF EXISTS t45") + sql("DROP TABLE IF EXISTS t46") + sql("DROP TABLE IF EXISTS t47") + sql("DROP TABLE IF EXISTS t48") + sql("DROP TABLE IF EXISTS t49") + sql("DROP TABLE IF EXISTS t50") sql("DROP TABLE IF EXISTS bucketed_parquet_table") sql("DROP 
TABLE IF EXISTS parquet_table") sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString) diff --git a/pom.xml b/pom.xml index 3a819b6..40eb6ed 100644 --- a/pom.xml +++ b/pom.xml @@ -148,7 +148,7 @@ <id>central</id> <!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution --> <name>Maven Repository</name> - <url>https://repo1.maven.org/maven2</url> + <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> @@ -480,6 +480,7 @@ <exclude>**/org.apache.carbondata.cluster.sdv.generated.*</exclude> <exclude>**/org.apache.spark.sql.test.*</exclude> <exclude>**/org.apache.carbondata.format.*</exclude> + <exclude>**/org.apache.carbondata.core.unsafe*</exclude> </excludes> <includes> <include>**/org.apache.*</include> diff --git a/processing/pom.xml b/processing/pom.xml index 331d513..6600efa 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -41,6 +41,22 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-unsafe_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + <version>3.0.3</version> + </dependency> + <dependency> <groupId>com.univocity</groupId> <artifactId>univocity-parsers</artifactId> <version>2.2.1</version> diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 0965ee6..1af4fe3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -47,6 +47,8 @@ public class CarbonDataLoadConfiguration { private BucketingInfo bucketingInfo; + private String bucketHashMethod; + private String segmentPath; private Map<String, Object> dataLoadProperties = new HashMap<>(); @@ -364,4 +366,12 @@ public class CarbonDataLoadConfiguration { public void setIndexColumnsPresent(boolean indexColumnsPresent) { isIndexColumnsPresent = indexColumnsPresent; } + + public String getBucketHashMethod() { + return bucketHashMethod; + } + + public void setBucketHashMethod(String bucketHashMethod) { + this.bucketHashMethod = bucketHashMethod; + } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 4b44a18..8586a61 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -63,17 +63,22 @@ public final class DataLoadProcessBuilder { CarbonIterator[] inputIterators) { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); - if (loadModel.isLoadWithoutConverterStep()) { + if (configuration.getBucketingInfo() != null && + CarbonProperties.isBadRecordHandlingEnabledForInsert()) { + // if use old flow, both load and insert of bucket table use same. Otherwise, load of bucket + // will use buildInternalForBucketing but insert will use buildInternalWithNoConverter. 
+ return buildInternalForBucketing(inputIterators, configuration); + } else if (loadModel.isLoadWithoutConverterStep()) { return buildInternalWithNoConverter(inputIterators, configuration, sortScope, false); } else if (loadModel.isLoadWithoutConverterWithoutReArrangeStep()) { return buildInternalWithNoConverter(inputIterators, configuration, sortScope, true); } else if (loadModel.isJsonFileLoad()) { return buildInternalWithJsonInputProcessor(inputIterators, configuration, sortScope); - } else if (!configuration.isSortTable() || sortScope.equals( - SortScopeOptions.SortScope.NO_SORT)) { - return buildInternalForNoSort(inputIterators, configuration); } else if (configuration.getBucketingInfo() != null) { return buildInternalForBucketing(inputIterators, configuration); + } else if (!configuration.isSortTable() || sortScope.equals( + SortScopeOptions.SortScope.NO_SORT)) { + return buildInternalForNoSort(inputIterators, configuration); } else { return buildInternal(inputIterators, configuration); } @@ -117,7 +122,8 @@ public final class DataLoadProcessBuilder { // Wraps with dummy processor. AbstractDataLoadProcessorStep inputProcessorStep = new InputProcessorStepWithNoConverterImpl(configuration, inputIterators, withoutReArrange); - if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) { + if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT) || + configuration.getBucketingInfo() != null) { AbstractDataLoadProcessorStep sortProcessorStep = new SortProcessorStepImpl(configuration, inputProcessorStep); // Writes the sorted data in carbondata format. 
@@ -250,6 +256,7 @@ public final class DataLoadProcessBuilder { configuration.setDataFields( updateDataFieldsBasedOnSortColumns(dataFields).toArray(new DataField[dataFields.size()])); configuration.setBucketingInfo(carbonTable.getBucketingInfo()); + configuration.setBucketHashMethod(carbonTable.getBucketHashMethod()); configuration.setPreFetch(loadModel.isPreFetch()); configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns()); configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns()); @@ -469,4 +476,5 @@ public final class DataLoadProcessBuilder { updatedDataFields.addAll(nonSortFields); return updatedDataFields; } + } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 91ed153..1c65dcb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -219,6 +219,11 @@ public class CarbonLoadModel implements Serializable { */ private int scaleFactor; + /** + * bucket id + */ + private int bucketId; + private OutputFilesInfoHolder outputFilesInfoHolder; public boolean isAggLoadRequest() { @@ -353,6 +358,14 @@ public class CarbonLoadModel implements Serializable { this.loadMinSize = loadMinSize; } + public int getBucketId() { + return bucketId; + } + + public void setBucketId(int bucketId) { + this.bucketId = bucketId; + } + /** * Get copy with taskNo. 
* Broadcast value is shared in process, so we need to copy it to make sure the value in each diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java index 1cc1000..c9da264 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java @@ -17,6 +17,7 @@ package org.apache.carbondata.processing.loading.partition.impl; +import java.io.UnsupportedEncodingException; import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; @@ -27,7 +28,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.processing.loading.partition.Partitioner; /** - * Hash partitioner implementation + * Hash partitioner implementation, not consistent with spark. */ @InterfaceAudience.Internal public class HashPartitionerImpl implements Partitioner<CarbonRow> { @@ -102,7 +103,12 @@ public class HashPartitionerImpl implements Partitioner<CarbonRow> { @Override public int getHash(Object[] value) { - return value[index] != null ? value[index].hashCode() : 0; + try { + String valueStr = new String((byte[]) value[index], "utf-8"); + return value[index] != null ? 
valueStr.hashCode() : 0; + } catch (UnsupportedEncodingException e) { + return 0; + } } } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/SparkHashExpressionPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/SparkHashExpressionPartitionerImpl.java new file mode 100644 index 0000000..81501e6 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/SparkHashExpressionPartitionerImpl.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.loading.partition.impl; + +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.processing.loading.partition.Partitioner; + +import org.apache.spark.unsafe.hash.Murmur3_x86_32; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Hash partitioner implementation spark_hash_expression which using Murmur3_x86_32 keep the + * same hash value as spark for given input. + */ +@InterfaceAudience.Internal +public class SparkHashExpressionPartitionerImpl implements Partitioner<CarbonRow> { + + private int numberOfBuckets; + + private Hash[] hashes; + + public SparkHashExpressionPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas, + int numberOfBuckets) { + this.numberOfBuckets = numberOfBuckets; + hashes = new Hash[indexes.size()]; + for (int i = 0; i < indexes.size(); i++) { + DataType dataType = columnSchemas.get(i).getDataType(); + if (dataType == DataTypes.LONG || dataType == DataTypes.DOUBLE) { + hashes[i] = new LongHash(indexes.get(i)); + } else if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || + dataType == DataTypes.FLOAT || dataType == DataTypes.BOOLEAN) { + hashes[i] = new IntegralHash(indexes.get(i)); + } else if (DataTypes.isDecimal(dataType)) { + hashes[i] = new DecimalHash(indexes.get(i)); + } else if (dataType == DataTypes.TIMESTAMP) { + hashes[i] = new TimestampHash(indexes.get(i)); + } else { + hashes[i] = new StringHash(indexes.get(i)); + } + } + } + + @Override + public int getPartition(CarbonRow key) { + int hashCode = 0; + for (Hash hash : hashes) { + hashCode += hash.getHash(key.getData()); + } + int reminder = hashCode % 
numberOfBuckets; + if (reminder < 0) { + return (reminder + numberOfBuckets) % numberOfBuckets; + } else { + return reminder; + } + } + + private interface Hash { + int getHash(Object[] value); + } + + private static class IntegralHash implements Hash { + + private int index; + + private IntegralHash(int index) { + this.index = index; + } + + public int getHash(Object[] value) { + if (value[index] == null) { + return 42; + } + int intValue = 0; + if (value[index] instanceof Boolean) { + boolean boolValue = (boolean) value[intValue]; + intValue = boolValue ? 1 : 0; + } else if (value[index] instanceof Float) { + intValue = Float.floatToIntBits((float) value[index]); + } else { + intValue = Integer.parseInt(value[index].toString()); + } + return Murmur3_x86_32.hashInt(intValue, 42); + } + } + + private static class LongHash implements Hash { + + private int index; + + private LongHash(int index) { + this.index = index; + } + + public int getHash(Object[] value) { + if (value[index] == null) { + return 42; + } + long longValue = 0L; + if (value[index] instanceof java.lang.Double) { + longValue = Double.doubleToLongBits((double) value[index]); + } else { + longValue = Long.parseLong(value[index].toString()); + } + return Murmur3_x86_32.hashLong(longValue, 42); + } + } + + private static class TimestampHash implements Hash { + + private int index; + + private TimestampHash(int index) { + this.index = index; + } + + public int getHash(Object[] value) { + if (value[index] == null) { + return 42; + } + long timeMilSec = (long) value[index]; + long timeMicSec = timeMilSec * 1000; + return Murmur3_x86_32.hashLong(timeMicSec, 42); + } + } + + private static class DecimalHash implements Hash { + + private int index; + + private DecimalHash(int index) { + this.index = index; + } + + public int getHash(Object[] value) { + if (value[index] == null) { + return 42; + } + return Double.valueOf(value[index].toString()).hashCode(); + } + } + + private static class StringHash 
implements Hash { + + private int index; + + private StringHash(int index) { + this.index = index; + } + + @Override + public int getHash(Object[] value) { + // we should use the same hash method as spark, otherwise the same value will hash into diff + // bucket in carbon/parquet bucket tables the result of join will not correct. + if (value[index] == null) { + return 42; + } + UTF8String utf8String = UTF8String.fromBytes((byte[]) value[index]); + return utf8String.hashCode(); + } + } +} diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java index 28178f0..f82b715 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo; @@ -42,6 +43,7 @@ import org.apache.carbondata.processing.loading.partition.Partitioner; import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl; import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl; import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator; +import org.apache.carbondata.processing.loading.partition.impl.SparkHashExpressionPartitionerImpl; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.util.CarbonBadRecordUtil; import 
org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -105,8 +107,24 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte } // hash partitioner to dispatch rows by bucket column - this.partitioner = - new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + if (CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT.equals( + configuration.getBucketHashMethod())) { + // keep consistent with both carbon and spark tables. + this.partitioner = new SparkHashExpressionPartitionerImpl( + indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } else if (CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals( + configuration.getBucketHashMethod())) { + // native does not keep consistent with spark, it just use java hash method directly such as + // Long, String, etc. May have better performance during convert process. + // But, do not use it when the table need to join with spark bucket tables! + this.partitioner = new HashPartitionerImpl( + indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } else { + // by default we use SparkHashExpressionPartitionerImpl to hash. 
+ this.partitioner = new SparkHashExpressionPartitionerImpl( + indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } + } /** diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index f1ed8df..dd42092 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -139,6 +139,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { throw new CarbonDataWriterException(e.getCause()); } } catch (CarbonDataWriterException e) { + LOGGER.error(e); throw new CarbonDataLoadingException("Error while initializing writer: " + e.getMessage(), e); } catch (Exception e) { throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e); diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index 2e503bd..589b49d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -20,6 +20,7 @@ package org.apache.carbondata.processing.loading.steps; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -27,11 +28,14 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; +import 
org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.BucketingInfo; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -45,6 +49,9 @@ import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFacto import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; import org.apache.carbondata.processing.loading.exception.BadRecordFoundException; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.loading.partition.Partitioner; +import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl; +import org.apache.carbondata.processing.loading.partition.impl.SparkHashExpressionPartitionerImpl; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -70,6 +77,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce // set to true when there is no need to reArrange the data private boolean withoutReArrange; + private boolean isBucketColumnEnabled = false; + private Partitioner<CarbonRow> partitioner; public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration configuration, CarbonIterator<Object[]>[] inputIterators, boolean withoutReArrange) { @@ -109,6 +118,50 @@ public class 
InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce if (!withoutReArrange) { orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader()); } + if (null != configuration.getBucketingInfo()) { + this.isBucketColumnEnabled = true; + initializeBucketColumnPartitioner(); + } + } + + /** + * initialize partitioner for bucket column + */ + private void initializeBucketColumnPartitioner() { + List<Integer> indexes = new ArrayList<>(); + List<ColumnSchema> columnSchemas = new ArrayList<>(); + DataField[] inputDataFields = getOutput(); + BucketingInfo bucketingInfo = configuration.getBucketingInfo(); + for (int i = 0; i < inputDataFields.length; i++) { + for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) { + if (inputDataFields[i].getColumn().getColName() + .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) { + indexes.add(i); + columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema()); + break; + } + } + } + + // hash partitioner to dispatch rows by bucket column + if (CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT.equals( + configuration.getBucketHashMethod())) { + // keep consistent with both carbon and spark tables. + this.partitioner = new SparkHashExpressionPartitionerImpl( + indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } else if (CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals( + configuration.getBucketHashMethod())) { + // native does not keep consistent with spark, it just use java hash method directly such as + // Long, String, etc. May have better performance during convert process. + // But, do not use it when the table need to join with spark bucket tables! + this.partitioner = new HashPartitionerImpl( + indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } else { + // by default we use SparkHashExpressionPartitionerImpl hash. 
+ this.partitioner = new SparkHashExpressionPartitionerImpl( + indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } + } private void convertComplexDataType(Map<Integer, GenericDataType> dataFieldsWithComplexDataType) { @@ -148,7 +201,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce outIterators[i] = new InputProcessorIterator(readerIterators[i], batchSize, rowCounter, orderOfData, noDictionaryMapping, dataTypes, configuration, - dataFieldsWithComplexDataType, rowConverter, withoutReArrange); + dataFieldsWithComplexDataType, rowConverter, withoutReArrange, isBucketColumnEnabled, + partitioner); } return outIterators; } @@ -208,14 +262,15 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce RowConverter converter; CarbonDataLoadConfiguration configuration; - + private boolean isBucketColumnEnabled = false; + private Partitioner<CarbonRow> partitioner; private boolean withoutReArrange; public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping, DataType[] dataTypes, CarbonDataLoadConfiguration configuration, Map<Integer, GenericDataType> dataFieldsWithComplexDataType, RowConverter converter, - boolean withoutReArrange) { + boolean withoutReArrange, boolean bucketColumnEnabled, Partitioner<CarbonRow> partitioner) { this.inputIterators = inputIterators; this.batchSize = batchSize; this.counter = 0; @@ -234,6 +289,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce this.configuration = configuration; this.converter = converter; this.withoutReArrange = withoutReArrange; + this.isBucketColumnEnabled = bucketColumnEnabled; + this.partitioner = partitioner; } @Override @@ -278,6 +335,10 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce if (configuration.isIndexColumnsPresent()) { carbonRow = converter.convert(carbonRow); } + if 
(isBucketColumnEnabled) { + short rangeNumber = (short) partitioner.getPartition(carbonRow); + carbonRow.setRangeId(rangeNumber); + } carbonRowBatch.addRow(carbonRow); count++; } @@ -288,6 +349,10 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce if (configuration.isIndexColumnsPresent()) { carbonRow = converter.convert(carbonRow); } + if (isBucketColumnEnabled) { + short rangeNumber = (short) partitioner.getPartition(carbonRow); + carbonRow.setRangeId(rangeNumber); + } carbonRowBatch.addRow(carbonRow); count++; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 08273b0..368f5d0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -511,6 +511,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, tempStoreLocation, carbonStoreLocation); carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId()); + carbonFactDataHandlerModel.setBucketId(carbonLoadModel.getBucketId()); setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel); this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns(); dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel); diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 85413cc..121b798 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -90,6 +90,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId()); + carbonFactDataHandlerModel.setBucketId(loadModel.getBucketId()); this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns(); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java index 66c418e..3a21039 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java @@ -306,7 +306,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { * @return more element is present */ public boolean hasNext() { - return this.recordHolderHeapLocal.size() > 0; + return this.recordHolderHeapLocal != null && this.recordHolderHeapLocal.size() > 0; } public void close() { diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 6e98aaa..a95bf15 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -535,7 +535,7 @@ public class CarbonFactDataHandlerModel { return bucketId; } - public void setBucketId(Integer 
bucketId) { + public void setBucketId(int bucketId) { this.bucketId = bucketId; }