This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 21110cd  [CARBONDATA-3721][CARBONDATA-3590] Optimize Bucket Table
21110cd is described below

commit 21110cd2e053e6da2a9e4341eb2fc870b3b75563
Author: Zhangshunyu <zhangshunyu1...@126.com>
AuthorDate: Sun Feb 9 22:04:48 2020 +0800

    [CARBONDATA-3721][CARBONDATA-3590] Optimize Bucket Table
    
    Why is this PR needed?
    
    Support bucket tables consistent with Spark to improve join performance by
    avoiding shuffle on bucket columns. At the same time, fix bugs in load,
    compaction, and query of bucket tables.
    
    What changes were proposed in this PR?
    
    Support bucket tables consistent with Spark to improve join performance by
    avoiding shuffle on bucket columns, and fix related bugs.
    
    1. For create table, the DDL supports both TBLPROPERTIES and CLUSTERED BY
       (as in Hive/Spark); see the sketch below.
    2. For loading, fix problems that occurred when a bucket column was
       specified, so that data is clustered into different files based on the
       bucket column.
    3. For query, the hash implementation must either remain stable for a
       given value or match the hashing of Spark's Parquet bucket tables, so
       that joins between different bucket tables return correct results. The
       hash implementation is configurable.
    4. For compaction, group the block files by bucket id and hash the data
       into different carbondata files as well; otherwise the query flow would
       group the files by bucket number, all compacted data would end up in
       one partition, the join result would mismatch, and performance would be
       very slow.
    5. For tests, add 19 test cases in TableBucketingTestCase.
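
    Illustrative sketch (not part of the patch) of the two create-table forms
    and the configurable hash method, in the style of the Scala test suites.
    Table and column names are hypothetical; 'bucket_hash_method' defaults to
    'spark_hash_expression' and also accepts 'native':

      // TBLPROPERTIES form, as documented in docs/ddl-of-carbondata.md.
      sql(
        """
          | CREATE TABLE sales_buckets (id INT, name STRING)
          | STORED AS carbondata
          | TBLPROPERTIES ('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name',
          |                'bucket_hash_method'='spark_hash_expression')
        """.stripMargin)

      // Spark-style CLUSTERED BY form (USING syntax), mapped internally to
      // the same bucket_number/bucket_columns properties via
      // CatalogTable.bucketSpec.
      sql(
        """
          | CREATE TABLE sales_clustered (id INT, name STRING)
          | USING carbondata
          | CLUSTERED BY (name) INTO 4 BUCKETS
        """.stripMargin)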
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
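
    A simplified sketch of the core no-shuffle check the new cases perform
    (mirroring the deleted BucketingTestCase and the new TestCarbonWriter
    test; table names are hypothetical, broadcast joins are assumed disabled,
    and ShuffleExchangeExec assumes Spark 2.3+):

      import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

      // A join on the bucket column should not introduce a shuffle exchange.
      val plan = sql(
        """
          | select t1.*, t2.*
          | from bucket_t1 t1, bucket_t2 t2
          | where t1.name = t2.name
        """.stripMargin).queryExecution.executedPlan
      var shuffleExists = false
      plan.collect {
        case _: ShuffleExchangeExec => shuffleExists = true
      }
      assert(!shuffleExists, "shuffle should not exist on bucket column join")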
    
    This closes #3637
---
 .../core/constants/CarbonCommonConstants.java      |  14 +
 .../core/metadata/schema/table/CarbonTable.java    |  14 +
 .../core/util/AbstractDataFileFooterConverter.java |   3 +-
 docs/ddl-of-carbondata.md                          |  10 +-
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 118 ++-
 .../cluster/sdv/generated/BucketingTestCase.scala  | 167 ----
 .../org/apache/carbondata/spark/CarbonOption.scala |   8 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           |   2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |  88 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |   7 +-
 .../scala/org/apache/spark/sql/CarbonSource.scala  |  22 +-
 .../command/carbonTableSchemaCommon.scala          |  14 +-
 .../management/CarbonInsertFromStageCommand.scala  |  19 +-
 ...nAlterTableColRenameDataTypeChangeCommand.scala |  14 +
 .../schema/CarbonAlterTableDropColumnCommand.scala |  13 +
 .../table/CarbonDescribeFormattedCommand.scala     |  15 +
 .../strategy/CarbonLateDecodeStrategy.scala        |   1 +
 .../sql/parser/CarbonSparkSqlParserUtil.scala      |  22 +-
 .../testsuite/binary/TestBinaryDataType.scala      |  45 +-
 .../badrecordloger/BadRecordActionTest.scala       |   8 +-
 .../createTable/TestCreateTableLike.scala          |   2 +-
 .../spark/carbondata/CarbonDataSourceSuite.scala   |  11 +-
 .../carbondata/DataLoadFailAllTypeSortTest.scala   |   4 +-
 .../bucketing/TableBucketingTestCase.scala         | 886 ++++++++++++++++++++-
 pom.xml                                            |   3 +-
 processing/pom.xml                                 |  16 +
 .../loading/CarbonDataLoadConfiguration.java       |  10 +
 .../processing/loading/DataLoadProcessBuilder.java |  18 +-
 .../processing/loading/model/CarbonLoadModel.java  |  13 +
 .../partition/impl/HashPartitionerImpl.java        |  10 +-
 .../impl/SparkHashExpressionPartitionerImpl.java   | 182 +++++
 .../steps/DataConverterProcessorStepImpl.java      |  22 +-
 .../loading/steps/DataWriterProcessorStepImpl.java |   1 +
 .../InputProcessorStepWithNoConverterImpl.java     |  71 +-
 .../merger/CompactionResultSortProcessor.java      |   1 +
 .../merger/RowResultMergerProcessor.java           |   1 +
 .../sortdata/SingleThreadFinalSortFilesMerger.java |   2 +-
 .../store/CarbonFactDataHandlerModel.java          |   2 +-
 38 files changed, 1531 insertions(+), 328 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7565555..333c7b6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2379,4 +2379,18 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_SI_SEGMENT_MERGE_DEFAULT = "false";
 
+  /**
+   * Hash method of bucket table
+   */
+  public static final String BUCKET_HASH_METHOD = "bucket_hash_method";
+  public static final String BUCKET_HASH_METHOD_DEFAULT = 
"spark_hash_expression";
+  public static final String BUCKET_HASH_METHOD_SPARK_EXPRESSION = 
"spark_hash_expression";
+  public static final String BUCKET_HASH_METHOD_NATIVE = "native";
+
+  /**
+   * bucket properties
+   */
+  public static final String BUCKET_COLUMNS = "bucket_columns";
+  public static final String BUCKET_NUMBER = "bucket_number";
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 68bdd47..e852f5f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -717,6 +717,20 @@ public class CarbonTable implements Serializable, Writable 
{
     }
   }
 
+  public String getBucketHashMethod() {
+    String configuredMethod = tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.BUCKET_HASH_METHOD);
+    if (configuredMethod == null) {
+      return CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT;
+    } else {
+      if 
(CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals(configuredMethod)) {
+        return CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE;
+      }
+      // by default we use spark_hash_expression hash method
+      return CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT;
+    }
+  }
+
   /**
    * to get the normal dimension or the primitive dimension of the complex type
    *
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 91cc456..e49aacc 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -137,7 +137,8 @@ public abstract class AbstractDataFileFooterConverter {
       boolean isTransactionalTable) throws IOException {
     CarbonIndexFileReader indexReader = new 
CarbonIndexFileReader(configuration);
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
-    String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
+    String formattedPath = filePath.replace("\\", "/");
+    String parentPath = formattedPath.substring(0, 
formattedPath.lastIndexOf("/"));
     try {
       // open the reader
       if (fileData != null) {
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 2f4cba1..3416426 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -104,8 +104,8 @@ CarbonData DDL statements are documented here,which 
includes:
 | [CACHE_LEVEL](#caching-at-block-or-blocklet-level)           | Column 
metadata caching level. Whether to cache column metadata of block or blocklet |
 | [FLAT_FOLDER](#support-flat-folder-same-as-hiveparquet)      | Whether to 
write all the carbondata files in a single folder.Not writing segments folder 
during incremental load |
 | [LONG_STRING_COLUMNS](#string-longer-than-32000-characters)  | Columns which 
are greater than 32K characters                |
-| [BUCKETNUMBER](#bucketing)                                   | Number of 
buckets to be created                              |
-| [BUCKETCOLUMNS](#bucketing)                                  | Columns which 
are to be placed in buckets                    |
+| [BUCKET_NUMBER](#bucketing)                                   | Number of 
buckets to be created                              |
+| [BUCKET_COLUMNS](#bucketing)                                  | Columns 
which are to be placed in buckets                    |
 | [LOAD_MIN_SIZE_INMB](#load-minimum-data-size)                | Minimum input 
data size per node for data loading          |
 | [Range Column](#range-column)                                | partition 
input data by range                              |
 
@@ -991,8 +991,8 @@ Users can specify which columns to include and exclude for 
local dictionary gene
   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
                     [(col_name data_type, ...)]
   STORED AS carbondata
-  TBLPROPERTIES('BUCKETNUMBER'='noOfBuckets',
-  'BUCKETCOLUMNS'='columnname')
+  TBLPROPERTIES('BUCKET_NUMBER'='noOfBuckets',
+  'BUCKET_COLUMNS'='columnname')
   ```
 
   **NOTE:**
@@ -1011,7 +1011,7 @@ Users can specify which columns to include and exclude 
for local dictionary gene
     productBatch STRING,
     revenue INT)
   STORED AS carbondata
-  TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='productName')
+  TBLPROPERTIES ('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='productName')
   ```
 
 ## CACHE
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 9edf7b9..1d82a75 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -20,20 +20,23 @@ package org.apache.carbon.flink
 import java.util.Properties
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
-
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.execution.exchange.Exchange
 
 class TestCarbonWriter extends QueryTest {
 
   val tableName = "test_flink"
+  val bucketTableName = "insert_bucket_table"
+
 
   test("Writing flink data to local carbon table") {
     sql(s"DROP TABLE IF EXISTS $tableName").collect()
@@ -193,6 +196,117 @@ class TestCarbonWriter extends QueryTest {
     }
   }
 
+  test("test carbon writer of bucket table") {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
+    sql(s"DROP TABLE IF EXISTS $bucketTableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, 
shortField short)
+         | STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 
'BUCKET_COLUMNS'='stringField')
+      """.stripMargin
+    ).collect()
+    sql(
+      s"""
+         | CREATE TABLE $bucketTableName (stringField string, intField int, 
shortField short)
+         | STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 
'BUCKET_COLUMNS'='stringField')
+      """.stripMargin
+    ).collect()
+
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+    val dataTempPath = rootPath + "/data/temp/"
+
+    try {
+      val flinkTablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(1)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): Array[AnyRef] = {
+          val data = new Array[AnyRef](3)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data
+        }
+
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(5000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        flinkTablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
+      )
+      val streamSink = StreamingFileSink.forBulkFormat(new 
Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.addSink(streamSink)
+
+      try environment.execute
+      catch {
+        case exception: Exception =>
+          throw new UnsupportedOperationException(exception)
+      }
+      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+      val table = CarbonEnv.getCarbonTable(Option("default"), 
s"$tableName")(sqlContext.sparkSession)
+      val segmentDir = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_0")
+      val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+      })
+      assert(dataFiles.length == 10)
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
+      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+      val segmentDir2 = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_1")
+      val dataFiles2 = segmentDir2.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+      })
+      assert(dataFiles2.length == 10)
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName where stringField != 
'AAA'"), Seq(Row(1000)))
+      sql(s"insert into $bucketTableName select * from $tableName").collect()
+
+      val plan = sql(
+        s"""
+          |select t1.*, t2.*
+          |from $tableName t1, $bucketTableName t2
+          |where t1.stringField = t2.stringField
+      """.stripMargin).queryExecution.executedPlan
+      var shuffleExists = false
+      plan.collect {
+        case s: Exchange if (s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+          s.getClass.getName.equals
+          ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+        => shuffleExists = true
+      }
+      assert(!shuffleExists, "shuffle should not exist on bucket tables")
+
+      checkAnswer(sql(
+        s"""select count(*) from
+          |(select t1.*, t2.*
+          |from $tableName t1, $bucketTableName t2
+          |where t1.stringField = t2.stringField) temp
+      """.stripMargin), Row(1000))
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
+      sql(s"DROP TABLE IF EXISTS $bucketTableName").collect()
+    }
+  }
+
   private def newWriterProperties(
     dataTempPath: String,
     storeLocation: String) = {
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala
deleted file mode 100644
index e42b008..0000000
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/BucketingTestCase.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.cluster.sdv.generated
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.spark.sql.common.util._
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.scalatest.BeforeAndAfterAll
-
-class BucketingTestCase extends QueryTest with BeforeAndAfterAll {
-
-  var threshold: Int = _
-  var timeformat = CarbonProperties.getInstance()
-    .getProperty("carbon.timestamp.format", 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-
-  override def beforeAll {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-    threshold = 
sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt
-    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
-    sql("DROP TABLE IF EXISTS bucket_table")
-  }
-
-  test("test exception if bucketcolumns be measure column") {
-    intercept[Exception] {
-      sql("DROP TABLE IF EXISTS bucket_table")
-      sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, 
name String, phonetype String," +
-          "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " 
+
-          "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='ID')")
-    }
-  }
-
-  test("test exception if bucketcolumns be complex data type column") {
-    intercept[Exception] {
-      sql("DROP TABLE IF EXISTS bucket_table")
-      sql("CREATE TABLE bucket_table (Id int, number double, name string, " +
-          "gamePoint array<double>, mac struct<num:double>) STORED AS 
carbondata TBLPROPERTIES" +
-          "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='gamePoint')")
-    }
-  }
-
-  test("test multi columns as bucketcolumns") {
-    sql("DROP TABLE IF EXISTS bucket_table")
-    sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, 
name String, phonetype String," +
-        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name,phonetype')")
-    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE 
bucket_table")
-    val table: CarbonTable = 
CarbonMetadata.getInstance().getCarbonTable("default_bucket_table")
-    if (table != null && table.getBucketingInfo != null) {
-      assert(true)
-    } else {
-      assert(false, "Bucketing info does not exist")
-    }
-  }
-
-  test("test multi columns as bucketcolumns with bucket join") {
-    sql("DROP TABLE IF EXISTS bucket_table")
-    sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, 
name String, phonetype String," +
-        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country,name')")
-    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE 
bucket_table")
-
-    val plan = sql(
-      """
-        |select t1.*, t2.*
-        |from bucket_table t1, bucket_table t2
-        |where t1.country = t2.country and t1.name = t2.name
-      """.stripMargin).queryExecution.executedPlan
-    var shuffleExists = false
-    plan.collect {
-      case s: ShuffleExchangeExec => shuffleExists = true
-    }
-    assert(!shuffleExists, "shuffle should not exist on bucket column join")
-  }
-
-  test("test non bucket column join") {
-    sql("DROP TABLE IF EXISTS bucket_table")
-    sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, 
name String, phonetype String," +
-        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='country')")
-    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE 
bucket_table")
-
-    val plan = sql(
-      """
-        |select t1.*, t2.*
-        |from bucket_table t1, bucket_table t2
-        |where t1.name = t2.name
-      """.stripMargin).queryExecution.executedPlan
-    var shuffleExists = false
-
-    plan.collect {
-      case s: ShuffleExchangeExec => shuffleExists = true
-    }
-    assert(shuffleExists, "shuffle should exist on non-bucket column join")
-  }
-
-  test("test bucketcolumns through multi data loading plus compaction") {
-    sql("DROP TABLE IF EXISTS bucket_table")
-    sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, 
name String, phonetype String," +
-        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
-    val numOfLoad = 10
-    for (j <- 0 until numOfLoad) {
-      sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE 
bucket_table")
-    }
-    sql("ALTER TABLE bucket_table COMPACT 'MAJOR'")
-
-    val plan = sql(
-      """
-        |select t1.*, t2.*
-        |from bucket_table t1, bucket_table t2
-        |where t1.name = t2.name
-      """.stripMargin).queryExecution.executedPlan
-    var shuffleExists = false
-    plan.collect {
-      case s: ShuffleExchangeExec => shuffleExists = true
-    }
-    assert(!shuffleExists, "shuffle should not exist on bucket tables")
-  }
-
-  test("drop non-bucket column, test bucket column join") {
-    sql("DROP TABLE IF EXISTS bucket_table")
-    sql("CREATE TABLE bucket_table (ID Int, date Timestamp, country String, 
name String, phonetype String," +
-        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
-    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE 
bucket_table")
-
-    sql("ALTER TABLE bucket_table DROP COLUMNS (ID,country)")
-
-    val plan = sql(
-      """
-        |select t1.*, t2.*
-        |from bucket_table t1, bucket_table t2
-        |where t1.name = t2.name
-      """.stripMargin).queryExecution.executedPlan
-    var shuffleExists = false
-    plan.collect {
-      case s: ShuffleExchangeExec => shuffleExists = true
-    }
-    assert(!shuffleExists, "shuffle should not exist on bucket tables")
-  }
-
-  override def afterAll {
-    sql("DROP TABLE IF EXISTS bucket_table")
-    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", 
threshold.toString)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeformat)
-  }
-}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 5a41b50..06b2130 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -58,12 +58,12 @@ class CarbonOption(options: Map[String, String]) {
 
   lazy val tablePageSizeInMb: Option[String] = 
options.get("table_page_size_inmb")
 
-  lazy val bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
+  lazy val bucketNumber: Int = options.getOrElse("bucket_number", "0").toInt
 
-  lazy val bucketColumns: String = options.getOrElse("bucketcolumns", "")
+  lazy val bucketColumns: String = options.getOrElse("bucket_columns", "")
 
-  lazy val isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
-                                    options.contains("bucketnumber")
+  lazy val isBucketingEnabled: Boolean = options.contains("bucket_columns") &&
+                                    options.contains("bucket_number")
 
   lazy val isStreaming: Boolean = {
     var stream = options.getOrElse("streaming", "false")
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index adb347e..a7603e2 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -990,7 +990,7 @@ object CarbonDataRDDFactory {
    * @param carbonLoadModel load model
    * @return Return an array that contains all of the elements in 
NewDataFrameLoaderRDD.
    */
-  private def loadDataFrame(
+   def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
       scanResultRDD: Option[RDD[InternalRow]],
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index ccff4c1..4278ade 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.mutate.UpdateVO
@@ -91,6 +92,7 @@ class CarbonMergerRDD[K, V](
   var singleRange = false
   var expressionMapForRangeCol: util.Map[Integer, Expression] = null
   var broadCastSplits: Broadcast[CarbonInputSplitWrapper] = null
+  val bucketInfo = 
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getBucketingInfo
 
   def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = {
     broadCastSplits = sparkContext.broadcast(new 
CarbonInputSplitWrapper(splits))
@@ -108,6 +110,12 @@ class CarbonMergerRDD[K, V](
       } else {
         null
       }
+      val bucketId: Int = if (bucketInfo != null) {
+        carbonSparkPartition.idx
+      } else {
+        0
+      }
+      carbonLoadModel.setBucketId(bucketId);
 
       var mergeStatus = false
       var mergeNumber = ""
@@ -584,36 +592,40 @@ class CarbonMergerRDD[K, V](
     logInfo("no.of.nodes where data present=" + nodeBlockMap.size())
     defaultParallelism = sparkContext.defaultParallelism
 
-    // Create Spark Partition for each task and assign blocks
-    nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
-      val taskSplitList = new java.util.ArrayList[NodeInfo](0)
-      nodeTaskBlocksMap.put(nodeName, taskSplitList)
-      var blockletCount = 0
-      splitList.asScala.foreach { splitInfo =>
-        val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
-        blockletCount = blockletCount + 
splitsPerNode.getCarbonInputSplitList.size()
-        taskSplitList.add(
-          NodeInfo(splitsPerNode.getTaskId, 
splitsPerNode.getCarbonInputSplitList.size()))
-
-        if (blockletCount != 0) {
-          val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
-          val multiBlockSplit = if (null == rangeColumn || singleRange) {
-            new CarbonMultiBlockSplit(
-              taskInfo.getCarbonInputSplitList,
-              Array(nodeName))
-          } else {
-            var splitListForRange = new util.ArrayList[CarbonInputSplit]()
-            new CarbonMultiBlockSplit(
-              splitListForRange,
-              Array(nodeName))
+    if (bucketInfo != null) {
+      distributeBucketPartitions(result, splits, bucketInfo)
+    } else {
+      // Create Spark Partition for each task and assign blocks
+      nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
+        val taskSplitList = new java.util.ArrayList[NodeInfo](0)
+        nodeTaskBlocksMap.put(nodeName, taskSplitList)
+        var blockletCount = 0
+        splitList.asScala.foreach { splitInfo =>
+          val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
+          blockletCount = blockletCount + 
splitsPerNode.getCarbonInputSplitList.size()
+          taskSplitList.add(
+            NodeInfo(splitsPerNode.getTaskId, 
splitsPerNode.getCarbonInputSplitList.size()))
+
+          if (blockletCount != 0) {
+            val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
+            val multiBlockSplit = if (null == rangeColumn || singleRange) {
+              new CarbonMultiBlockSplit(
+                taskInfo.getCarbonInputSplitList,
+                Array(nodeName))
+            } else {
+              var splitListForRange = new util.ArrayList[CarbonInputSplit]()
+              new CarbonMultiBlockSplit(
+                splitListForRange,
+                Array(nodeName))
+            }
+            result.add(
+              new CarbonSparkPartition(
+                id,
+                taskPartitionNo,
+                multiBlockSplit,
+                getPartitionNamesFromTask(taskInfo.getTaskId, 
partitionTaskMap)))
+            taskPartitionNo += 1
           }
-          result.add(
-            new CarbonSparkPartition(
-              id,
-              taskPartitionNo,
-              multiBlockSplit,
-              getPartitionNamesFromTask(taskInfo.getTaskId, partitionTaskMap)))
-          taskPartitionNo += 1
         }
       }
     }
@@ -642,6 +654,24 @@ class CarbonMergerRDD[K, V](
     result.toArray(new Array[Partition](result.size))
   }
 
+  private def distributeBucketPartitions(result: util.ArrayList[Partition],
+      splits: util.List[InputSplit], bucketInfo: BucketingInfo): Unit = {
+    // distribute the files based on bucket id
+    var i = 0
+    val bucketed =
+      splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => 
f.getBucketId)
+    (0 until bucketInfo.getNumOfRanges).map { bucketId =>
+      val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
+      val multiBlockSplit =
+        new CarbonMultiBlockSplit(
+          bucketPartitions.asJava,
+          bucketPartitions.flatMap(_.getLocations).toArray)
+      val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+      i += 1
+      result.add(partition)
+    }
+  }
+
   private def getRangesFromRDD(rangeColumn: CarbonColumn,
       carbonTable: CarbonTable,
       defaultParallelism: Int,
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index ef5c9b1..ab97624 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -96,8 +96,6 @@ class CarbonScanRDD[T: ClassTag](
 
   private var directFill = false
 
-  private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
-
   private var segmentsToAccess: Array[Segment] = _
 
   @transient val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName)
@@ -262,11 +260,12 @@ class CarbonScanRDD[T: ClassTag](
           CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
       }
       // If bucketing is enabled on table then partitions should be grouped 
based on buckets.
-      if (bucketedTable != null) {
+      val bucketInfo = tableInfo.getFactTable.getBucketingInfo
+      if (bucketInfo != null) {
         var i = 0
         val bucketed =
           splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => 
f.getBucketId)
-        (0 until bucketedTable.getNumOfRanges).map { bucketId =>
+        (0 until bucketInfo.getNumOfRanges).map { bucketId =>
           val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
           val multiBlockSplit =
             new CarbonMultiBlockSplit(
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 8ca96ab..4e8a00f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, 
CatalogTableType}
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.hive.CarbonMetaStore
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
@@ -364,13 +364,24 @@ object CarbonSource {
       table: CatalogTable,
       metaStore: CarbonMetaStore
   ): (TableInfo, CatalogTable) = {
-    val properties = CarbonSparkSqlParserUtil.getProperties(table)
+    val updatedProperties = new java.util.HashMap[String, String]()
+    CarbonSparkSqlParserUtil
+      .normalizeProperties(table.properties)
+      .foreach(e => updatedProperties.put(e._1, e._2))
+    if (table.bucketSpec.isDefined) {
+      val bucketSpec = table.bucketSpec.get
+      updatedProperties.put("bucket_columns", 
bucketSpec.bucketColumnNames.mkString)
+      updatedProperties.put("bucket_number", bucketSpec.numBuckets.toString)
+    }
+    val updateTable: CatalogTable = table.copy(
+      properties = updatedProperties.asScala.toMap, bucketSpec = None)
+    val properties = CarbonSparkSqlParserUtil.getProperties(updateTable)
     if (isCreatedByCarbonExtension(properties)) {
       // Table is created by SparkSession with CarbonExtension,
       // There is no TableInfo yet, so create it from CatalogTable
-      val tableInfo = createTableInfo(sparkSession, table)
+      val tableInfo = createTableInfo(sparkSession, updateTable)
       val catalogTable = createCatalogTableForCarbonExtension(
-        table, tableInfo, properties, metaStore)
+        updateTable, tableInfo, properties, metaStore)
       (tableInfo, catalogTable)
     } else {
       // Legacy code path (table is created by CarbonSession)
@@ -378,7 +389,8 @@ object CarbonSource {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
       val isTransactionalTable = properties.getOrElse("isTransactional", 
"true").contains("true")
       tableInfo.setTransactionalTable(isTransactionalTable)
-      val catalogTable = createCatalogTableForCarbonSession(table, tableInfo, 
properties, metaStore)
+      val catalogTable =
+        createCatalogTableForCarbonSession(updateTable, tableInfo, properties, 
metaStore)
       (tableInfo, catalogTable)
     }
   }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 5ee10e6..63044ab 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -806,16 +806,14 @@ class TableNewProcessor(cm: TableModel) {
         val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b))
         col match {
           case Some(colSchema: ColumnSchema) =>
-            if (colSchema.isDimensionColumn && 
!colSchema.getDataType.isComplexType) {
+            if (!colSchema.getDataType.isComplexType &&
+              !DataTypes.isDecimal(colSchema.getDataType)) {
               colSchema
             } else {
-              LOGGER.error(s"Bucket field must be dimension column and " +
-                           s"should not be measure or complex column: ${ 
colSchema.getColumnName }")
-              CarbonException.analysisException(s"Bucket field must be 
dimension column and " +
-                                                s"should not be measure or 
complex column: ${
-                                                  colSchema
-                                                    .getColumnName
-                                                }")
+              LOGGER.error(s"Bucket field should not be complex column or 
decimal" +
+                s"data type: ${colSchema.getColumnName}")
+              CarbonException.analysisException(s"Bucket field should not be 
complex column or" +
+                s" decimal data type: ${colSchema.getColumnName}")
             }
           case _ =>
             LOGGER.error(s"Bucket field is not present in table columns")
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 64f802f..d63ec24 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -47,6 +47,7 @@ import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 
 /**
  * Collect stage input files and trigger a loading into carbon table.
@@ -269,13 +270,17 @@ case class CarbonInsertFromStageCommand(
                   s"${table.getDatabaseName}.${table.getTableName}")
       val start = System.currentTimeMillis()
       val dataFrame = 
DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
-      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-        spark,
-        Option(dataFrame),
-        loadModel,
-        SparkSQLUtil.sessionState(spark).newHadoopConf()
-      ).map { row =>
-        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+      if (table.getBucketingInfo == null) {
+        DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+          spark,
+          Option(dataFrame),
+          loadModel,
+          SparkSQLUtil.sessionState(spark).newHadoopConf()
+        ).map { row =>
+          (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+        }
+      } else {
+        CarbonDataRDDFactory.loadDataFrame(spark.sqlContext, 
Option(dataFrame), None, loadModel)
       }
       LOGGER.info(s"finish data loading, time taken 
${System.currentTimeMillis() - start}ms")
 
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 190b776..299dfba 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -71,6 +71,20 @@ abstract class 
CarbonAlterTableColumnRenameCommand(oldColumnName: String, newCol
       }
     }
 
+    // if column rename operation is on bucket column, then fail the rename 
operation
+    if (null != carbonTable.getBucketingInfo) {
+      val bucketColumns = carbonTable.getBucketingInfo.getListOfColumns
+      bucketColumns.asScala.foreach {
+        col =>
+          if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
+            throw new MalformedCarbonCommandException(
+              s"Column Rename Operation failed. Renaming " +
+                s"the bucket column $oldColumnName is not " +
+                s"allowed")
+          }
+      }
+    }
+
   }
 }
 
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 254e766..de0b0d3 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -88,6 +88,19 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
             "partition columns")
         }
       }
+      val bucketInfo = carbonTable.getBucketingInfo
+      if (bucketInfo != null) {
+        val bucketColumnSchemaList = bucketInfo.getListOfColumns.asScala
+          .map(_.getColumnName)
+        // check each column existence in the table
+        val bucketColumns = alterTableDropColumnModel.columns.filter {
+          tableColumn => bucketColumnSchemaList.contains(tableColumn)
+        }
+        if (bucketColumns.nonEmpty) {
+          throwMetadataException(dbName, tableName, "Bucket columns cannot be 
dropped: " +
+            s"$bucketColumns")
+        }
+      }
 
       var dictionaryColumns = 
Seq[org.apache.carbondata.core.metadata.schema.table.column
       .ColumnSchema]()
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index ce240c7..85b6f72 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -242,6 +242,21 @@ private[sql] case class CarbonDescribeFormattedCommand(
     }
 
     
//////////////////////////////////////////////////////////////////////////////
+    // Bucket Information
+    
//////////////////////////////////////////////////////////////////////////////
+    val bucketInfo = carbonTable.getBucketingInfo()
+    if (bucketInfo != null) {
+      results ++= Seq(
+        ("", "", ""),
+        ("## Bucket Information", "", ""),
+        ("Bucket Columns",
+          bucketInfo.getListOfColumns.asScala.map {
+            col => 
s"${col.getColumnName}:${col.getDataType.getName}"}.mkString(", "), ""),
+        ("Number of Buckets", bucketInfo.getNumOfRanges.toString, "")
+      )
+    }
+
+    
//////////////////////////////////////////////////////////////////////////////
     // Dynamic Information
     
//////////////////////////////////////////////////////////////////////////////
 
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index ae7ab11..e65c65d 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -633,6 +633,7 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
         }
       }
       if (bucketColumns.size == cols.size) {
+        // use HashPartitioning will not shuffle
         HashPartitioning(bucketColumns, numBuckets)
       } else {
         UnknownPartitioning(0)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 2d128a4..5bfd68a 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -33,9 +33,7 @@ import 
org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterT
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import 
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,
 CarbonAlterTableColRenameDataTypeChangeCommand}
 import 
org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, 
CarbonCreateTableCommand}
-import org.apache.spark.sql.hive.CarbonMVRules
 import org.apache.spark.sql.types.StructField
-import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -79,6 +77,24 @@ object CarbonSparkSqlParserUtil {
     if (streaming != null && streaming.equalsIgnoreCase("true") && 
tablePath.startsWith("s3")) {
       throw new UnsupportedOperationException("streaming is not supported with 
s3 store")
     }
+    // bucket table should not set sort scope because the data can only sort 
inside bucket itself
+    val bucketColumnsProp = 
tableInfo.getFactTable.getTableProperties.get("bucket_columns")
+    val sortScopeProp = 
tableInfo.getFactTable.getTableProperties.get("sort_scope")
+    if (bucketColumnsProp != null) {
+      if (sortScopeProp != null) {
+        // bucket table can only sort inside bucket, can not set sort_scope 
for table.
+        throw new ProcessMetaDataException(tableInfo.getDatabaseName,
+          tableInfo.getFactTable.getTableName, "Bucket table only sort inside 
buckets," +
+            " can not set sort scope but can set sort columns.");
+      } else {
+        tableInfo.getFactTable.getListOfColumns.asScala.foreach(column =>
+          if (column.getDataType == DataTypes.BINARY) {
+            throw new ProcessMetaDataException(tableInfo.getDatabaseName,
+              tableInfo.getFactTable.getTableName, "bucket table do not 
support binary.");
+          }
+        )
+      }
+    }
     // Add validation for sort scope when create table
     val sortScope = tableInfo.getFactTable.getTableProperties.asScala
       .getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
@@ -311,7 +327,7 @@ object CarbonSparkSqlParserUtil {
       sparkSession: SparkSession,
       selectQuery: Option[LogicalPlan] = None): TableInfo = {
     val tableProperties = normalizeProperties(getProperties(table))
-    val options = new CarbonOption(tableProperties)
+    val options = new CarbonOption(Map(tableProperties.toSeq: _*))
     // validate streaming property
     validateStreamingProperty(options)
     val parser = new CarbonSpark2SqlParser()
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index f80316d..4cd2b66 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -17,13 +17,16 @@
 package org.apache.carbondata.integration.spark.testsuite.binary
 
 import java.util.Arrays
+
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil
@@ -398,34 +401,26 @@ class TestBinaryDataType extends QueryTest with 
BeforeAndAfterAll {
         checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where binaryField 
=cast('hello' as binary)"), Seq(Row(1)))
     }
 
-    test("Test create table with buckets unsafe") {
-        
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
 "true")
+    test("Test create table with binary datatype in bucket table") {
         sql("DROP TABLE IF EXISTS binaryTable")
+        sql("DROP TABLE IF EXISTS binaryTable2")
+        val exception = intercept[ProcessMetaDataException] {
         sql(
-            s"""
-               | CREATE TABLE IF NOT EXISTS binaryTable (
-               |    id INT,
-               |    label boolean,
-               |    name STRING,
-               |    binaryField BINARY,
-               |    autoLabel boolean)
-               | STORED AS carbondata
-               | TBLPROPERTIES('BUCKETNUMBER'='4', 
'BUCKETCOLUMNS'='binaryField')
-             """.stripMargin)
-        sql(
-            s"""
-               | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
-               | INTO TABLE binaryTable
-               | OPTIONS('header'='false')
-             """.stripMargin)
-
-        
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
 "false")
-        val table: CarbonTable = 
CarbonMetadata.getInstance().getCarbonTable("default", "binaryTable")
-        if (table != null && table.getBucketingInfo() != null) {
-            assert(true)
-        } else {
-            assert(false, "Bucketing info does not exist")
+              s"""
+                 | CREATE TABLE IF NOT EXISTS binaryTable (
+                 |    id INT,
+                 |    label boolean,
+                 |    name STRING,
+                 |    binaryField BINARY,
+                 |    autoLabel boolean)
+                 | STORED AS carbondata
+                 | TBLPROPERTIES('BUCKET_NUMBER'='4', 
'BUCKET_COLUMNS'='binaryField')
+               """.stripMargin)
         }
+        assert(exception.getMessage.contains("bucket table do not support 
binary"))
+
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql("DROP TABLE IF EXISTS binaryTable2")
     }
 
     test("insert into for hive and carbon") {
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
index 7285101..27abdc6 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -233,10 +233,10 @@ class BadRecordActionTest extends QueryTest {
         " the detail reason"))
   }
 
-  test("test bad record with IGNORE option and sort scope as NO_SORT for 
bucketed table") {
+  test("test bad record with IGNORE option for bucketed table") {
     sql("drop table if exists sales_bucket")
     sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, 
country String," +
-          "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
AS carbondata TBLPROPERTIES ('BUCKETNUMBER'='2', 
'BUCKETCOLUMNS'='country','sort_scope'='NO_SORT')")
+          "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='2', 'BUCKET_COLUMNS'='country')")
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket 
OPTIONS" +
         "('bad_records_action'='IGNORE', 'DELIMITER'=" +
         " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
@@ -244,10 +244,10 @@ class BadRecordActionTest extends QueryTest {
       Seq(Row(2)))
   }
 
-  test("test bad record with REDIRECT option and sort scope as NO_SORT for 
bucketed table") {
+  test("test bad record with REDIRECT option for bucketed table") {
     sql("drop table if exists sales_bucket")
     sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, 
country String," +
-        "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
AS carbondata TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country', 
'sort_scope'='NO_SORT')")
+        "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED 
AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='2', 'BUCKET_COLUMNS'='country')")
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket 
OPTIONS" +
         "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
         " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { 
badRecordFilePath.getCanonicalPath } +
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
index a0d4d58..e830b0f 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableLike.scala
@@ -169,7 +169,7 @@ class TestCreateTableLike extends QueryTest with 
BeforeAndAfterEach with BeforeA
         | CREATE TABLE IF NOT EXISTS bkt_tbl (
         |   a int, b string
         | ) STORED AS carbondata
-        | TBLPROPERTIES ('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='b')
+        | TBLPROPERTIES ('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='b')
         | """.stripMargin)
 
     sql("create table targetTable like bkt_tbl")
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index cbb8109..508d8eb 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -227,24 +227,17 @@ class CarbonDataSourceSuite extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS create_source_test2")
   }
 
-  test("test to create bucket columns with int field") {
-    sql("drop table if exists create_source")
-    intercept[Exception] {
-      sql("create table create_source(intField int, stringField string, 
complexField array<string>) USING carbondata OPTIONS('bucketnumber'='1', 
'bucketcolumns'='intField')")
-    }
-  }
-
   test("test to create bucket columns with complex data type field") {
     sql("drop table if exists create_source")
     intercept[Exception] {
-      sql("create table create_source(intField int, stringField string, 
complexField array<string>) USING carbondata OPTIONS('bucketnumber'='1', 
'bucketcolumns'='complexField')")
+      sql("create table create_source(intField int, stringField string, 
complexField array<string>) USING carbondata OPTIONS('bucket_number'='1', 
'bucket_columns'='complexField')")
     }
   }
 
   test("test check results of table with complex data type and bucketing") {
     sql("drop table if exists create_source")
     sql("create table create_source(intField int, stringField string, 
complexField array<int>) " +
-        "USING carbondata OPTIONS('bucketnumber'='1', 
'bucketcolumns'='stringField')")
+        "USING carbondata OPTIONS('bucket_number'='1', 
'BUCKET_COLUMNS'='stringField')")
     sql("insert into create_source values(1,'source',array(1,2,3))")
     checkAnswer(sql("select * from create_source"), Row(1,"source", 
mutable.WrappedArray.newBuilder[Int].+=(1,2,3)))
     sql("drop table if exists create_source")
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 61c48d4..f8c1beb 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -192,8 +192,8 @@ class DataLoadFailAllTypeSortTest extends QueryTest with 
BeforeAndAfterAll {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
       sql("create table data_tbm(name String, dob long, weight int) " +
-          "STORED AS carbondata tblproperties('bucketnumber'='4', " +
-          "'bucketcolumns'='name', 'tableName'='data_tbm')")
+          "STORED AS carbondata tblproperties('bucket_number'='4', " +
+          "'BUCKET_COLUMNS'='name', 'tableName'='data_tbm')")
       val testData = s"$resourcesPath/badrecords/dummy.csv"
       sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_tbm""")
     } catch {
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 9b106ba..1ab3b38 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.carbondata.bucketing
 
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -41,19 +46,170 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS t4")
     sql("DROP TABLE IF EXISTS t5")
     sql("DROP TABLE IF EXISTS t6")
+    sql("DROP TABLE IF EXISTS t6_")
     sql("DROP TABLE IF EXISTS t7")
     sql("DROP TABLE IF EXISTS t8")
     sql("DROP TABLE IF EXISTS t9")
     sql("DROP TABLE IF EXISTS t10")
     sql("DROP TABLE IF EXISTS t11")
+    sql("DROP TABLE IF EXISTS t12")
+    sql("DROP TABLE IF EXISTS t13")
+    sql("DROP TABLE IF EXISTS t14")
+    sql("DROP TABLE IF EXISTS t15")
+    sql("DROP TABLE IF EXISTS t16")
+    sql("DROP TABLE IF EXISTS t17")
+    sql("DROP TABLE IF EXISTS t18")
+    sql("DROP TABLE IF EXISTS t19")
+    sql("DROP TABLE IF EXISTS t20")
+    sql("DROP TABLE IF EXISTS t21")
+    sql("DROP TABLE IF EXISTS t22")
+    sql("DROP TABLE IF EXISTS t23")
+    sql("DROP TABLE IF EXISTS t24")
+    sql("DROP TABLE IF EXISTS t25")
+    sql("DROP TABLE IF EXISTS t26")
+    sql("DROP TABLE IF EXISTS t27")
+    sql("DROP TABLE IF EXISTS t28")
+    sql("DROP TABLE IF EXISTS t40")
+    sql("DROP TABLE IF EXISTS t41")
+    sql("DROP TABLE IF EXISTS t42")
+    sql("DROP TABLE IF EXISTS t43")
+    sql("DROP TABLE IF EXISTS t44")
+    sql("DROP TABLE IF EXISTS t45")
+    sql("DROP TABLE IF EXISTS t46")
+    sql("DROP TABLE IF EXISTS t47")
+    sql("DROP TABLE IF EXISTS t48")
+    sql("DROP TABLE IF EXISTS t49")
+    sql("DROP TABLE IF EXISTS t50")
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table")
+    sql("DROP TABLE IF EXISTS parquet_table")
   }
 
-  test("test create table with buckets") {
+  test("test create table with buckets using table properties and loaded data 
will" +
+    " store into different files") {
     sql("CREATE TABLE t4 (ID Int, date Timestamp, country String, name String, 
phonetype String," +
         "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
+        "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t4")
     val table = CarbonEnv.getCarbonTable(Option("default"), 
"t4")(sqlContext.sparkSession)
+    val segmentDir = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_0")
+    val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+    })
+    assert(dataFiles.length == 4)
+    checkAnswer(sql("select count(*) from t4"), Row(100))
+    checkAnswer(sql("select count(*) from t4 where name='aaa99'"), Row(1))
+    if (table != null && table.getBucketingInfo() != null) {
+      assert(true)
+    } else {
+      assert(false, "Bucketing info does not exist")
+    }
+  }
+
+  test("test IUD of bucket table") {
+    sql("CREATE TABLE t40 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t40")
+    sql("CREATE TABLE t41 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t41")
+
+    // insert
+    sql(s"insert into t40 select 
101,'2015/10/16','china','aaa101','phone2569','ASD16163',15100")
+    checkAnswer(sql(
+      """select count(*) from t40
+      """.stripMargin), Row(101))
+    // update
+    sql(s"update t40 set (name) = ('aaa100') where name='aaa101'")
+    checkAnswer(sql(
+      """select count(*) from t40
+      """.stripMargin), Row(101))
+    checkAnswer(sql(
+      """select count(*) from t40 where name='aaa100'
+      """.stripMargin), Row(2))
+    // delete
+    sql(s"delete from t40 where name='aaa100'")
+    checkAnswer(sql(
+      """select count(*) from t40
+      """.stripMargin), Row(99))
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t40 t1, t41 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(99))
+    // insert again
+    sql(s"insert into t40 select 
1011,'2015/10/16','china','aaa1011','phone2569','ASD16163',15100")
+    sql(s"insert into t40 select 
1012,'2015/10/16','china','aaa1012','phone2569','ASD16163',15100")
+    sql(s"insert into t40 select 
1013,'2015/10/16','china','aaa1013','phone2569','ASD16163',15100")
+    sql(s"insert into t40 select 
1014,'2015/10/16','china','aaa1014','phone2569','ASD16163',15100")
+    checkAnswer(sql(
+      """select count(*) from t40
+      """.stripMargin), Row(103))
+
+    // join after IUD
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t40 t1, t41 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t40 t1, t41 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(99))
+
+    // insert into t41
+    sql(s"insert into t41 select 
1014,'2015/10/16','china','aaa1014','phone2569','ASD16163',15100")
+
+    // join after 2 tables both IUD
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t40 t1, t41 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+    val plan2 = sql(
+      """
+        |select t1.*, t2.*
+        |from t40 t1, t41 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists2 = false
+    plan2.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists2 = true
+    }
+    assert(!shuffleExists2, "shuffle should not exist on bucket tables")
+  }
+
+  test("test create carbon table with buckets like hive sql") {
+    sql("CREATE TABLE t13 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) 
INTO 4 BUCKETS")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t13")
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"t13")(sqlContext.sparkSession)
+    val segmentDir = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_0")
+    val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+    })
+    assert(dataFiles.length == 4)
+    checkAnswer(sql("select count(*) from t13"), Row(100))
+    checkAnswer(sql("select count(*) from t13 where name='aaa99'"), Row(1))
     if (table != null && table.getBucketingInfo() != null) {
       assert(true)
     } else {
@@ -65,7 +221,7 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
 "true")
     sql("CREATE TABLE t10 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
         "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
+        "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t10")
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
 "false")
     val table: CarbonTable = 
CarbonMetadata.getInstance().getCarbonTable("default", "t10")
@@ -84,7 +240,7 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
            (ID Int, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int)
            USING carbondata
-           OPTIONS("bucketnumber"="-1", "bucketcolumns"="name")
+           OPTIONS("bucket_number"="-1", "bucket_columns"="name")
         """)
       assert(false)
     }
@@ -106,7 +262,7 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
           | serialname String,
           | salary Int)
           | STORED AS carbondata
-          | TBLPROPERTIES('bucketnumber'='0', 'bucketcolumns'='name')
+          | TBLPROPERTIES('bucket_number'='0', 'bucket_columns'='name')
         """.stripMargin
       )
       assert(false)
@@ -116,7 +272,17 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     }
   }
 
-  test("test create table with no bucket join of carbon tables") {
+  test("Bucket table only sort inside buckets, can not set sort scope but can 
set sort columns.") {
+    val ex = intercept[ProcessMetaDataException] {
+      sql("CREATE TABLE t44 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+        "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 'sort_columns'='name', 
'sort_scope'='global_sort')")
+    }
+    assert(ex.getMessage.contains("Bucket table only sort inside buckets," +
+      " can not set sort scope but can set sort columns."))
+  }
+
+  test("test create table with both no bucket join of carbon tables") {
     sql("CREATE TABLE t5 (ID Int, date Timestamp, country String, name String, 
phonetype String," +
         "serialname String, salary Int) STORED AS carbondata")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t5")
@@ -126,6 +292,12 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
         |from t5 t1, t5 t2
         |where t1.name = t2.name
       """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t5 t1, t5 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
     var shuffleExists = false
     plan.collect {
       case s: Exchange if (s.getClass.getName.equals
@@ -137,17 +309,94 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(shuffleExists, "shuffle should exist on non bucket tables")
   }
 
-  test("test create table with bucket join of carbon tables") {
+  test("test join of carbon bucket table and non bucket parquet table") {
+    sql("CREATE TABLE t8 (ID Int, date Timestamp, country String, name String, 
phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t8")
+
+    sql("DROP TABLE IF EXISTS parquet_table")
+    sql("select * from t8").write
+      .format("parquet")
+      .saveAsTable("parquet_table")
+
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t8 t1, parquet_table t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t8 t1, parquet_table t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(shuffleExists, "shuffle should exist on non bucket tables")
+    sql("DROP TABLE parquet_table")
+  }
+
+  test("test no shuffle when using bucket tables") {
+    sql("CREATE TABLE t12 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) 
INTO 4 BUCKETS")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t12")
+
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table")
+    sql("select * from t12").write
+      .format("parquet")
+      .bucketBy(4, "name")
+      .saveAsTable("bucketed_parquet_table")
+
+    checkAnswer(sql("select count(*) from t12"), Row(100))
+    checkAnswer(sql("select count(*) from bucketed_parquet_table"), Row(100))
+
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t12 t1, bucketed_parquet_table t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    sql("DROP TABLE bucketed_parquet_table")
+  }
+
+  test("test join of carbon bucket tables") {
     sql("CREATE TABLE t6 (ID Int, date Timestamp, country String, name String, 
phonetype String," +
         "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
+        "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6")
+    sql("CREATE TABLE t6_ (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t6_")
     val plan = sql(
       """
         |select t1.*, t2.*
-        |from t6 t1, t6 t2
+        |from t6 t1, t6_ t2
         |where t1.name = t2.name
       """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t6 t1, t6_ t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
     var shuffleExists = false
     plan.collect {
       case s: Exchange if (s.getClass.getName.equals
@@ -159,24 +408,31 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
   }
 
-  test("test create table with bucket join of carbon table and parquet table") 
{
+  test("test join of carbon bucket table and parquet bucket table") {
     sql("CREATE TABLE t7 (ID Int, date Timestamp, country String, name String, 
phonetype String," +
         "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
+        "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='name')")
     sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t7")
 
     sql("DROP TABLE IF EXISTS bucketed_parquet_table")
     sql("select * from t7").write
       .format("parquet")
-      .bucketBy(4, "name")
+      .bucketBy(9, "name")
       .saveAsTable("bucketed_parquet_table")
-
+    // carbon join parquet, both bucket tables.
     val plan = sql(
       """
         |select t1.*, t2.*
         |from t7 t1, bucketed_parquet_table t2
         |where t1.name = t2.name
       """.stripMargin).queryExecution.executedPlan
+    // verify the join result of the carbon bucket table and the parquet bucket table.
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t7 t1, bucketed_parquet_table t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
     var shuffleExists = false
     plan.collect {
       case s: Exchange if (s.getClass.getName.equals
@@ -189,23 +445,256 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE bucketed_parquet_table")
   }
 
-  test("test create table with bucket join of carbon table and non bucket 
parquet table") {
-    sql("CREATE TABLE t8 (ID Int, date Timestamp, country String, name String, 
phonetype String," +
-        "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
-        "('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')")
-    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t8")
+  test("test join of carbon bucket tables using hive sql") {
+    sql("CREATE TABLE t14 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) 
INTO 4 BUCKETS")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t14")
+    sql("CREATE TABLE t15 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata CLUSTERED BY (name) 
INTO 4 BUCKETS")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t15")
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t14 t1, t15 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t14 t1, t15 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+  }
 
-    sql("DROP TABLE IF EXISTS parquet_table")
-    sql("select * from t8").write
+  test("test join of diff data types as bucket column for carbon tables") {
+    sql("CREATE TABLE t16 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='ID')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t16")
+    sql("CREATE TABLE t17 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t17")
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t16 t1, t17 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t16 t1, t17 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+  }
+
+  test("timestamp as bucket column, test join of carbon bucket tables") {
+    sql("CREATE TABLE t18 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='date')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t18")
+    sql("CREATE TABLE t19 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='date')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t19")
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t18 t1, t19 t2
+        |where t1.date = t2.date
+      """.stripMargin).queryExecution.executedPlan
+    // the timestamp column in source.csv contains duplicate values, so the join yields 120 rows
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t18 t1, t19 t2
+        |where t1.date = t2.date) temp
+      """.stripMargin), Row(120))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+  }
+
+  test("timestamp as bucket column, test join of carbon bucket table and 
parquet table") {
+    sql("CREATE TABLE t20 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='date')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t20")
+    // parquet 1
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20")
+    sql("select * from t20").write
       .format("parquet")
-      .saveAsTable("parquet_table")
+      .bucketBy(9, "date")
+      .saveAsTable("bucketed_parquet_table_t20")
+    // parquet 2
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20_")
+    sql("select * from t20").write
+      .format("parquet")
+      .bucketBy(9, "date")
+      .saveAsTable("bucketed_parquet_table_t20_")
 
+    // parquet join with parquet
+    val plan2 = sql(
+      """
+        |select t1.*, t2.*
+        |from bucketed_parquet_table_t20_ t1, bucketed_parquet_table_t20 t2
+        |where t1.date = t2.date
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists2 = false
+    plan2.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists2 = true
+    }
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from bucketed_parquet_table_t20_ t1, bucketed_parquet_table_t20 t2
+        |where t1.date = t2.date) temp
+      """.stripMargin), Row(120))
+
+    // carbon join with parquet
     val plan = sql(
       """
         |select t1.*, t2.*
-        |from t8 t1, parquet_table t2
+        |from t20 t1, bucketed_parquet_table_t20 t2
+        |where t1.date = t2.date
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t20 t1, bucketed_parquet_table_t20 t2
+        |where t1.date = t2.date) temp
+      """.stripMargin), Row(120))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+
+    assert(shuffleExists == shuffleExists2, "for a non-string bucket column, shuffle" +
+      " behavior should match the parquet bucket table")
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20")
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t20_")
+
+  }
+
+  test("long as bucket column, test join of carbon bucket table and parquet 
table") {
+    sql("CREATE TABLE t21 (ID long, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='ID')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t21")
+    // parquet 1
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t21")
+    sql("select * from t21").write
+      .format("parquet")
+      .bucketBy(9, "ID")
+      .saveAsTable("bucketed_parquet_table_t21")
+
+    // carbon join with parquet
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t21 t1, bucketed_parquet_table_t21 t2
+        |where t1.ID = t2.ID
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist in bucket table join")
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t21 t1, bucketed_parquet_table_t21 t2
+        |where t1.ID = t2.ID) temp
+      """.stripMargin), Row(100))
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t21")
+  }
+
+  test("int as bucket column, test join of carbon bucket table and parquet 
table") {
+    sql("CREATE TABLE t22 (ID int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='9', 'BUCKET_COLUMNS'='ID')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t22")
+    // parquet 1
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t22")
+    sql("select * from t22").write
+      .format("parquet")
+      .bucketBy(9, "ID")
+      .saveAsTable("bucketed_parquet_table_t22")
+
+    // carbon join with parquet
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t22 t1, bucketed_parquet_table_t22 t2
+        |where t1.ID = t2.ID
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist in bucket table join")
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t22 t1, bucketed_parquet_table_t22 t2
+        |where t1.ID = t2.ID) temp
+      """.stripMargin), Row(100))
+    sql("DROP TABLE IF EXISTS bucketed_parquet_table_t22")
+  }
+
+  test("test bucket hash method config") {
+    sql("CREATE TABLE t23 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 
'bucket_hash_method'='NATIVE' )")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t23")
+    sql("CREATE TABLE t24 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name', 
'bucket_hash_method'='NATIVE')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t24")
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t23 t1, t24 t2
         |where t1.name = t2.name
       """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t23 t1, t24 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
     var shuffleExists = false
     plan.collect {
       case s: Exchange if (s.getClass.getName.equals
@@ -214,24 +703,319 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
         ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
       => shuffleExists = true
     }
-    assert(shuffleExists, "shuffle should exist on non bucket tables")
-    sql("DROP TABLE parquet_table")
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
   }
 
-  // TODO: make pluggable CarbonOptimizerUtil.transformForScalarSubQuery
-  ignore("test scalar subquery with equal") {
+  test("only shuffle 1 side whose bucket num larger when join of carbon 
bucket" +
+    " tables with diff bucket num") {
+    sql("CREATE TABLE t25 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='3', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t25")
+    sql("CREATE TABLE t26 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='7', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t26")
+
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t25 t1, t26 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t25 t1, t26 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+
+    var shuffleLeftExists = false
+    var shuffleRightExists = false
+    
plan.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[SortMergeJoinExec].left.collect
 {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleLeftExists = true
+    }
+
+    
plan.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[SortMergeJoinExec].right.collect
 {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleRightExists = true
+    }
+    assert(shuffleLeftExists && !shuffleRightExists,
+      "only the side with the smaller bucket number should be shuffled")
+  }
+
+  test("test compaction of bucket tables") {
+    sql("CREATE TABLE t27 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t27")
+    sql(s"alter table t27 compact 'minor'")
+
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"t27")(sqlContext.sparkSession)
+    // after compaction, data should be stored into different files based on bucket id
+    val segmentDir = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_0.1")
+    val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+    })
+    assert(dataFiles.length == 10)
+
+    sql("CREATE TABLE t28 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t28")
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t27 t1, t28 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t27 t1, t28 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(500))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+  }
+
+  test("test alter column of bucket table") {
+    sql("CREATE TABLE t42 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t42")
+
+    sql("CREATE TABLE t43 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t43")
+
+    // bucket columns are not allowed to be renamed, have their data type changed, or be dropped.
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql(s"alter table t42 change name name222 string")
+    }
+    assert(ex.getMessage.contains("Column Rename Operation failed." +
+      " Renaming the bucket column name is not allowed"))
+    val ex2 = intercept[ProcessMetaDataException] {
+      sql(s"alter table t42 drop columns(name)")
+    }
+    assert(ex2.getMessage.contains("Bucket columns cannot be dropped: 
List(name)"))
+
+    // alter table column
+    sql(s"alter table t42 change salary slong long")
+    checkAnswer(sql(
+      """select count(*) from t42 where slong=15000
+      """.stripMargin), Row(1))
+    // join after alter table
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t42 t1, t43 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t42 t1, t43 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+
+    // test desc formatted
+    val descPar = sql("desc formatted t42").collect
+    descPar.find(_.get(0).toString.contains("Bucket Columns")) match {
+      case Some(row) => assert(row.get(1).toString.contains("name"))
+      case None => fail("Bucket Columns: not found in describe formatted")
+    }
+    descPar.find(_.get(0).toString.contains("Number of Buckets")) match {
+      case Some(row) => assert(row.get(1).toString.contains("10"))
+      case None => fail("Number of Buckets: not found in describe formatted")
+    }
+  }
+
+  test("test insert into bucket table old insert flow") {
+    sql("CREATE TABLE t45 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t45")
+    sql("CREATE TABLE t46 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+    // use old flow
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, 
"true")
+    sql(s"INSERT INTO t46 SELECT * FROM t45")
+
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"t46")(sqlContext.sparkSession)
+    val segmentDir = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_0")
+    val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+    })
+    assert(dataFiles.length == 4)
+    checkAnswer(sql(
+      """select count(*) from t46
+      """.stripMargin), Row(100))
     sql(
-      """select sum(salary) from t4 t1
-        |where ID = (select sum(ID) from t4 t2 where t1.name = 
t2.name)""".stripMargin)
-      .count()
+      """select * from t46
+      """.stripMargin).show(100, false)
+
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t45 t1, t46 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    checkAnswer(sql(
+      """select count(*) from t46 where name='aaa1'
+      """.stripMargin), Row(1))
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t45 t1, t46 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, 
"false")
   }
 
-  // TODO: make pluggable CarbonOptimizerUtil.transformForScalarSubQuery
-  ignore("test scalar subquery with lessthan") {
+  test("test insert into bucket table new insert flow") {
+    sql("CREATE TABLE t47 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t47")
+    sql("CREATE TABLE t48 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name')")
+
+    // use new flow
+    sql(s"INSERT INTO t48 SELECT * FROM t47")
+
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"t48")(sqlContext.sparkSession)
+    val segmentDir = FileFactory.getCarbonFile(table.getTablePath + 
"/Fact/Part0/Segment_0")
+    val dataFiles = segmentDir.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = 
file.getName.endsWith(".carbondata")
+    })
+    assert(dataFiles.length == 4)
+    checkAnswer(sql(
+      """select count(*) from t48
+      """.stripMargin), Row(100))
     sql(
-      """select sum(salary) from t4 t1
-        |where ID < (select sum(ID) from t4 t2 where t1.name = 
t2.name)""".stripMargin)
-      .count()
+      """select * from t48
+      """.stripMargin).show(100, false)
+
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t47 t1, t48 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    checkAnswer(sql(
+      """select count(*) from t48 where name='aaa1'
+      """.stripMargin), Row(1))
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t47 t1, t48 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+  }
+
+  test("test multi bucket columns") {
+    sql("CREATE TABLE t49 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name,date')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t49")
+    sql("CREATE TABLE t50 (ID Int, date Timestamp, country String, name 
String, phonetype String," +
+      "serialname String, salary Int) STORED AS carbondata TBLPROPERTIES " +
+      "('BUCKET_NUMBER'='4', 'BUCKET_COLUMNS'='name,date')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/source.csv' INTO TABLE t50")
+    val plan = sql(
+      """
+        |select t1.*, t2.*
+        |from t49 t1, t50 t2
+        |where t1.name = t2.name and t1.date = t2.date
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t49 t1, t50 t2
+        |where t1.name = t2.name and t1.date = t2.date) temp
+      """.stripMargin), Row(100))
+    var shuffleExists = false
+    plan.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist when all bucket columns" +
+      "in query filter")
+
+    val plan2 = sql(
+      """
+        |select t1.*, t2.*
+        |from t49 t1, t50 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    checkAnswer(sql(
+      """select count(*) from
+        |(select t1.*, t2.*
+        |from t49 t1, t50 t2
+        |where t1.name = t2.name) temp
+      """.stripMargin), Row(100))
+    var shuffleExists2 = false
+    plan2.collect {
+      case s: Exchange if (s.getClass.getName.equals
+      ("org.apache.spark.sql.execution.exchange.ShuffleExchange") ||
+        s.getClass.getName.equals
+        ("org.apache.spark.sql.execution.exchange.ShuffleExchangeExec"))
+      => shuffleExists2 = true
+    }
+    assert(shuffleExists2, "shuffle should exist when some bucket columns are missing from the join condition")
   }
 
   override def afterAll {
@@ -241,10 +1025,40 @@ class TableBucketingTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS t4")
     sql("DROP TABLE IF EXISTS t5")
     sql("DROP TABLE IF EXISTS t6")
+    sql("DROP TABLE IF EXISTS t6_")
     sql("DROP TABLE IF EXISTS t7")
     sql("DROP TABLE IF EXISTS t8")
     sql("DROP TABLE IF EXISTS t9")
     sql("DROP TABLE IF EXISTS t10")
+    sql("DROP TABLE IF EXISTS t11")
+    sql("DROP TABLE IF EXISTS t12")
+    sql("DROP TABLE IF EXISTS t13")
+    sql("DROP TABLE IF EXISTS t14")
+    sql("DROP TABLE IF EXISTS t15")
+    sql("DROP TABLE IF EXISTS t16")
+    sql("DROP TABLE IF EXISTS t17")
+    sql("DROP TABLE IF EXISTS t18")
+    sql("DROP TABLE IF EXISTS t19")
+    sql("DROP TABLE IF EXISTS t20")
+    sql("DROP TABLE IF EXISTS t21")
+    sql("DROP TABLE IF EXISTS t22")
+    sql("DROP TABLE IF EXISTS t23")
+    sql("DROP TABLE IF EXISTS t24")
+    sql("DROP TABLE IF EXISTS t25")
+    sql("DROP TABLE IF EXISTS t26")
+    sql("DROP TABLE IF EXISTS t27")
+    sql("DROP TABLE IF EXISTS t28")
+    sql("DROP TABLE IF EXISTS t40")
+    sql("DROP TABLE IF EXISTS t41")
+    sql("DROP TABLE IF EXISTS t42")
+    sql("DROP TABLE IF EXISTS t43")
+    sql("DROP TABLE IF EXISTS t44")
+    sql("DROP TABLE IF EXISTS t45")
+    sql("DROP TABLE IF EXISTS t46")
+    sql("DROP TABLE IF EXISTS t47")
+    sql("DROP TABLE IF EXISTS t48")
+    sql("DROP TABLE IF EXISTS t49")
+    sql("DROP TABLE IF EXISTS t50")
     sql("DROP TABLE IF EXISTS bucketed_parquet_table")
     sql("DROP TABLE IF EXISTS parquet_table")
     sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", 
threshold.toString)
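
    Note on the tests above: a shuffle is detected by collecting Exchange nodes and comparing
    class names against both ShuffleExchange (older Spark) and ShuffleExchangeExec (newer Spark),
    and that pattern is repeated in nearly every new test. A minimal Scala sketch of how the
    check could be factored into a helper; this is illustrative only and not part of the patch,
    and the helper name containsShuffle is an assumption:

        import org.apache.spark.sql.execution.SparkPlan
        import org.apache.spark.sql.execution.exchange.Exchange

        // Returns true if the physical plan contains a shuffle exchange,
        // matching both the older and newer Spark class names.
        def containsShuffle(plan: SparkPlan): Boolean = {
          plan.collect {
            case e: Exchange if e.getClass.getSimpleName.startsWith("ShuffleExchange") => e
          }.nonEmpty
        }

        // usage inside a test:
        // val plan = sql("select t1.*, t2.* from t6 t1, t6_ t2 where t1.name = t2.name")
        //   .queryExecution.executedPlan
        // assert(!containsShuffle(plan), "shuffle should not exist on bucket tables")
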
diff --git a/pom.xml b/pom.xml
index 3a819b6..40eb6ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,7 +148,7 @@
       <id>central</id>
       <!-- This should be at top, it makes maven try the central repo first 
and then others and hence faster dep resolution -->
       <name>Maven Repository</name>
-      <url>https://repo1.maven.org/maven2</url>
+      <url>https://maven.aliyun.com/repository/public</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -480,6 +480,7 @@
                 
<exclude>**/org.apache.carbondata.cluster.sdv.generated.*</exclude>
                 <exclude>**/org.apache.spark.sql.test.*</exclude>
                 <exclude>**/org.apache.carbondata.format.*</exclude>
+                <exclude>**/org.apache.carbondata.core.unsafe*</exclude>
               </excludes>
               <includes>
                 <include>**/org.apache.*</include>
diff --git a/processing/pom.xml b/processing/pom.xml
index 331d513..6600efa 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -41,6 +41,22 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+    <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
       <version>2.2.1</version>
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 0965ee6..1af4fe3 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -47,6 +47,8 @@ public class CarbonDataLoadConfiguration {
 
   private BucketingInfo bucketingInfo;
 
+  private String bucketHashMethod;
+
   private String segmentPath;
 
   private Map<String, Object> dataLoadProperties = new HashMap<>();
@@ -364,4 +366,12 @@ public class CarbonDataLoadConfiguration {
   public void setIndexColumnsPresent(boolean indexColumnsPresent) {
     isIndexColumnsPresent = indexColumnsPresent;
   }
+
+  public String getBucketHashMethod() {
+    return bucketHashMethod;
+  }
+
+  public void setBucketHashMethod(String bucketHashMethod) {
+    this.bucketHashMethod = bucketHashMethod;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 4b44a18..8586a61 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -63,17 +63,22 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) {
     CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, 
storeLocation);
     SortScopeOptions.SortScope sortScope = 
CarbonDataProcessorUtil.getSortScope(configuration);
-    if (loadModel.isLoadWithoutConverterStep()) {
+    if (configuration.getBucketingInfo() != null &&
+            CarbonProperties.isBadRecordHandlingEnabledForInsert()) {
+      // if the old flow is used, both load and insert of a bucket table take this same path.
+      // Otherwise, load of a bucket table uses buildInternalForBucketing while insert uses
+      // buildInternalWithNoConverter.
+      return buildInternalForBucketing(inputIterators, configuration);
+    } else if (loadModel.isLoadWithoutConverterStep()) {
       return buildInternalWithNoConverter(inputIterators, configuration, 
sortScope, false);
     } else if (loadModel.isLoadWithoutConverterWithoutReArrangeStep()) {
       return buildInternalWithNoConverter(inputIterators, configuration, 
sortScope, true);
     } else if (loadModel.isJsonFileLoad()) {
       return buildInternalWithJsonInputProcessor(inputIterators, 
configuration, sortScope);
-    } else if (!configuration.isSortTable() || sortScope.equals(
-        SortScopeOptions.SortScope.NO_SORT)) {
-      return buildInternalForNoSort(inputIterators, configuration);
     } else if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
+    } else if (!configuration.isSortTable() || sortScope.equals(
+            SortScopeOptions.SortScope.NO_SORT)) {
+      return buildInternalForNoSort(inputIterators, configuration);
     } else {
       return buildInternal(inputIterators, configuration);
     }
@@ -117,7 +122,8 @@ public final class DataLoadProcessBuilder {
     // Wraps with dummy processor.
     AbstractDataLoadProcessorStep inputProcessorStep =
         new InputProcessorStepWithNoConverterImpl(configuration, 
inputIterators, withoutReArrange);
-    if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
+    if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT) ||
+            configuration.getBucketingInfo() != null) {
       AbstractDataLoadProcessorStep sortProcessorStep =
           new SortProcessorStepImpl(configuration, inputProcessorStep);
       //  Writes the sorted data in carbondata format.
@@ -250,6 +256,7 @@ public final class DataLoadProcessBuilder {
     configuration.setDataFields(
         updateDataFieldsBasedOnSortColumns(dataFields).toArray(new 
DataField[dataFields.size()]));
     configuration.setBucketingInfo(carbonTable.getBucketingInfo());
+    configuration.setBucketHashMethod(carbonTable.getBucketHashMethod());
     configuration.setPreFetch(loadModel.isPreFetch());
     configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     
configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
@@ -469,4 +476,5 @@ public final class DataLoadProcessBuilder {
     updatedDataFields.addAll(nonSortFields);
     return updatedDataFields;
   }
+
 }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 91ed153..1c65dcb 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -219,6 +219,11 @@ public class CarbonLoadModel implements Serializable {
    */
   private int scaleFactor;
 
+  /**
+   * bucket id
+   */
+  private int bucketId;
+
   private OutputFilesInfoHolder outputFilesInfoHolder;
 
   public boolean isAggLoadRequest() {
@@ -353,6 +358,14 @@ public class CarbonLoadModel implements Serializable {
     this.loadMinSize = loadMinSize;
   }
 
+  public int getBucketId() {
+    return bucketId;
+  }
+
+  public void setBucketId(int bucketId) {
+    this.bucketId = bucketId;
+  }
+
   /**
    * Get copy with taskNo.
    * Broadcast value is shared in process, so we need to copy it to make sure 
the value in each
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
index 1cc1000..c9da264 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.loading.partition.impl;
 
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -27,7 +28,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.loading.partition.Partitioner;
 
 /**
- * Hash partitioner implementation
+ * Hash partitioner implementation whose hash values are not consistent with Spark's.
  */
 @InterfaceAudience.Internal
 public class HashPartitionerImpl implements Partitioner<CarbonRow> {
@@ -102,7 +103,12 @@ public class HashPartitionerImpl implements 
Partitioner<CarbonRow> {
 
     @Override
     public int getHash(Object[] value) {
-      return value[index] != null ? value[index].hashCode() : 0;
+      // check for null before decoding, otherwise the cast/decode below throws a NullPointerException
+      if (value[index] == null) {
+        return 0;
+      }
+      try {
+        String valueStr = new String((byte[]) value[index], "utf-8");
+        return valueStr.hashCode();
+      } catch (UnsupportedEncodingException e) {
+        return 0;
+      }
     }
   }
 }
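
    The two partitioners differ only in the hash function: HashPartitionerImpl above hashes the
    decoded string with the JVM String.hashCode, while the new SparkHashExpressionPartitionerImpl
    below uses Spark's Murmur3_x86_32 with seed 42, so a Carbon bucket lines up with the bucket
    Spark computes for its own bucketed tables. A minimal Scala sketch of the two hash methods and
    the non-negative bucket-id mapping used by the new partitioner; it assumes spark-unsafe is on
    the classpath (the dependency added to processing/pom.xml above), and the concrete values are
    illustrative only:

        import org.apache.spark.unsafe.hash.Murmur3_x86_32
        import org.apache.spark.unsafe.types.UTF8String

        val numberOfBuckets = 4
        val value = "aaa99"

        // 'native' bucket_hash_method: plain JVM hash of the string value
        val nativeHash = value.hashCode

        // 'spark_hash_expression' bucket_hash_method: Murmur3 over the UTF-8 bytes with seed 42,
        // matching what Spark's hash expression computes for string columns
        val utf8 = UTF8String.fromString(value)
        val sparkHash = Murmur3_x86_32.hashUnsafeBytes(
          utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), 42)

        // the new partitioner maps the (possibly negative) hash to a bucket id like this
        def bucketId(hash: Int): Int = ((hash % numberOfBuckets) + numberOfBuckets) % numberOfBuckets

        // the two methods generally give different bucket ids for the same value, which is why
        // the hash method of joined bucket tables has to match
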
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/SparkHashExpressionPartitionerImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/SparkHashExpressionPartitionerImpl.java
new file mode 100644
index 0000000..81501e6
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/SparkHashExpressionPartitionerImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+
+import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Hash partitioner implementation (spark_hash_expression) which uses Murmur3_x86_32 to produce
+ * the same hash value as Spark for a given input.
+ */
+@InterfaceAudience.Internal
+public class SparkHashExpressionPartitionerImpl implements 
Partitioner<CarbonRow> {
+
+  private int numberOfBuckets;
+
+  private Hash[] hashes;
+
+  public SparkHashExpressionPartitionerImpl(List<Integer> indexes, 
List<ColumnSchema> columnSchemas,
+                                            int numberOfBuckets) {
+    this.numberOfBuckets = numberOfBuckets;
+    hashes = new Hash[indexes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      DataType dataType = columnSchemas.get(i).getDataType();
+      if (dataType == DataTypes.LONG || dataType == DataTypes.DOUBLE) {
+        hashes[i] = new LongHash(indexes.get(i));
+      } else if (dataType == DataTypes.SHORT || dataType == DataTypes.INT ||
+          dataType == DataTypes.FLOAT || dataType == DataTypes.BOOLEAN) {
+        hashes[i] = new IntegralHash(indexes.get(i));
+      } else if (DataTypes.isDecimal(dataType)) {
+        hashes[i] = new DecimalHash(indexes.get(i));
+      } else if (dataType == DataTypes.TIMESTAMP) {
+        hashes[i] = new TimestampHash(indexes.get(i));
+      } else {
+        hashes[i] = new StringHash(indexes.get(i));
+      }
+    }
+  }
+
+  @Override
+  public int getPartition(CarbonRow key) {
+    int hashCode = 0;
+    for (Hash hash : hashes) {
+      hashCode += hash.getHash(key.getData());
+    }
+    int reminder = hashCode % numberOfBuckets;
+    if (reminder < 0) {
+      return (reminder + numberOfBuckets) % numberOfBuckets;
+    } else {
+      return reminder;
+    }
+  }
+
+  private interface Hash {
+    int getHash(Object[] value);
+  }
+
+  private static class IntegralHash implements Hash {
+
+    private int index;
+
+    private IntegralHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      if (value[index] == null) {
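+        // Spark's hash expression leaves the seed (42) unchanged for null input,
+        // so return the seed directly to stay consistent.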
+        return 42;
+      }
+      int intValue = 0;
+      if (value[index] instanceof Boolean) {
+        boolean boolValue = (boolean) value[index];
+        intValue = boolValue ? 1 : 0;
+      } else if (value[index] instanceof Float) {
+        intValue = Float.floatToIntBits((float) value[index]);
+      } else {
+        intValue = Integer.parseInt(value[index].toString());
+      }
+      return Murmur3_x86_32.hashInt(intValue, 42);
+    }
+  }
+
+  private static class LongHash implements Hash {
+
+    private int index;
+
+    private LongHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      if (value[index] == null) {
+        return 42;
+      }
+      long longValue = 0L;
+      if (value[index] instanceof java.lang.Double) {
+        longValue = Double.doubleToLongBits((double) value[index]);
+      } else {
+        longValue = Long.parseLong(value[index].toString());
+      }
+      return Murmur3_x86_32.hashLong(longValue, 42);
+    }
+  }
+
+  private static class TimestampHash implements Hash {
+
+    private int index;
+
+    private TimestampHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      if (value[index] == null) {
+        return 42;
+      }
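+      // CarbonData keeps TIMESTAMP values in milliseconds, while Spark hashes its internal
+      // microsecond representation, hence the conversion below.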
+      long timeMilSec = (long) value[index];
+      long timeMicSec = timeMilSec * 1000;
+      return Murmur3_x86_32.hashLong(timeMicSec, 42);
+    }
+  }
+
+  private static class DecimalHash implements Hash {
+
+    private int index;
+
+    private DecimalHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      if (value[index] == null) {
+        return 42;
+      }
+      return Double.valueOf(value[index].toString()).hashCode();
+    }
+  }
+
+  private static class StringHash implements Hash {
+
+    private int index;
+
+    private StringHash(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public int getHash(Object[] value) {
+      // Use the same hash method as Spark; otherwise the same value would hash into different
+      // buckets across carbon and parquet bucket tables and join results would be incorrect.
+      if (value[index] == null) {
+        return 42;
+      }
+      UTF8String utf8String = UTF8String.fromBytes((byte[]) value[index]);
+      return utf8String.hashCode();
+    }
+  }
+}
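
Editor's note: the Murmur3 seed of 42 and the non-negative remainder in getPartition() are what
make the result comparable with Spark's own bucket assignment. A minimal standalone sketch of the
mapping, assuming spark-unsafe is on the classpath (the class name and sample values are
illustrative only):

import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.UTF8String;

public class BucketIdSketch {
  public static void main(String[] args) {
    int numberOfBuckets = 4;
    // An INT bucket column value goes through Murmur3 with seed 42, as in IntegralHash.
    int intHash = Murmur3_x86_32.hashInt(5, 42);
    // A STRING bucket column value goes through UTF8String.hashCode(), as in StringHash.
    int strHash = UTF8String.fromString("carbon").hashCode();
    System.out.println(toBucket(intHash, numberOfBuckets));
    System.out.println(toBucket(strHash, numberOfBuckets));
  }

  // Same normalization as getPartition(): e.g. a hash of -7 with 4 buckets maps to bucket 1.
  private static int toBucket(int hashCode, int numberOfBuckets) {
    int reminder = hashCode % numberOfBuckets;
    return reminder < 0 ? reminder + numberOfBuckets : reminder;
  }
}
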
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 28178f0..f82b715 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
@@ -42,6 +43,7 @@ import 
org.apache.carbondata.processing.loading.partition.Partitioner;
 import 
org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
 import 
org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl;
 import 
org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;
+import 
org.apache.carbondata.processing.loading.partition.impl.SparkHashExpressionPartitionerImpl;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -105,8 +107,24 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
     }
 
     // hash partitioner to dispatch rows by bucket column
-    this.partitioner =
-        new HashPartitionerImpl(indexes, columnSchemas, 
bucketingInfo.getNumOfRanges());
+    if (CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT.equals(
+        configuration.getBucketHashMethod())) {
+      // keeps the bucket hash consistent between carbon and spark tables.
+      this.partitioner = new SparkHashExpressionPartitionerImpl(
+              indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+    } else if (CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals(
+        configuration.getBucketHashMethod())) {
+      // "native" is not consistent with Spark; it uses the Java hashCode of types such as
+      // Long and String directly, which may perform better during the convert step.
+      // Do not use it when the table needs to join with Spark bucketed tables.
+      this.partitioner = new HashPartitionerImpl(
+              indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+    } else {
+      // by default we use SparkHashExpressionPartitionerImpl to hash.
+      this.partitioner = new SparkHashExpressionPartitionerImpl(
+              indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+    }
+
   }
 
   /**
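
Editor's note: the same hash-method selection is repeated in InputProcessorStepWithNoConverterImpl
further down in this patch. As a rough sketch only (this helper is not part of the patch; the class
and method names are assumptions), the choice could be centralized as:

import java.util.List;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.processing.loading.partition.Partitioner;
import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
import org.apache.carbondata.processing.loading.partition.impl.SparkHashExpressionPartitionerImpl;

/** Hypothetical helper mirroring the selection logic above; not part of this patch. */
final class BucketPartitionerFactory {
  static Partitioner<CarbonRow> create(String hashMethod, List<Integer> indexes,
      List<ColumnSchema> columnSchemas, int numberOfBuckets) {
    // BUCKET_HASH_METHOD_NATIVE selects the plain Java-hash partitioner; any other value,
    // including BUCKET_HASH_METHOD_DEFAULT, selects the Spark-consistent Murmur3 partitioner.
    if (CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals(hashMethod)) {
      return new HashPartitionerImpl(indexes, columnSchemas, numberOfBuckets);
    }
    return new SparkHashExpressionPartitionerImpl(indexes, columnSchemas, numberOfBuckets);
  }
}
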
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index f1ed8df..dd42092 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -139,6 +139,7 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
         throw new CarbonDataWriterException(e.getCause());
       }
     } catch (CarbonDataWriterException e) {
+      LOGGER.error(e);
       throw new CarbonDataLoadingException("Error while initializing writer: " 
+ e.getMessage(), e);
     } catch (Exception e) {
       throw new CarbonDataLoadingException("There is an unexpected error: " + 
e.getMessage(), e);
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 2e503bd..589b49d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.processing.loading.steps;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -27,11 +28,14 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -45,6 +49,9 @@ import 
org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFacto
 import 
org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
 import 
org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+import 
org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
+import 
org.apache.carbondata.processing.loading.partition.impl.SparkHashExpressionPartitionerImpl;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -70,6 +77,8 @@ public class InputProcessorStepWithNoConverterImpl extends 
AbstractDataLoadProce
 
   // set to true when there is no need to reArrange the data
   private boolean withoutReArrange;
+  private boolean isBucketColumnEnabled = false;
+  private Partitioner<CarbonRow> partitioner;
 
   public InputProcessorStepWithNoConverterImpl(CarbonDataLoadConfiguration 
configuration,
       CarbonIterator<Object[]>[] inputIterators, boolean withoutReArrange) {
@@ -109,6 +118,50 @@ public class InputProcessorStepWithNoConverterImpl extends 
AbstractDataLoadProce
     if (!withoutReArrange) {
       orderOfData = arrangeData(configuration.getDataFields(), 
configuration.getHeader());
     }
+    if (null != configuration.getBucketingInfo()) {
+      this.isBucketColumnEnabled = true;
+      initializeBucketColumnPartitioner();
+    }
+  }
+
+  /**
+   * initialize partitioner for bucket column
+   */
+  private void initializeBucketColumnPartitioner() {
+    List<Integer> indexes = new ArrayList<>();
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    DataField[] inputDataFields = getOutput();
+    BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+    for (int i = 0; i < inputDataFields.length; i++) {
+      for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+        if (inputDataFields[i].getColumn().getColName()
+                
.equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+          indexes.add(i);
+          columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+          break;
+        }
+      }
+    }
+
+    // hash partitioner to dispatch rows by bucket column
+    if (CarbonCommonConstants.BUCKET_HASH_METHOD_DEFAULT.equals(
+            configuration.getBucketHashMethod())) {
+      // keeps the bucket hash consistent between carbon and spark tables.
+      this.partitioner = new SparkHashExpressionPartitionerImpl(
+              indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+    } else if (CarbonCommonConstants.BUCKET_HASH_METHOD_NATIVE.equals(
+            configuration.getBucketHashMethod())) {
+      // "native" is not consistent with Spark; it uses the Java hashCode of types such as
+      // Long and String directly, which may perform better during the convert step.
+      // Do not use it when the table needs to join with Spark bucketed tables.
+      this.partitioner = new HashPartitionerImpl(
+              indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+    } else {
+      // by default we use SparkHashExpressionPartitionerImpl to hash.
+      this.partitioner = new SparkHashExpressionPartitionerImpl(
+              indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+    }
+
   }
 
   private void convertComplexDataType(Map<Integer, GenericDataType> 
dataFieldsWithComplexDataType) {
@@ -148,7 +201,8 @@ public class InputProcessorStepWithNoConverterImpl extends 
AbstractDataLoadProce
       outIterators[i] =
           new InputProcessorIterator(readerIterators[i], batchSize,
               rowCounter, orderOfData, noDictionaryMapping, dataTypes, 
configuration,
-              dataFieldsWithComplexDataType, rowConverter, withoutReArrange);
+              dataFieldsWithComplexDataType, rowConverter, withoutReArrange, 
isBucketColumnEnabled,
+                  partitioner);
     }
     return outIterators;
   }
@@ -208,14 +262,15 @@ public class InputProcessorStepWithNoConverterImpl 
extends AbstractDataLoadProce
 
     RowConverter converter;
     CarbonDataLoadConfiguration configuration;
-
+    private boolean isBucketColumnEnabled = false;
+    private Partitioner<CarbonRow> partitioner;
     private boolean withoutReArrange;
 
     public InputProcessorIterator(List<CarbonIterator<Object[]>> 
inputIterators, int batchSize,
         AtomicLong rowCounter, int[] orderOfData, boolean[] 
noDictionaryMapping,
         DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
         Map<Integer, GenericDataType> dataFieldsWithComplexDataType, 
RowConverter converter,
-        boolean withoutReArrange) {
+        boolean withoutReArrange, boolean bucketColumnEnabled, 
Partitioner<CarbonRow> partitioner) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
       this.counter = 0;
@@ -234,6 +289,8 @@ public class InputProcessorStepWithNoConverterImpl extends 
AbstractDataLoadProce
       this.configuration = configuration;
       this.converter = converter;
       this.withoutReArrange = withoutReArrange;
+      this.isBucketColumnEnabled = bucketColumnEnabled;
+      this.partitioner = partitioner;
     }
 
     @Override
@@ -278,6 +335,10 @@ public class InputProcessorStepWithNoConverterImpl extends 
AbstractDataLoadProce
           if (configuration.isIndexColumnsPresent()) {
             carbonRow = converter.convert(carbonRow);
           }
+          if (isBucketColumnEnabled) {
+            short rangeNumber = (short) partitioner.getPartition(carbonRow);
+            carbonRow.setRangeId(rangeNumber);
+          }
           carbonRowBatch.addRow(carbonRow);
           count++;
         }
@@ -288,6 +349,10 @@ public class InputProcessorStepWithNoConverterImpl extends 
AbstractDataLoadProce
           if (configuration.isIndexColumnsPresent()) {
             carbonRow = converter.convert(carbonRow);
           }
+          if (isBucketColumnEnabled) {
+            short rangeNumber = (short) partitioner.getPartition(carbonRow);
+            carbonRow.setRangeId(rangeNumber);
+          }
           carbonRowBatch.addRow(carbonRow);
           count++;
         }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 08273b0..368f5d0 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -511,6 +511,7 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, 
segmentProperties, tableName,
             tempStoreLocation, carbonStoreLocation);
     carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId());
+    carbonFactDataHandlerModel.setBucketId(carbonLoadModel.getBucketId());
     setDataFileAttributesInModel(carbonLoadModel, compactionType, 
carbonFactDataHandlerModel);
     this.noDicAndComplexColumns = 
carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = 
CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel);
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 85413cc..121b798 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -90,6 +90,7 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
     setDataFileAttributesInModel(loadModel, compactionType, 
carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+    carbonFactDataHandlerModel.setBucketId(loadModel.getBucketId());
     this.noDicAndComplexColumns = 
carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = new 
CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 66c418e..3a21039 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -306,7 +306,7 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
    * @return more element is present
    */
   public boolean hasNext() {
-    return this.recordHolderHeapLocal.size() > 0;
+    return this.recordHolderHeapLocal != null && 
this.recordHolderHeapLocal.size() > 0;
   }
 
   public void close() {
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 6e98aaa..a95bf15 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -535,7 +535,7 @@ public class CarbonFactDataHandlerModel {
     return bucketId;
   }
 
-  public void setBucketId(Integer bucketId) {
+  public void setBucketId(int bucketId) {
     this.bucketId = bucketId;
   }
 
