This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e70e48c  Add a ALTER operation to change distribution type from RANDOM 
to HASH (#1823)
e70e48c is described below

commit e70e48c01e28733171bf5dea2a0024994a253d1f
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Sep 18 14:16:26 2019 +0800

    Add a ALTER operation to change distribution type from RANDOM to HASH 
(#1823)
    
    Random distribution is no longer supported since version 0.9.
    And we need a way to convert the random distribution to hash distribution.
    
        ALTER TABLE db.tbl SET ("distribution_type" = "hash");
---
 .../sql-statements/Data Definition/ALTER TABLE.md  |   7 +-
 .../Data Manipulation/BROKER LOAD.md               |  29 +-
 .../sql-statements/Data Manipulation/LOAD.md       |  40 ---
 .../Data Definition/ALTER TABLE_EN.md              |   4 +
 .../Data Manipulation/BROKER LOAD_EN.md            | 380 +++++++++++++++++++++
 .../sql-statements/Data Manipulation/LOAD_EN.md    |  24 --
 .../Data Manipulation/broker_load_EN.md            | 344 -------------------
 fe/src/main/java/org/apache/doris/alter/Alter.java |   3 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   9 +-
 .../apache/doris/analysis/AbstractBackupStmt.java  |   3 +-
 .../apache/doris/analysis/CancelBackupStmt.java    |   4 +-
 .../analysis/ModifyTablePropertiesClause.java      |  42 +--
 .../java/org/apache/doris/catalog/Catalog.java     |  28 ++
 .../java/org/apache/doris/catalog/OlapTable.java   |  16 +
 .../java/org/apache/doris/catalog/Partition.java   |   9 +
 .../doris/catalog/RandomDistributionInfo.java      |  13 +
 .../apache/doris/common/util/PropertyAnalyzer.java |   2 +
 .../org/apache/doris/journal/JournalEntity.java    |   5 +
 .../java/org/apache/doris/persist/EditLog.java     |   9 +
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../java/org/apache/doris/persist/TableInfo.java   |   4 +
 21 files changed, 526 insertions(+), 450 deletions(-)

diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data 
Definition/ALTER TABLE.md 
b/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER 
TABLE.md
index e967d37..f61c69e 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER 
TABLE.md 
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Definition/ALTER 
TABLE.md 
@@ -232,7 +232,12 @@
         PROPERTIES ("bloom_filter_columns"="k1,k2,k3");
 
     12. 修改表的Colocate 属性
-        ALTER TABLE example_db.my_table set ("colocate_with"="t1");
+
+        ALTER TABLE example_db.my_table set ("colocate_with" = "t1");
+
+    13. 将表的分桶方式由 Random Distribution 改为 Hash Distribution
+
+        ALTER TABLE example_db.my_table set ("distribution_type" = "hash");
         
     [rename]
     1. 将名为 table1 的表修改为 table2
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/BROKER LOAD.md 
b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md
index e9210a3..fd1e499 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/BROKER LOAD.md       
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/BROKER LOAD.md       
@@ -1,7 +1,7 @@
 # BROKER LOAD
 ## description
 
-    Broker load 通过随 Palo 集群一同部署的 broker 进行,访问对应数据源的数据,进行数据导入。
+    Broker load 通过随 Doris 集群一同部署的 broker 进行,访问对应数据源的数据,进行数据导入。
     可以通过 show broker 命令查看已经部署的 broker。
     目前支持以下4种数据源:
 
@@ -316,7 +316,8 @@
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
-     8. 导入Parquet文件中数据  指定FORMAT 为parquet, 默认是通过文件后缀判断
+    8. 导入Parquet文件中数据  指定FORMAT 为parquet, 默认是通过文件后缀判断
+
         LOAD LABEL example_db.label9
         (
         DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
@@ -326,7 +327,29 @@
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
-     9. 对待导入数据进行过滤,k1 值大于 k2 值的列才能被导入
+    9. 提取文件路径中的分区字段
+
+        如果需要,则会根据表中定义的字段类型解析文件路径中的分区字段(partitioned fields),类似Spark中Partition 
Discovery的功能
+
+        LOAD LABEL example_db.label10
+        (
+        DATA 
INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
+        INTO TABLE `my_table`
+        FORMAT AS "csv"
+        (k1, k2, k3)
+        COLUMNS FROM PATH AS (city, utc_date)
+        SET (uniq_id = md5sum(k1, city))
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+        
hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:
+
+        
[hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv,
 
hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv,
 ...]
+
+        则提取文件路径的中的city和utc_date字段
+
+    10. 对待导入数据进行过滤,k1 值大于 k2 值的列才能被导入
+
         LOAD LABEL example_db.label10
         (
         DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/LOAD.md
index 865bfc3..9bde11c 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/LOAD.md      
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/LOAD.md      
@@ -262,46 +262,6 @@
         )
         );
 
-        LOAD LABEL example_db.label8
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, p2)
-        COLUMNS TERMINATED BY ","
-        (k1, k2, tmp_k3, tmp_k4, v1, v2)
-        SET (
-          v1 = hll_hash(tmp_k3),
-          v2 = hll_hash(tmp_k4)
-        )
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-    8. 导入Parquet文件中数据  指定FORMAT 为parquet, 默认是通过文件后缀判断
-        LOAD LABEL example_db.label9
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        FORMAT AS "parquet"
-        (k1, k2, k3)
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); 
-        
-    9. 提取文件路径中的压缩字段
-        如果需要,则会根据表中定义的字段类型解析文件路径中的压缩字段(partitioned fields),类似Spark中Partition 
Discovery的功能
-        LOAD LABEL example_db.label10
-        (
-        DATA 
INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
-        INTO TABLE `my_table`
-        FORMAT AS "csv"
-        (k1, k2, k3)
-        COLUMNS FROM PATH AS (city, utc_date)
-        SET (uniq_id = md5sum(k1, city))
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-        
hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:[hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv,
 
hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv,
 ...]
-        则提取文件路径的中的city和utc_date字段
-
 ## keyword
     LOAD
     
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data 
Definition/ALTER TABLE_EN.md 
b/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER 
TABLE_EN.md
index 6a821d7..35b38c0 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER 
TABLE_EN.md      
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Definition/ALTER 
TABLE_EN.md      
@@ -225,6 +225,10 @@ PROPERTIES ("bloom_filter_columns"="k1,k2,k3");
 12. Modify the Colocate property of the table
 ALTER TABLE example_db.my_table set ("colocate_with"="t1");
 
+13. Change the Distribution type from Random to Hash
+
+ALTER TABLE example_db.my_table set ("distribution_type" = "hash");
+
 [Rename]
 1. Modify the table named Table 1 to table2
 ALTER TABLE table1 RENAME table2;
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/BROKER LOAD_EN.md 
b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD_EN.md
new file mode 100644
index 0000000..7eefcfa
--- /dev/null
+++ b/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/BROKER LOAD_EN.md    
@@ -0,0 +1,380 @@
+# BROKER LOAD
+## description
+
+    Broker load will load data into Doris via Broker.
+    Use `show broker;` to see the Broker deployed in cluster.
+    
+    Support following data sources:
+
+    1. Baidu HDFS: hdfs for Baidu. Only be used inside Baidu.
+    2. Baidu AFS: afs for Baidu. Only be used inside Baidu.
+    3. Baidu Object Storage(BOS): BOS on Baidu Cloud.
+    4. Apache HDFS.
+
+### Syntax: 
+
+    LOAD LABEL load_label
+    (
+    data_desc1[, data_desc2, ...]
+    )
+    WITH BROKER broker_name
+    [broker_properties]
+    [opt_properties];
+
+    1. load_label
+
+        Unique load label within a database.
+        syntax: 
+        [database_name.]your_label
+     
+    2. data_desc
+
+        To describe the data source. 
+        syntax: 
+            DATA INFILE
+            (
+            "file_path1"[, file_path2, ...]
+            )
+            [NEGATIVE]
+            INTO TABLE `table_name`
+            [PARTITION (p1, p2)]
+            [COLUMNS TERMINATED BY "column_separator"]
+            [FORMAT AS "file_type"]
+            [(column_list)]
+            [SET (k1 = func(k2))]
+            [WHERE predicate]    
+
+        Explain: 
+            file_path: 
+
+            File path. Support wildcard. Must match to file, not directory. 
+
+            PARTITION:
+
+            Data will only be loaded to specified partitions. Data out of 
partition's range will be filtered. If not specifed, all partitions will be 
loaded.
+                    
+            NEGATIVE: 
+            
+            If this parameter is specified, it is equivalent to importing a 
batch of "negative" data to offset the same batch of data loaded before.
+            
+            This parameter applies only to the case where there are value 
columns and the aggregation type of value columns is only SUM.
+            
+            column_separator: 
+            
+            Used to specify the column separator in the import file. Default 
is `\t`.
+            If the character is invisible, it needs to be prefixed with `\\x`, 
using hexadecimal to represent the separator.
+
+            For example, the separator `\x01` of the hive file is specified as 
`\\ x01`
+            
+            file_type: 
+
+            Used to specify the type of imported file, such as parquet, csv. 
Default values are determined by the file suffix name. 
+ 
+            column_list: 
+
+            Used to specify the correspondence between columns in the import 
file and columns in the table.
+
+            When you need to skip a column in the import file, specify it as a 
column name that does not exist in the table.
+
+            syntax: 
+            (col_name1, col_name2, ...)
+            
+            SET:
+            
+            If this parameter is specified, a column of the source file can be 
transformed according to a function, and then the transformed result can be 
loaded into the table. The grammar is `column_name = expression`. Some examples 
are given to help understand.
+
+            Example 1: There are three columns "c1, c2, c3" in the table. The 
first two columns in the source file correspond in turn (c1, c2), and the last 
two columns correspond to c3. Then, column (c1, c2, tmp_c3, tmp_c4) SET (c3 = 
tmp_c3 + tmp_c4) should be specified.
+
+            Example 2: There are three columns "year, month, day" in the 
table. There is only one time column in the source file, in the format of 
"2018-06-01:02:03". Then you can specify columns (tmp_time) set (year = year 
(tmp_time), month = month (tmp_time), day = day (tmp_time)) to complete the 
import.
+
+            WHERE:
+          
+            After filtering the transformed data, data that meets where 
predicates can be loaded. Only column names in tables can be referenced in 
WHERE statements.
+            
+    3. broker_name
+
+        The name of the Broker used can be viewed through the `show broker` 
command.
+
+    4. broker_properties
+
+        Used to provide Broker access to data sources. Different brokers, and 
different access methods, need to provide different information.
+
+        1. Baidu HDFS/AFS
+
+            Access to Baidu's internal hdfs/afs currently only supports simple 
authentication, which needs to be provided:
+            
+            username: hdfs username
+            password: hdfs password
+
+        2. BOS
+
+            bos_endpoint.
+            bos_accesskey: cloud user's accesskey
+            bos_secret_accesskey: cloud user's secret_accesskey
+        
+        3. Apache HDFS
+
+            Community version of HDFS supports simple authentication, Kerberos 
authentication, and HA configuration.
+
+            Simple authentication:
+            hadoop.security.authentication = simple (default)
+            username: hdfs username
+            password: hdfs password
+
+            kerberos authentication: 
+            hadoop.security.authentication = kerberos
+            kerberos_principal:  kerberos's principal
+            kerberos_keytab:  path of kerberos's keytab file. This file should 
be able to access by Broker
+            kerberos_keytab_content: Specify the contents of the KeyTab file 
in Kerberos after base64 encoding. This option is optional from the 
kerberos_keytab configuration. 
+
+            namenode HA: 
+            By configuring namenode HA, new namenode can be automatically 
identified when the namenode is switched
+            dfs.nameservices: hdfs service name,customize,eg: 
"dfs.nameservices" = "my_ha"
+            dfs.ha.namenodes.xxx: Customize the name of a namenode, separated 
by commas. XXX is a custom name in dfs. name services, such as "dfs. ha. 
namenodes. my_ha" = "my_nn"
+            dfs.namenode.rpc-address.xxx.nn: Specify RPC address information 
for namenode, where NN denotes the name of the namenode configured in 
dfs.ha.namenodes.xxxx, such as: "dfs.namenode.rpc-address.my_ha.my_nn"= 
"host:port"
+            dfs.client.failover.proxy.provider: Specify the provider that 
client connects to namenode by default: org. apache. hadoop. hdfs. server. 
namenode. ha. Configured Failover ProxyProvider.
+
+    4. opt_properties
+
+        Used to specify some special parameters. 
+        Syntax: 
+        [PROPERTIES ("key"="value", ...)]
+        
+        You can specify the following parameters: 
+        
+        timout: Specifies the timeout time for the import operation. The 
default timeout is 4 hours per second.
+
+        max_filter_ratio: Data ratio of maximum tolerance filterable (data 
irregularity, etc.). Default zero tolerance.
+
+        exc_mem_limit: Sets the upper memory limit for import. The default is 
2G, per byte. This is the upper memory limit for a single BE node.
+
+        A load may be distributed over multiple BEs. Let's assume that 1GB 
data needs up to 5GB of memory for processing at a single node. Suppose 1GB 
files are distributed over two nodes, then theoretically, each node needs 2.5GB 
of memory. Then the parameter can be set to 268454560, or 2.5GB.
+
+        strict_mode: Whether the data is strictly restricted. The default is 
true.
+
+        timezone: Specify time zones for functions affected by time zones, 
such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation 
for details. If not specified, use the "Asia/Shanghai" time zone.
+
+    5. Load data format sample
+
+        Integer(TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
+        Float(FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
+        Date(DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03. 
+        (Note: If it's in other date formats, you can use strftime or 
time_format functions to convert in the import command)
+        
+        String(CHAR/VARCHAR): "I am a student", "a"
+        NULL: \N
+
+## example
+
+    1. Load a batch of data from HDFS, specify timeout and filtering ratio. 
Use the broker with the inscription my_hdfs_broker. Simple authentication.
+
+        LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "username" = "hdfs_user",
+        "password" = "hdfs_passwd"
+        )
+        PROPERTIES
+        (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+        );
+    
+        Where hdfs_host is the host of the namenode and hdfs_port is the 
fs.defaultFS port (default 9000)
+        
+    2. Load a batch of data from AFS contains multiple files. Import different 
tables, specify separators, and specify column correspondences.
+
+        LOAD LABEL example_db.label2
+        (
+        DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file1")
+        INTO TABLE `my_table_1`
+        COLUMNS TERMINATED BY ","
+        (k1, k3, k2, v1, v2),
+        DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file2")
+        INTO TABLE `my_table_2`
+        COLUMNS TERMINATED BY "\t"
+        (k1, k2, k3, v2, v1)
+        )
+        WITH BROKER my_afs_broker
+        (
+        "username" = "afs_user",
+        "password" = "afs_passwd"
+        )
+        PROPERTIES
+        (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+        );
+        
+
+    3. Load a batch of data from HDFS, specify hive's default delimiter \\x01, 
and use wildcard * to specify all files in the directory. Use simple 
authentication and configure namenode HA at the same time
+
+        LOAD LABEL example_db.label3
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY "\\x01"
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "username" = "hdfs_user",
+        "password" = "hdfs_passwd",
+        "dfs.nameservices" = "my_ha",
+        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "dfs.client.failover.proxy.provider" = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        )
+    
+    4. Load a batch of "negative" data from HDFS. Use Kerberos authentication 
to provide KeyTab file path.
+
+        LOAD LABEL example_db.label4
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file)
+        NEGATIVE
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY "\t"
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "hadoop.security.authentication" = "kerberos",
+        "kerberos_principal"="[email protected]",
+        "kerberos_keytab"="/home/palo/palo.keytab"
+        )
+
+    5. Load a batch of data from HDFS, specify partition. At the same time, 
use Kerberos authentication mode. Provide the KeyTab file content encoded by 
base64.
+
+        LOAD LABEL example_db.label5
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, k3, k2, v1, v2)
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "hadoop.security.authentication"="kerberos",
+        "kerberos_principal"="[email protected]",
+        "kerberos_keytab_content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
+        )
+
+    6. Load a batch of data from BOS, specify partitions, and make some 
transformations to the columns of the imported files, as follows:
+    
+       Table schema: 
+        k1 varchar(20)
+        k2 int
+
+        Assuming that the data file has only one row of data:
+
+        Adele,1,1
+
+        The columns in the data file correspond to the columns specified in 
the load statement:
+        
+        k1,tmp_k2,tmp_k3
+
+        transform as: 
+
+        1) k1: unchanged
+        2) k2: sum of tmp_k2 and tmp_k3
+
+        LOAD LABEL example_db.label6
+        (
+        DATA INFILE("bos://my_bucket/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, tmp_k2, tmp_k3)
+        SET (
+          k2 = tmp_k2 + tmp_k3
+        )
+        )
+        WITH BROKER my_bos_broker
+        (
+        "bos_endpoint" = "http://bj.bcebos.com";,
+        "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
+        )
+    
+    7. Load data into tables containing HLL columns, which can be columns in 
tables or columns in data
+    
+        If there are three columns in the table (id, v1, v2). The V1 and V2 
columns are HLL columns. The imported source file has three columns. Then 
(column_list) declares that the first column is id, and the second and third 
columns are temporarily named k1, k2.
+
+        In SET, the HLL column in the table must be specifically declared 
hll_hash. The V1 column in the table is equal to the hll_hash (k1) column in 
the original data.        
+
+        LOAD LABEL example_db.label7
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (id, k1, k2)
+        SET (
+          v1 = hll_hash(k1),
+          v2 = hll_hash(k2)
+        )
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+        LOAD LABEL example_db.label8
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, k2, tmp_k3, tmp_k4, v1, v2)
+        SET (
+          v1 = hll_hash(tmp_k3),
+          v2 = hll_hash(tmp_k4)
+        )
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+    8. Data in load Parquet file specifies FORMAT as parquet. By default, it 
is judged by file suffix.
+
+        LOAD LABEL example_db.label9
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        FORMAT AS "parquet"
+        (k1, k2, k3)
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+    9. Extract partition fields in file paths
+
+        If necessary, partitioned fields in the file path are resolved based 
on the field type defined in the table, similar to the Partition Discovery 
function in Spark.
+
+        LOAD LABEL example_db.label10
+        (
+        DATA 
INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
+        INTO TABLE `my_table`
+        FORMAT AS "csv"
+        (k1, k2, k3)
+        COLUMNS FROM PATH AS (city, utc_date)
+        SET (uniq_id = md5sum(k1, city))
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+        Directory 
`hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing` contains 
following files:
+         
+        
[hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv,
 
hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv,
 ...]
+
+        Extract city and utc_date fields in the file path
+
+    10. To filter the load data, columns whose K1 value is greater than K2 
value can be imported.
+    
+        LOAD LABEL example_db.label10
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        where k1 > k2
+        );
+     
+## keyword
+
+    BROKER,LOAD
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/LOAD_EN.md 
b/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/LOAD_EN.md
index 3ca2655..a8bdf0a 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/LOAD_EN.md   
+++ b/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/LOAD_EN.md   
@@ -255,30 +255,6 @@ v2 = hll, u hash (k2)
 )
 );
 
-LOAD LABEL example db.label8
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-PARTITION (p1, P2)
-COLUMNS TERMINATED BY ","
-(k1, k2, tmp u k3, tmp u k4, v1, v2)
-SET (
-v1 = hll, u hash (tmp
-v2 = hll, u hash (tmp
-)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-8. Importing data into Parquet file specifies FORMAT as parquet, which is 
judged by file suffix by default.
-LOAD LABEL example db.label9
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-FORMAT AS "parquet"
-(k1, k2, k3)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
 ## keyword
 LOAD
 
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/broker_load_EN.md 
b/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/broker_load_EN.md
deleted file mode 100644
index 6c67996..0000000
--- a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/broker_load_EN.md    
+++ /dev/null
@@ -1,344 +0,0 @@
-# BROKER LOAD
-## Description
-
-    Broker load accesses data from corresponding data sources and imports data 
through broker deployed with Palo cluster.
-    You can view the deployed broker through the show broker command.
-    The following four data sources are currently supported:
-
-    1. Baidu HDFS: Baidu's internal HDFS are limited to Baidu's internal use.
-    2. Baidu AFS: Baidu's internal AFs are limited to Baidu's internal use.
-    3. Baidu Object Storage (BOS): Baidu Object Storage. Only Baidu internal 
users, public cloud users or other users who can access BOS.
-Four. Apache HDFS
-
-Grammar:
-
-    LOAD LABEL load_label
-    (
-    Date of date of date of entry
-    )
-    WITH BROKER broker_name
-    [broker_properties]
-    [opt_properties];
-
-    1. load label
-    
-        The label of the current imported batch. Unique in a database.
-        Grammar:
-        [database_name.]your_label
-    
-    2. data_desc
-    
-        Used to describe a batch of imported data.
-        Grammar:
-            DATA INFILE
-            (
-            "file_path1"[, file_path2, ...]
-            )
-            [NEGATIVE]
-            INTO TABLE `table_name`
-            [PARTITION (p1, P2)]
-            [COLUMNS TERMINATED BY "column_separator"]
-            [FORMAT AS "file_type"]
-            [(column_list)]
-            [set (k1 = fun (k2)]]
-            [WHERE predicate]    
-    
-        Explain:
-            file_path:
-            
-            File paths can be specified to a file, or * wildcards can be used 
to specify all files in a directory. Wildcards must match to files, not 
directories.
-            
-            PARTICIPATION:
-            
-            If this parameter is specified, only the specified partition will 
be imported, and data outside the imported partition will be filtered out.
-            If not specified, all partitions of the table are imported by 
default.
-            
-            NEGATIVE:
-            If this parameter is specified, it is equivalent to importing a 
batch of "negative" data. Used to offset the same batch of data imported before.
-            This parameter applies only to the case where there are value 
columns and the aggregation type of value columns is SUM only.
-            
-            Column U separator:
-            
-            Used to specify the column separator in the import file. Default 
tot
-            If the character is invisible, it needs to be prefixed with \x, 
using hexadecimal to represent the separator.
-            For example, the separator X01 of the hive file is specified as "\ 
x01"
-            
-            File type:
-            
-            Used to specify the type of imported file, such as parquet, csv. 
The default value is determined by the file suffix name.
-            
-            column_list:
-            
-            Used to specify the correspondence between columns in the import 
file and columns in the table.
-            When you need to skip a column in the import file, specify it as a 
column name that does not exist in the table.
-            Grammar:
-            (col_name1, col_name2, ...)
-            
-            SET:
-            
-            If this parameter is specified, a column of the source file can be 
transformed according to a function, and then the transformed result can be 
imported into the table.
-            Example1: table has columns (c1, c2, c3), source file has four 
columns. The first and second columns in source file map to the c1, c2 and the 
sum of 3th and 4th map to the c3. So the column mapping is columns 
(c1,c2,tmp_c3,tmp_c4) SET (c3=tmp_c3+tmp_c4);  
-    
-    3. broker name
-    
-        The name of the broker used can be viewed through the show broker 
command.
-    
-    4. broker_properties
-    
-        Used to provide information to access data sources through broker. 
Different brokers, as well as different access methods, need to provide 
different information.
-        
-        1. HDFS /AFS Baidu
-        
-            Access to Baidu's internal hdfs/afs currently only supports simple 
authentication, which needs to be provided:
-            Username: HDFS username
-            password -hdfs
-        
-        2. BOS
-        
-            Need to provide:
-            Bos_endpoint: endpoint of BOS
-            Bos_accesskey: Accesskey for public cloud users
-            Bos_secret_access key: secret_access key for public cloud users
-        
-        3. Apache HDFS
-        
-            Community version of HDFS supports simple authentication and 
Kerberos authentication. And support HA configuration.
-            Simple authentication:
-            hadoop.security.authentication = simple (默认)
-            Username: HDFS username
-            password -hdfs
-            
-            Kerberos authentication:
-            hadoop.security.authentication = kerberos
-            Kerberos_principal: Specifies the principal of Kerberos
-            Kerberos_keytab: Specifies the KeyTab file path for kerberos. This 
file must be a file on the server where the broker process resides.
-            Kerberos_keytab_content: Specifies the content of the KeyTab file 
in Kerberos after base64 encoding. This is a choice from the kerberos_keytab 
configuration.
-            
-            HA code
-            By configuring namenode HA, new namenode can be automatically 
identified when the namenode is switched
-            Dfs. nameservices: Specify the name of the HDFS service and 
customize it, such as: "dfs. nameservices" = "my_ha"
-            Dfs.ha.namenodes.xxx: Customize the name of the namenode, with 
multiple names separated by commas. Where XXX is a custom name in dfs. name 
services, such as "dfs. ha. namenodes. my_ha" = "my_nn"
-            Dfs.namenode.rpc-address.xxx.nn: Specify RPC address information 
for namenode. Where NN denotes the name of the namenode configured in 
dfs.ha.namenodes.xxx, such as: "dfs.namenode.rpc-address.my_ha.my_nn"= 
"host:port"
-            Dfs.client.failover.proxy.provider: Specifies the provider that 
client connects to namenode by default: 
org.apache.hadoop.hdfs.server.namenode.ha.Configured Failover ProxyProvider
-    
-    4. opt_properties
-    
-        Used to specify some special parameters.
-        Grammar:
-        [PROPERTIES ("key"="value", ...)]
-        
-        The following parameters can be specified:
-        Timeout: Specifies the timeout time of the import operation. The 
default timeout is 4 hours. Unit seconds.
-        Max_filter_ratio: The ratio of data that is most tolerant of being 
filterable (for reasons such as data irregularities). Default zero tolerance.
-        Exec_mem_limit: Sets the upper memory limit for import use. Default is 
2G, unit byte. This refers to the upper memory limit of a single BE node.
-        An import may be distributed across multiple BEs. We assume that 
processing 1GB data at a single node requires up to 5GB of memory. Assuming 
that a 1GB file is distributed among two nodes, then theoretically, each node 
needs 2.5GB of memory. Then the parameter can be set to 268454560, or 2.5GB.
-        Strict mode: is there a strict restriction on data? The default is 
true.
-    
-    5. Import data format sample
-    
-        Integer classes (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1,1000,1234
-        Floating Point Class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, 356
-        
-        (Note: If it's in other date formats, you can use strftime or 
time_format functions to convert in the import command)
-        字符串类(CHAR/VARCHAR):"I am a student", "a"
-NULL value: N
-
-## example
-
-    1. Import a batch of data from HDFS, specifying the timeout time and 
filtering ratio. Use the broker with the inscription my_hdfs_broker. Simple 
authentication.
-    
-        LOAD LABEL example db.label1
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "Username" = "HDFS\\ user"
-        "password" = "hdfs_passwd"
-        )
-        PROPERTIES
-        (
-        Timeout ="3600",
-        "max_filter_ratio" = "0.1"
-        );
-        
-        Where hdfs_host is the host of the namenode and hdfs_port is the 
fs.defaultFS port (default 9000)
-    
-    2. A batch of data from AFS, including multiple files. Import different 
tables, specify separators, and specify column correspondences.
-    
-        LOAD LABEL example db.label2
-        (
-        DATA INFILE ("afs http://afs host:hdfs /u port /user /palo /data 
/input /file1")
-        INTO TABLE `my_table_1`
-        COLUMNS TERMINATED BY ","
-        (k1, k3, k2, v1, v2),
-        DATA INFILE ("afs http://afs host:hdfs /u port /user /palo /data 
/input /file2")
-        INTO TABLE `my_table_2`
-        COLUMNS TERMINATED BY "\t"
-        (k1, k2, k3, v2, v1)
-        )
-        WITH BROKER my_afs_broker
-        (
-        "username" ="abu user",
-        "password" = "afs_passwd"
-        )
-        PROPERTIES
-        (
-        Timeout ="3600",
-        "max_filter_ratio" = "0.1"
-        );
-    
-    
-    3. Import a batch of data from HDFS, specify hive's default delimiter x01, 
and use wildcard * to specify all files in the directory.
-    Use simple authentication and configure namenode HA at the same time
-    
-        LOAD LABEL example db.label3
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
-        INTO TABLE `my_table`
-        COLUMNS TERMINATED BY "\\x01"
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "Username" = "HDFS\\ user"
-        "password" = "hdfs_passwd",
-        "dfs.nameservices" = "my_ha",
-        "dfs.ha.namodes.my -ha" ="we named1, we named2",
-        "dfs.namode.rpc -address.my ha.my name1" ="nn1 guest:rpc port",
-        "dfs.namode.rpc -address.my ha.my name2" ="nn2 guest:rpc port",
-        "dfs.client.failover.proxy.provider" = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
-        )
-    
-    4. Import a batch of "negative" data from HDFS. At the same time, Kerberos 
authentication is used. Provide KeyTab file path.
-    
-        LOAD LABEL example db.label4
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file)
-        NEGATIVE
-        INTO TABLE `my_table`
-        COLUMNS TERMINATED BY "\t"
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "hadoop.security.authentication" = "kerberos",
-        "kerberos" principal ="doris @YOUR.COM",
-        "kerberos" keytab ="/home /palo /palo.keytab"
-        )
-    
-    5. Import a batch of data from HDFS and specify partitions. At the same 
time, Kerberos authentication is used. Provides the KeyTab file content encoded 
by base64.
-    
-        LOAD LABEL example db.label5
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (k1, k3, k2, v1, v2)
-        )
-        WITH BROKER my_hdfs_broker
-        (
-        "hadoop.security.authentication"="kerberos",
-        "kerberos" principal ="doris @YOUR.COM",
-        "kerberos" keytab "content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
-        )
-    
-    6. Import a batch of data from BOS, specify partitions, and make some 
transformations to the columns of imported files, as follows:
-        The table structure is as follows:
-        K1 date
-        date
-        k3 bigint
-        k4 varchar (20)
-        k5 varchar (64)
-        k6 int
-        
-        Assume that the data file has only one row of data:
-        
-        1537002087,2018-08-09 11:12:13,1537002087,-,1
-        
-        The columns in the data file correspond to the columns specified in 
the import statement:
-        tmp -u k1, tmp -u k2, tmp u k3, k6, v1
-        
-        The conversion is as follows:
-        
-        1) k1: Transform tmp_k1 timestamp column into datetime type data
-        2) k2: Converting tmp_k2 datetime-type data into date data
-        3) k3: Transform tmp_k3 timestamp column into day-level timestamp
-        4) k4: Specify import default value of 1
-        5) k5: Calculate MD5 values from tmp_k1, tmp_k2, tmp_k3 columns
-        6) k6: Replace the - value in the imported file with 10
-        
-        LOAD LABEL example db.label6
-        (
-        DATA INFILE("bos://my_bucket/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (tmp /u k1, tmp /u k2, tmp /u k3, k6, v1)
-        SET (
-        
-        K2 = Time = UFormat ("% Y -% M -% D% H:% I = S", "% Y -% M -% D", TMP 
= UK2),
-        k3 = alignment_timestamp("day", tmp_k3),
-        k4 = default_value("1"),
-        K5 = MD5Sum (TMP = UK1, TMP = UK2, TMP = UK3)
-        k6 = replace value ("-", "10")
-        )
-        )
-        WITH BROKER my_bos_broker
-        (
-        "bosu endpoint" ="http://bj.bcebos.com";,
-        "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
-        "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
-        )
-    
-    7. Import data into tables containing HLL columns, which can be columns in 
tables or columns in data
-    
-        If there are three columns in the table (id, v1, v2). Where V1 and V2 
columns are HLL columns. The imported source file has three columns. In 
column_list, it is declared that the first column is ID and the second and 
third column is K1 and k2, which are temporarily named.
-        In SET, the HLL column in the table must be specifically declared 
hll_hash. The V1 column in the table is equal to the hll_hash (k1) column in 
the original data.
-        LOAD LABEL example db.label7
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (id, k1, k2)
-        SET (
-        v1 = hll, u hash (k1),
-        v2 = hll, u hash (k2)
-        )
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-        
-        LOAD LABEL example db.label8
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        PARTITION (p1, P2)
-        COLUMNS TERMINATED BY ","
-        (k1, k2, tmp u k3, tmp u k4, v1, v2)
-        SET (
-        v1 = hll, u hash (tmp
-        v2 = hll, u hash (tmp
-        )
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-    
-    8. Importing data into Parquet file specifies FORMAT as parquet, which is 
judged by file suffix by default.
-        LOAD LABEL example db.label9
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        FORMAT AS "parquet"
-        (k1, k2, k3)
-        )
-        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-     9. Filter data by k1>k2 
-        LOAD LABEL example_db.label10
-        (
-        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-        INTO TABLE `my_table`
-        where k1 > k2
-        )
-## keyword
-BROKER,LOAD
diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/src/main/java/org/apache/doris/alter/Alter.java
index 9c2ad2e..ab443dc 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -123,8 +123,7 @@ public class Alter {
                     || alterClause instanceof AddColumnsClause
                     || alterClause instanceof DropColumnClause
                     || alterClause instanceof ModifyColumnClause
-                    || alterClause instanceof ReorderColumnsClause
-                    || alterClause instanceof ModifyTablePropertiesClause)
+                    || alterClause instanceof ReorderColumnsClause)
                     && !hasAddRollup && !hasDropRollup && !hasPartition && 
!hasRename) {
                 hasSchemaChange = true;
             } else if (alterClause instanceof AddRollupClause && 
!hasSchemaChange && !hasAddRollup && !hasDropRollup
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 35b1618..55d92c0 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -81,8 +81,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.doris.catalog.AggregateType.BITMAP_UNION;
-
 public class SchemaChangeHandler extends AlterHandler {
     private static final Logger LOG = 
LogManager.getLogger(SchemaChangeHandler.class);
 
@@ -539,7 +537,7 @@ public class SchemaChangeHandler extends AlterHandler {
             throw new DdlException("HLL type column can only be in Aggregation 
data model table: " + newColName);
         }
 
-        if (newColumn.getAggregationType() == BITMAP_UNION && 
KeysType.AGG_KEYS != olapTable.getKeysType()) {
+        if (newColumn.getAggregationType() == AggregateType.BITMAP_UNION && 
KeysType.AGG_KEYS != olapTable.getKeysType()) {
             throw new DdlException("BITMAP_UNION must be used in AGG_KEYS");
         }
 
@@ -1305,7 +1303,10 @@ public class SchemaChangeHandler extends AlterHandler {
                 // so just return after finished handling.
                 if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
                     String colocateGroup = 
properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
-                    Catalog.getInstance().modifyTableColocate(db, olapTable, 
colocateGroup, false, null);
+                    Catalog.getCurrentCatalog().modifyTableColocate(db, 
olapTable, colocateGroup, false, null);
+                    return;
+                } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
+                    Catalog.getCurrentCatalog().convertDistributionType(db, 
olapTable);
                     return;
                 }
             }
diff --git a/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java 
b/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java
index d33d961..4bb83a1 100644
--- a/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java
@@ -70,8 +70,7 @@ public class AbstractBackupStmt extends DdlStmt {
         // the restore table may be newly created, so we can not judge its 
privs.
         if 
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(),
                 labelName.getDbName(), PrivPredicate.LOAD)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED,
-                    ConnectContext.get().getQualifiedUser(), 
labelName.getDbName());
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"LOAD");
         }
 
         checkAndNormalizeBackupObjs();
diff --git a/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java 
b/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java
index 721f394..b1de068 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CancelBackupStmt.java
@@ -59,8 +59,8 @@ public class CancelBackupStmt extends CancelStmt {
         }
 
         // check auth
-        if 
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
-            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+        if 
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), 
dbName, PrivPredicate.LOAD)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"LOAD");
         }
     }
 
diff --git 
a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java 
b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 183251c..2cd7d94 100644
--- 
a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ 
b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -17,23 +17,16 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.common.util.PropertyAnalyzer;
 
 import java.util.Map;
 
 // clause which is used to modify table properties
 public class ModifyTablePropertiesClause extends AlterClause {
 
-    private static final String KEY_STORAGE_TYPE = "storage_type";
-    private static final String KEY_COLOCATE_WITH = "colocate_with";
-
     private Map<String, String> properties;
 
     public ModifyTablePropertiesClause(Map<String, String> properties) {
@@ -46,31 +39,24 @@ public class ModifyTablePropertiesClause extends 
AlterClause {
             throw new AnalysisException("Properties is not set");
         }
 
-        if (properties.size() == 1 && 
properties.containsKey(KEY_COLOCATE_WITH)) {
-            if (Config.disable_colocate_join) {
-                throw new AnalysisException("Colocate table is disabled by 
Admin");
-            }
-
-            if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(
-                    ConnectContext.get(), ConnectContext.get().getDatabase(), 
PrivPredicate.ALTER)) {
-                
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                        "ALTER");
-            }
-        } else if 
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ALTER)) {
-            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                    "ALTER");
+        if (properties.size() != 1) {
+            throw new AnalysisException("Can only set one table property at a 
time");
         }
 
-        if (properties.containsKey(KEY_STORAGE_TYPE)) {
-            // if set storage type, we need ADMIN privs.
-            if 
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
-                
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                                                    "ADMIN");
+        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) 
{
+            if (Config.disable_colocate_join) {
+                throw new AnalysisException("Colocate table is disabled by 
Admin");
             }
-
-            if (!properties.get(KEY_STORAGE_TYPE).equals("column")) {
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE)) {
+            if 
(!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).equalsIgnoreCase("column"))
 {
                 throw new AnalysisException("Can only change storage type to 
COLUMN");
             }
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
+            if 
(!properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE).equalsIgnoreCase("hash"))
 {
+                throw new AnalysisException("Can only change distribution type 
to HASH");
+            }
+        } else {
+            throw new AnalysisException("Unknown table property: " + 
properties.keySet());
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index a120dc8..1e835ed 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -6134,5 +6134,33 @@ public class Catalog {
             replica.setBad(backendTabletsInfo.isBad());
         }
     }
+
+    // Convert table's distribution type from random to hash.
+    // random distribution is no longer supported.
+    public void convertDistributionType(Database db, OlapTable tbl) throws 
DdlException {
+        db.writeLock();
+        try {
+            if (!tbl.convertRandomDistributionToHashDistribution()) {
+                throw new DdlException("Table " + tbl.getName() + " is not 
random distributed");
+            }
+            TableInfo tableInfo = 
TableInfo.createForModifyDistribution(db.getId(), tbl.getId());
+            editLog.logModifyDitrubutionType(tableInfo);
+            LOG.info("finished to modify distribution type of table: " + 
tbl.getName());
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    public void replayConvertDistributionType(TableInfo tableInfo) {
+        Database db = getDb(tableInfo.getDbId());
+        db.writeLock();
+        try {
+            OlapTable tbl = (OlapTable) db.getTable(tableInfo.getTableId());
+            tbl.convertRandomDistributionToHashDistribution();
+            LOG.info("replay modify distribution type of table: " + 
tbl.getName());
+        } finally {
+            db.writeUnlock();
+        }
+    }
 }
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index 457eeaf..d02f957 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1115,4 +1115,20 @@ public class OlapTable extends Table {
         }
         return keysNum;
     }
+
+    public boolean convertRandomDistributionToHashDistribution() {
+        boolean hasChanged = false;
+        List<Column> baseSchema = indexIdToSchema.get(baseIndexId);
+        if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
+            defaultDistributionInfo = ((RandomDistributionInfo) 
defaultDistributionInfo).toHashDistributionInfo(baseSchema);
+            hasChanged = true;
+        }
+        
+        for (Partition partition : idToPartition.values()) {
+            if 
(partition.convertRandomDistributionToHashDistribution(baseSchema)) {
+                hasChanged = true;
+            }
+        }
+        return hasChanged;
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/src/main/java/org/apache/doris/catalog/Partition.java
index 286f7fd..d5afe6c 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java
@@ -438,4 +438,13 @@ public class Partition extends MetaObject implements 
Writable {
 
         return buffer.toString();
     }
+
+    public boolean convertRandomDistributionToHashDistribution(List<Column> 
baseSchema) {
+        boolean hasChanged = false;
+        if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
+            distributionInfo = ((RandomDistributionInfo) 
distributionInfo).toHashDistributionInfo(baseSchema);
+            hasChanged = true;
+        }
+        return hasChanged;
+    }
 }
diff --git 
a/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java 
b/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
index aadf69f..7458449 100644
--- a/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
+++ b/fe/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
@@ -20,9 +20,12 @@ package org.apache.doris.catalog;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.RandomDistributionDesc;
 
+import com.google.common.collect.Lists;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Random partition.
@@ -89,4 +92,14 @@ public class RandomDistributionInfo extends DistributionInfo 
{
                 && bucketNum == randomDistributionInfo.bucketNum;
     }
 
+    public HashDistributionInfo toHashDistributionInfo(List<Column> 
baseSchema) {
+        List<Column> keyColumns = Lists.newArrayList();
+        for (Column column : baseSchema) {
+            if (column.isKey()) {
+                keyColumns.add(column);
+            }
+        }
+        HashDistributionInfo hashDistributionInfo = new 
HashDistributionInfo(bucketNum, keyColumns);
+        return hashDistributionInfo;
+    }
 }
diff --git 
a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 4dcb157..0a74a70 100644
--- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -70,6 +70,8 @@ public class PropertyAnalyzer {
     
     public static final String PROPERTIES_TIMEOUT = "timeout";
 
+    public static final String PROPERTIES_DISTRIBUTION_TYPE = 
"distribution_type";
+
     public static DataProperty analyzeDataProperty(Map<String, String> 
properties, DataProperty oldDataProperty)
             throws AnalysisException {
         DataProperty dataProperty = oldDataProperty;
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 06a8515..d12dc07 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -417,6 +417,11 @@ public class JournalEntity implements Writable {
                 needRead = false;
                 break;
             }
+            case OperationType.OP_MODIFY_DISTRIBUTION_TYPE: {
+                data = TableInfo.read(in);
+                needRead = false;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 9fe2049..c508f1b 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -695,6 +695,11 @@ public class EditLog {
                     }
                     break;
                 }
+                case OperationType.OP_MODIFY_DISTRIBUTION_TYPE: {
+                    TableInfo tableInfo = (TableInfo)journal.getData();
+                    catalog.replayConvertDistributionType(tableInfo);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1188,4 +1193,8 @@ public class EditLog {
     public void logAlterJob(AlterJobV2 alterJob) {
         logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
     }
+
+    public void logModifyDitrubutionType(TableInfo tableInfo) {
+        logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo);
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 5041cbc..c972e04 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -55,6 +55,7 @@ public class OperationType {
     public static final short OP_FINISH_CONSISTENCY_CHECK = 29;
     public static final short OP_RENAME_ROLLUP = 120;
     public static final short OP_ALTER_JOB_V2 = 121;
+    public static final short OP_MODIFY_DISTRIBUTION_TYPE = 122;
 
     // 30~39 130~139 230~239 ...
     // load job for only hadoop load
diff --git a/fe/src/main/java/org/apache/doris/persist/TableInfo.java 
b/fe/src/main/java/org/apache/doris/persist/TableInfo.java
index bf0af33..8529327 100644
--- a/fe/src/main/java/org/apache/doris/persist/TableInfo.java
+++ b/fe/src/main/java/org/apache/doris/persist/TableInfo.java
@@ -64,6 +64,10 @@ public class TableInfo implements Writable {
         return new TableInfo(dbId, tableId, -1L, partitionId, "", "", 
newPartitionName);
     }
 
+    public static TableInfo createForModifyDistribution(long dbId, long 
tableId) {
+        return new TableInfo(dbId, tableId, -1L, -1, "", "", "");
+    }
+
     public long getDbId() {
         return dbId;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to