This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fbc448520a6 [feature](ColdHeatSeperation) Support to upload cold data to HDFS (#22048)
fbc448520a6 is described below

commit fbc448520a6107ff277073e4447c55d9545e6dd5
Author: 川流 <[email protected]>
AuthorDate: Sun Oct 22 21:04:43 2023 +0800

    [feature](ColdHeatSeperation) Support to upload cold data to HDFS (#22048)
---
 .gitignore                                         |   3 +
 be/src/agent/task_worker_pool.cpp                  |  18 ++
 be/src/olap/storage_policy.cpp                     |   1 -
 docs/en/docs/advanced/cold-hot-separation.md       |  33 ++-
 docs/zh-CN/docs/advanced/cold-hot-separation.md    |  35 +++-
 .../org/apache/doris/master/ReportHandler.java     |   1 +
 .../org/apache/doris/policy/StoragePolicy.java     |  40 ++--
 .../apache/doris/task/PushStoragePolicyTask.java   |  12 +-
 gensrc/thrift/AgentService.thrift                  |   2 +
 .../load_colddata_to_hdfs.groovy                   | 230 +++++++++++++++++++++
 10 files changed, 353 insertions(+), 22 deletions(-)

diff --git a/.gitignore b/.gitignore
index 84fc3064e68..9a35bbe2258 100644
--- a/.gitignore
+++ b/.gitignore
@@ -122,3 +122,6 @@ lru_cache_test
 /conf/log4j2-spring.xml
 /fe/fe-core/src/test/resources/real-help-resource.zip
 /ui/dist
+
+# other
+compile_commands.json
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 8b4aedd32f3..ff22f7923c3 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -19,6 +19,7 @@
 
 #include <fmt/format.h>
 #include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/HeartbeatService_types.h>
 #include <gen_cpp/MasterService_types.h>
 #include <gen_cpp/Status_types.h>
@@ -49,6 +50,7 @@
 #include "gutil/strings/numbers.h"
 #include "gutil/strings/substitute.h"
 #include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
 #include "io/fs/s3_file_system.h"
@@ -1203,6 +1205,22 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
                             .tag("s3_conf", s3_conf.to_string());
                     put_storage_resource(resource.id, {std::move(fs), resource.version});
                 }
+            } else if (resource.__isset.hdfs_storage_param) {
+                Status st;
+                std::shared_ptr<io::HdfsFileSystem> fs;
+                if (existed_resource.fs == nullptr) {
+                    st = io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs);
+                } else {
+                    fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
+                }
+                if (!st.ok()) {
+                    LOG(WARNING) << "update hdfs resource failed: " << st;
+                } else {
+                    LOG_INFO("successfully update hdfs resource")
+                            .tag("resource_id", resource.id)
+                            .tag("resource_name", resource.name);
+                    put_storage_resource(resource.id, {std::move(fs), resource.version});
+                }
             } else {
                 LOG(WARNING) << "unknown resource=" << resource;
             }
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index 494c93cec13..61c3093abf4 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -48,7 +48,6 @@ Status get_remote_file_system(int64_t storage_policy_id,
         return Status::InternalError("could not find resource, resouce_id={}",
                                      storage_policy->resource_id);
     }
-    DCHECK(atol((*fs)->id().c_str()) == storage_policy->resource_id);
     DCHECK((*fs)->type() != io::FileSystemType::LOCAL);
     return Status::OK();
 }
diff --git a/docs/en/docs/advanced/cold-hot-separation.md b/docs/en/docs/advanced/cold-hot-separation.md
index 411dc38f2a5..64faca74e99 100644
--- a/docs/en/docs/advanced/cold-hot-separation.md
+++ b/docs/en/docs/advanced/cold-hot-separation.md
@@ -58,7 +58,7 @@ In addition, fe configuration needs to be added: `enable_storage_policy=true`
 
 Note: This property will not be synchronized by CCR. If this table is copied by CCR, that is, PROPERTIES contains `is_being_synced = true`, this property will be erased in this table.
 
-For example:
+Here is an example of how to create an S3 RESOURCE:
 
 ```
 CREATE RESOURCE "remote_s3"
@@ -94,6 +94,36 @@ PROPERTIES(
     "storage_policy" = "test_policy"
 );
 ```
+And here is how to create an HDFS RESOURCE:
+```
+CREATE RESOURCE "remote_hdfs" PROPERTIES (
+        "type"="hdfs",
+        "fs.defaultFS"="fs_host:default_fs_port",
+        "hadoop.username"="hive",
+        "hadoop.password"="hive",
+        "dfs.nameservices" = "my_ha",
+        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+    );
+
+    CREATE STORAGE POLICY test_policy PROPERTIES (
+        "storage_resource" = "remote_hdfs",
+        "cooldown_ttl" = "300"
+    )
+
+    CREATE TABLE IF NOT EXISTS create_table_use_created_policy (
+    k1 BIGINT,
+    k2 LARGEINT,
+    v1 VARCHAR(2048)
+    )
+    UNIQUE KEY(k1)
+    DISTRIBUTED BY HASH (k1) BUCKETS 3
+    PROPERTIES(
+    "storage_policy" = "test_policy"
+    );
+```
 Or for an existing table, associate the storage policy
 ```
 ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy");
@@ -177,3 +207,4 @@ PROPERTIES
     "use_path_style" = "true"
 );
 ```
+
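A hedged follow-up to the docs example above (not part of this patch): once the HDFS resource, policy, and table exist, the statements below can confirm the setup and, after `cooldown_ttl` elapses, let you watch data migrate to remote storage. The names reuse the example above; the exact columns reported by `SHOW TABLETS` depend on the Doris version.

```
-- confirm the resource and policy were created
SHOW RESOURCES WHERE NAME = "remote_hdfs";
SHOW STORAGE POLICY;

-- after cooldown_ttl (300s in the example) has passed, the tablets' remote
-- data size should grow while local data size shrinks, which is what the
-- regression test in this patch verifies through the BE tablet API
SHOW TABLETS FROM create_table_use_created_policy;
```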
diff --git a/docs/zh-CN/docs/advanced/cold-hot-separation.md b/docs/zh-CN/docs/advanced/cold-hot-separation.md
index b1c662080c0..c81c45dc717 100644
--- a/docs/zh-CN/docs/advanced/cold-hot-separation.md
+++ b/docs/zh-CN/docs/advanced/cold-hot-separation.md
@@ -35,7 +35,7 @@ under the License.
 4. 基于普通云盘做高可用,需要实现多副本,某副本异常要做副本迁移。而将数据放到对象存储上则不存在此类问题,因为对象存储是共享的。
 
 ## 解决方案
-在Partition级别上设置freeze time,表示多久这个Partition会被freeze,并且定义freeze之后存储的remote storage的位置。在be上daemon线程会周期性的判断表是否需要freeze,若freeze后会将数据上传到s3上。
+在Partition级别上设置freeze time,表示多久这个Partition会被freeze,并且定义freeze之后存储的remote storage的位置。在be上daemon线程会周期性的判断表是否需要freeze,若freeze后会将数据上传到s3和hdfs上。
 
 冷热分层支持所有doris功能,只是把部分数据放到对象存储上,以节省成本,不牺牲功能。因此有如下特点:
 
@@ -57,7 +57,7 @@ under the License.
 
 注意:这个属性不会被CCR同步,如果这个表是被CCR复制而来的,即PROPERTIES中包含`is_being_synced = true`时,这个属性将会在这个表中被擦除。
 
-例如:
+下面演示如何创建S3 RESOURCE:
 
 ```
 CREATE RESOURCE "remote_s3"
@@ -93,6 +93,36 @@ PROPERTIES(
     "storage_policy" = "test_policy"
 );
 ```
+以及如何创建 HDFS RESOURCE:
+```
+CREATE RESOURCE "remote_hdfs" PROPERTIES (
+        "type"="hdfs",
+        "fs.defaultFS"="fs_host:default_fs_port",
+        "hadoop.username"="hive",
+        "hadoop.password"="hive",
+        "dfs.nameservices" = "my_ha",
+        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+    );
+
+    CREATE STORAGE POLICY test_policy PROPERTIES (
+        "storage_resource" = "remote_hdfs",
+        "cooldown_ttl" = "300"
+    )
+
+    CREATE TABLE IF NOT EXISTS create_table_use_created_policy (
+    k1 BIGINT,
+    k2 LARGEINT,
+    v1 VARCHAR(2048)
+    )
+    UNIQUE KEY(k1)
+    DISTRIBUTED BY HASH (k1) BUCKETS 3
+    PROPERTIES(
+    "storage_policy" = "test_policy"
+    );
+```
 或者对一个已存在的表,关联storage policy
 ```
 ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy");
@@ -104,6 +134,7 @@ ALTER TABLE create_table_partition MODIFY PARTITION (*) SET("storage_policy"="te
 **注意**,如果用户在建表时给整张table和部分partition指定了不同的storage policy,partition设置的storage policy会被无视,整张表的所有partition都会使用table的policy. 如果您需要让某个partition的policy和别的不同,则可以使用上文中对一个已存在的partition,关联storage policy的方式修改.
 
 具体可以参考docs目录下[resource](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-RESOURCE.md)、[policy](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md)、[create table](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)、[alter table](../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.md)等文档,里面有详细介绍
 
+
 ### 一些限制
 
 - 单表或单partition只能关联一个storage policy,关联后不能drop掉storage policy,需要先解除二者的关联。
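The zh-CN note above points out that a table-level storage policy overrides per-partition policies given at table creation, and that a single partition can only get a different policy afterwards via ALTER TABLE. A hedged sketch of that flow, reusing the ALTER syntax shown in these docs (the partition name `p1` and the policy names are illustrative, not from this patch):

```
-- attach a policy to the whole table first
ALTER TABLE create_table_partition SET ("storage_policy" = "test_policy");
-- then override one existing partition with another, already-created policy
ALTER TABLE create_table_partition MODIFY PARTITION (p1) SET ("storage_policy" = "another_policy");
```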
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 4bfb55e3a89..efca6c7f950 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -306,6 +306,7 @@ public class ReportHandler extends Daemon {
         // do the diff. find out (intersection) / (be - meta) / (meta - be)
         List<Policy> policiesInFe = Env.getCurrentEnv().getPolicyMgr().getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
         List<Resource> resourcesInFe = Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.S3);
+        resourcesInFe.addAll(Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.HDFS));
 
         List<Resource> resourceToPush = new ArrayList<>();
         List<Policy> policyToPush = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
index 4f1cbd96b7d..b86b3dc4903 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
@@ -19,6 +19,7 @@ package org.apache.doris.policy;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Resource.ReferenceType;
 import org.apache.doris.catalog.ScalarType;
@@ -177,29 +178,38 @@ public class StoragePolicy extends Policy {
             this.cooldownTtl = getSecondsByCooldownTtl(props.get(COOLDOWN_TTL));
         }
 
-        checkIsS3ResourceAndExist(this.storageResource);
+        checkResourceIsExist(this.storageResource);
         if (!addResourceReference() && !ifNotExists) {
-            throw new AnalysisException("this policy has been added to s3 resource once, policy has been created.");
+            throw new AnalysisException("this policy has been added to s3 or hdfs resource, policy has been created.");
         }
     }
 
-    private static Resource checkIsS3ResourceAndExist(final String storageResource) throws AnalysisException {
-        // check storage_resource type is S3, current just support S3
+    private static Resource checkResourceIsExist(final String storageResource) throws AnalysisException {
         Resource resource =
                 Optional.ofNullable(Env.getCurrentEnv().getResourceMgr().getResource(storageResource))
                     .orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource));
 
-        if (resource.getType() != Resource.ResourceType.S3) {
-            throw new AnalysisException("current storage policy just support resource type S3_COOLDOWN");
-        }
         Map<String, String> properties = resource.getCopiedProperties();
-        if (!properties.containsKey(S3Properties.ROOT_PATH)) {
-            throw new AnalysisException(String.format(
-                    "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
-        }
-        if (!properties.containsKey(S3Properties.BUCKET)) {
-            throw new AnalysisException(String.format(
-                    "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
+        switch (resource.getType()) {
+            case S3:
+                if (!properties.containsKey(S3Properties.ROOT_PATH)) {
+                    throw new AnalysisException(String.format(
+                        "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
+                }
+                if (!properties.containsKey(S3Properties.BUCKET)) {
+                    throw new AnalysisException(String.format(
+                        "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
+                }
+                break;
+            case HDFS:
+                if (!properties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+                    throw new AnalysisException(String.format(
+                        "Missing [%s] in '%s' resource", HdfsResource.HADOOP_FS_NAME, storageResource));
+                }
+                break;
+            default:
+                throw new AnalysisException(
+                    "current storage policy just support resource type S3_COOLDOWN or HDFS_COOLDOWN");
         }
         return resource;
     }
@@ -343,7 +353,7 @@ public class StoragePolicy extends Policy {
 
         String storageResource = properties.get(STORAGE_RESOURCE);
         if (storageResource != null) {
-            checkIsS3ResourceAndExist(storageResource);
+            checkResourceIsExist(storageResource);
         }
         if (this.policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && this.storageResource == null
                 && storageResource == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java
index e422a409e17..4f1bec5d870 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.task;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Resource.ResourceType;
 import org.apache.doris.datasource.property.constants.S3Properties;
@@ -62,8 +63,9 @@ public class PushStoragePolicyTask extends AgentTask {
                 StoragePolicy storagePolicy = (StoragePolicy) p;
                 String resourceName = storagePolicy.getStorageResource();
                 Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
-                if (resource == null || resource.getType() != ResourceType.S3) {
-                    LOG.warn("can't find s3 resource by name {}", resourceName);
+                if (resource == null || (resource.getType() != ResourceType.S3
+                        && resource.getType() != ResourceType.HDFS)) {
+                    LOG.warn("can't find s3 resource or hdfs resource by name {}", resourceName);
                     return;
                 }
                 item.setResourceId(resource.getId());
@@ -85,7 +87,11 @@ public class PushStoragePolicyTask extends AgentTask {
             item.setId(r.getId());
             item.setName(r.getName());
             item.setVersion(r.getVersion());
-            item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
+            if (r.getType() == ResourceType.S3) {
+                item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
+            } else if (r.getType() == ResourceType.HDFS) {
+                item.setHdfsStorageParam(HdfsResource.generateHdfsParam(r.getCopiedProperties()));
+            }
             r.readUnlock();
             tStorageResources.add(item);
         });
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index a61c9512564..4b4c260b473 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -60,6 +60,7 @@ enum TTabletType {
     TABLET_TYPE_MEMORY = 1
 }
 
+
 struct TS3StorageParam {
     1: optional string endpoint
     2: optional string region
@@ -87,6 +88,7 @@ struct TStorageResource {
     2: optional string name
     3: optional i64 version // alter version
     4: optional TS3StorageParam s3_storage_param
+    5: optional PlanNodes.THdfsParams hdfs_storage_param
     // more storage resource type
 }
 
diff --git a/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy b/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy
new file mode 100644
index 00000000000..14c5939dfc8
--- /dev/null
+++ b/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("load_colddata_to_hdfs") {
+    if (!enableHdfs()) {
+      logger.info("skip this case because hdfs is not enabled");
+      return
+    }
+
+    def fetchBeHttp = { check_func, meta_url ->
+        def i = meta_url.indexOf("/api")
+        String endPoint = meta_url.substring(0, i)
+        String metaUri = meta_url.substring(i)
+        i = endPoint.lastIndexOf('/')
+        endPoint = endPoint.substring(i + 1)
+        httpTest {
+            endpoint endPoint
+            uri metaUri
+            op "get"
+            check check_func
+        }
+    }
+    // data_sizes is one arrayList<Long>, t is tablet
+    def fetchDataSize = { data_sizes, t ->
+        def tabletId = t[0]
+        String meta_url = t[17]
+        def clos = {  respCode, body ->
+            logger.info("test ttl expired resp Code {}", "${respCode}".toString())
+            assertEquals("${respCode}".toString(), "200")
+            String out = "${body}".toString()
+            def obj = new JsonSlurper().parseText(out)
+            data_sizes[0] = obj.local_data_size
+            data_sizes[1] = obj.remote_data_size
+        }
+        fetchBeHttp(clos, meta_url.replace("header", "data_size"))
+    }
+    // used as passing out parameter to fetchDataSize
+    List<Long> sizes = [-1, -1]
+    def tableName = "lineitem2"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    def stream_load_one_part = { partnum ->
+        streamLoad {
+            table tableName
+            // a default db 'regression_test' is specified in
+            // ${DORIS_HOME}/conf/regression-conf.groovy
+
+            // default label is UUID:
+            // set 'label' UUID.randomUUID().toString()
+
+            // the default column_separator is specified in the doris fe config, usually '\t'.
+            // this line changes it to '|'
+            set 'column_separator', '|'
+            set 'compress_type', 'GZ'
+
+
+            // relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+            // also, you can stream load an http stream, e.g. http://xxx/some.csv
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split${partnum}.gz"""
+            time 10000 // limit inflight 10s
+
+            // the stream load action will check the result, including the Success status and NumberTotalRows == NumberLoadedRows
+
+            // if a check callback is declared, the default check conditions are ignored,
+            // so you must check every condition yourself
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    def load_lineitem_table = {
+        stream_load_one_part("00")
+        stream_load_one_part("01")
+        def tablets = sql """
+        SHOW TABLETS FROM ${tableName}
+        """
+        while (tablets[0][8] == "0") {
+            log.info( "test local size is zero, sleep 10s")
+            sleep(10000)
+            tablets = sql """
+            SHOW TABLETS FROM ${tableName}
+            """
+        }
+    }
+
+    def check_storage_policy_exist = { name->
+        def polices = sql"""
+        show storage policy;
+        """
+        for (p in polices) {
+            if (name == p[0]) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    def resource_name = "test_table_with_data_resource"
+    def policy_name= "test_table_with_data_policy"
+
+    if (check_storage_policy_exist(policy_name)) {
+        sql """
+            DROP STORAGE POLICY ${policy_name}
+        """
+    }
+    
+    def has_resource = sql """
+        SHOW RESOURCES WHERE NAME = "${resource_name}";
+    """
+    if (has_resource.size() > 0) {
+        sql """
+            DROP RESOURCE ${resource_name}
+        """
+    }
+
+    sql """
+        CREATE RESOURCE IF NOT EXISTS ${resource_name}
+        PROPERTIES (
+            "type"="hdfs",
+            "fs.defaultFS"="127.0.0.1:8120",
+            "hadoop.username"="hive",
+            "hadoop.password"="hive",
+            "dfs.nameservices" = "my_ha",
+            "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+            "dfs.namenode.rpc-address.my_ha.my_namenode1" = "127.0.0.1:10000",
+            "dfs.namenode.rpc-address.my_ha.my_namenode2" = "127.0.0.1:10000",
+            "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        );
+    """
+
+    sql """
+        CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
+        PROPERTIES(
+            "storage_resource" = "${resource_name}",
+            "cooldown_ttl" = "300"
+        )
+    """
+
+    // test one replica
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+            L_ORDERKEY    INTEGER NOT NULL,
+            L_PARTKEY     INTEGER NOT NULL,
+            L_SUPPKEY     INTEGER NOT NULL,
+            L_LINENUMBER  INTEGER NOT NULL,
+            L_QUANTITY    DECIMAL(15,2) NOT NULL,
+            L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
+            L_DISCOUNT    DECIMAL(15,2) NOT NULL,
+            L_TAX         DECIMAL(15,2) NOT NULL,
+            L_RETURNFLAG  CHAR(1) NOT NULL,
+            L_LINESTATUS  CHAR(1) NOT NULL,
+            L_SHIPDATE    DATE NOT NULL,
+            L_COMMITDATE  DATE NOT NULL,
+            L_RECEIPTDATE DATE NOT NULL,
+            L_SHIPINSTRUCT CHAR(25) NOT NULL,
+            L_SHIPMODE     CHAR(10) NOT NULL,
+            L_COMMENT      VARCHAR(44) NOT NULL
+            )
+            DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+            DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+            PROPERTIES (
+            "replication_num" = "1",
+            "storage_policy" = "${policy_name}"
+            )
+        """
+    
+    load_lineitem_table()
+
+    // show tablets from table and record the first tablet's LocalDataSize1
+    tablets = sql """
+    SHOW TABLETS FROM ${tableName}
+    """
+    log.info( "test tablets not empty")
+    assertTrue(tablets.size() > 0)
+    fetchDataSize(sizes, tablets[0])
+    def LocalDataSize1 = sizes[0]
+
+    // wait 10 minutes, then show tablets from table; RemoteDataSize is expected to be non-zero and equal to LocalDataSize1, and LocalDataSize is expected to be 0
+    sleep(600000)
+
+
+    tablets = sql """
+    SHOW TABLETS FROM ${tableName}
+    """
+    log.info( "test tablets not empty")
+    fetchDataSize(sizes, tablets[0])
+    while (sizes[1] == 0) {
+        log.info( "test remote size is zero, sleep 10s")
+        sleep(10000)
+        tablets = sql """
+        SHOW TABLETS FROM ${tableName}
+        """
+        fetchDataSize(sizes, tablets[0])
+    }
+    assertTrue(tablets.size() > 0)
+    log.info( "test remote size not zero")
+    assertEquals(LocalDataSize1, sizes[1])
+    log.info( "test local size is zero")
+    assertEquals(0, sizes[0])
+
+
+    sql """
+    DROP TABLE ${tableName}
+    """
+
+    
+}
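One teardown detail worth noting alongside the test above: per the docs in this patch, a storage policy cannot be dropped while a table or partition is still associated with it, so cleanup has to drop (or detach) the table first, then the policy, then the resource. A hedged SQL sketch using this test's names (the ordering is the point being illustrated; the test itself does not spell it out):

```
DROP TABLE IF EXISTS lineitem2;
DROP STORAGE POLICY test_table_with_data_policy;
DROP RESOURCE test_table_with_data_resource;
```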


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
