This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fbc448520a6 [feature](ColdHeatSeperation) Support to upload cold data to HDFS (#22048)
fbc448520a6 is described below
commit fbc448520a6107ff277073e4447c55d9545e6dd5
Author: 川流 <[email protected]>
AuthorDate: Sun Oct 22 21:04:43 2023 +0800
[feature](ColdHeatSeperation) Support to upload cold data to HDFS (#22048)
---
.gitignore | 3 +
be/src/agent/task_worker_pool.cpp | 18 ++
be/src/olap/storage_policy.cpp | 1 -
docs/en/docs/advanced/cold-hot-separation.md | 33 ++-
docs/zh-CN/docs/advanced/cold-hot-separation.md | 35 +++-
.../org/apache/doris/master/ReportHandler.java | 1 +
.../org/apache/doris/policy/StoragePolicy.java | 40 ++--
.../apache/doris/task/PushStoragePolicyTask.java | 12 +-
gensrc/thrift/AgentService.thrift | 2 +
.../load_colddata_to_hdfs.groovy | 230 +++++++++++++++++++++
10 files changed, 353 insertions(+), 22 deletions(-)
diff --git a/.gitignore b/.gitignore
index 84fc3064e68..9a35bbe2258 100644
--- a/.gitignore
+++ b/.gitignore
@@ -122,3 +122,6 @@ lru_cache_test
/conf/log4j2-spring.xml
/fe/fe-core/src/test/resources/real-help-resource.zip
/ui/dist
+
+# other
+compile_commands.json
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 8b4aedd32f3..ff22f7923c3 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -19,6 +19,7 @@
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Status_types.h>
@@ -49,6 +50,7 @@
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_system.h"
@@ -1203,6 +1205,22 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
.tag("s3_conf", s3_conf.to_string());
                put_storage_resource(resource.id, {std::move(fs), resource.version});
}
+ } else if (resource.__isset.hdfs_storage_param) {
+ Status st;
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ if (existed_resource.fs == nullptr) {
+                st = io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs);
+ } else {
+                fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "update hdfs resource failed: " << st;
+ } else {
+ LOG_INFO("successfully update hdfs resource")
+ .tag("resource_id", resource.id)
+ .tag("resource_name", resource.name);
+                put_storage_resource(resource.id, {std::move(fs), resource.version});
+ }
} else {
LOG(WARNING) << "unknown resource=" << resource;
}
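
For readers skimming the hunk above, the BE-side flow it adds is: when a pushed TStorageResource carries hdfs_storage_param, the worker either builds a new io::HdfsFileSystem from those params or reuses the filesystem already cached for that resource, then registers it under the resource id so later cooldown work can find it. The following is a minimal annotated sketch of that branch, not part of the commit: the helper name, the generic type of existed_resource, and the include paths marked as assumed are illustrative; everything else is taken from the diff.

    #include <memory>

    #include <gen_cpp/AgentService_types.h>  // TStorageResource (included above)

    #include "common/logging.h"              // LOG(WARNING) (assumed path)
    #include "common/status.h"               // Status (assumed path)
    #include "io/fs/hdfs_file_system.h"      // io::HdfsFileSystem (added by this commit)
    #include "olap/storage_policy.h"         // put_storage_resource (assumed path)

    // Illustrative helper mirroring the new branch in
    // _push_storage_policy_worker_thread_callback(). `existed_resource` is whatever the
    // existing lookup for resource.id returned, so its type is kept generic here.
    template <typename ExistedResource>
    void update_hdfs_resource(const TStorageResource& resource,
                              const ExistedResource& existed_resource) {
        Status st;
        std::shared_ptr<io::HdfsFileSystem> fs;
        if (existed_resource.fs == nullptr) {
            // First push of this resource: build an HDFS client from the thrift params.
            st = io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs);
        } else {
            // Resource already known to this BE: reuse the cached filesystem object.
            fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
        }
        if (!st.ok()) {
            LOG(WARNING) << "update hdfs resource failed: " << st;
            return;
        }
        // Register the filesystem under the resource id for later cooldown tasks.
        put_storage_resource(resource.id, {std::move(fs), resource.version});
    }
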
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index 494c93cec13..61c3093abf4 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -48,7 +48,6 @@ Status get_remote_file_system(int64_t storage_policy_id,
        return Status::InternalError("could not find resource, resouce_id={}", storage_policy->resource_id);
}
- DCHECK(atol((*fs)->id().c_str()) == storage_policy->resource_id);
DCHECK((*fs)->type() != io::FileSystemType::LOCAL);
return Status::OK();
}
diff --git a/docs/en/docs/advanced/cold-hot-separation.md b/docs/en/docs/advanced/cold-hot-separation.md
index 411dc38f2a5..64faca74e99 100644
--- a/docs/en/docs/advanced/cold-hot-separation.md
+++ b/docs/en/docs/advanced/cold-hot-separation.md
@@ -58,7 +58,7 @@ In addition, fe configuration needs to be added:
`enable_storage_policy=true`
Note: This property will not be synchronized by CCR. If this table is copied by CCR, that is, PROPERTIES contains `is_being_synced = true`, this property will be erased in this table.
-For example:
+Here is an example of how to create an S3 RESOURCE:
```
CREATE RESOURCE "remote_s3"
@@ -94,6 +94,36 @@ PROPERTIES(
"storage_policy" = "test_policy"
);
```
+And here is how to create an HDFS RESOURCE:
+```
+CREATE RESOURCE "remote_hdfs" PROPERTIES (
+ "type"="hdfs",
+ "fs.defaultFS"="fs_host:default_fs_port",
+ "hadoop.username"="hive",
+ "hadoop.password"="hive",
+ "dfs.nameservices" = "my_ha",
+ "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+ "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ );
+
+ CREATE STORAGE POLICY test_policy PROPERTIES (
+ "storage_resource" = "remote_hdfs",
+ "cooldown_ttl" = "300"
+ )
+
+ CREATE TABLE IF NOT EXISTS create_table_use_created_policy (
+ k1 BIGINT,
+ k2 LARGEINT,
+ v1 VARCHAR(2048)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH (k1) BUCKETS 3
+ PROPERTIES(
+ "storage_policy" = "test_policy"
+ );
+```
Or for an existing table, associate the storage policy
```
ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy");
@@ -177,3 +207,4 @@ PROPERTIES
"use_path_style" = "true"
);
```
+
diff --git a/docs/zh-CN/docs/advanced/cold-hot-separation.md b/docs/zh-CN/docs/advanced/cold-hot-separation.md
index b1c662080c0..c81c45dc717 100644
--- a/docs/zh-CN/docs/advanced/cold-hot-separation.md
+++ b/docs/zh-CN/docs/advanced/cold-hot-separation.md
@@ -35,7 +35,7 @@ under the License.
4. 基于普通云盘做高可用,需要实现多副本,某副本异常要做副本迁移。而将数据放到对象存储上则不存在此类问题,因为对象存储是共享的。
## 解决方案
-在Partition级别上设置freeze time,表示多久这个Partition会被freeze,并且定义freeze之后存储的remote storage的位置。在be上daemon线程会周期性的判断表是否需要freeze,若freeze后会将数据上传到s3上。
+在Partition级别上设置freeze time,表示多久这个Partition会被freeze,并且定义freeze之后存储的remote storage的位置。在be上daemon线程会周期性的判断表是否需要freeze,若freeze后会将数据上传到s3和hdfs上。
冷热分层支持所有doris功能,只是把部分数据放到对象存储上,以节省成本,不牺牲功能。因此有如下特点:
@@ -57,7 +57,7 @@ under the License.
注意:这个属性不会被CCR同步,如果这个表是被CCR复制而来的,即PROPERTIES中包含`is_being_synced = true`时,这个属性将会在这个表中被擦除。
-例如:
+下面演示如何创建S3 RESOURCE:
```
CREATE RESOURCE "remote_s3"
@@ -93,6 +93,36 @@ PROPERTIES(
"storage_policy" = "test_policy"
);
```
+以及如何创建 HDFS RESOURCE:
+```
+CREATE RESOURCE "remote_hdfs" PROPERTIES (
+ "type"="hdfs",
+ "fs.defaultFS"="fs_host:default_fs_port",
+ "hadoop.username"="hive",
+ "hadoop.password"="hive",
+ "dfs.nameservices" = "my_ha",
+ "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+ "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+ "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ );
+
+ CREATE STORAGE POLICY test_policy PROPERTIES (
+ "storage_resource" = "remote_hdfs",
+ "cooldown_ttl" = "300"
+ )
+
+ CREATE TABLE IF NOT EXISTS create_table_use_created_policy (
+ k1 BIGINT,
+ k2 LARGEINT,
+ v1 VARCHAR(2048)
+ )
+ UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH (k1) BUCKETS 3
+ PROPERTIES(
+ "storage_policy" = "test_policy"
+ );
+```
或者对一个已存在的表,关联storage policy
```
ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy");
@@ -104,6 +134,7 @@ ALTER TABLE create_table_partition MODIFY PARTITION (*) SET("storage_policy"="te
**注意**,如果用户在建表时给整张table和部分partition指定了不同的storage policy,partition设置的storage policy会被无视,整张表的所有partition都会使用table的policy.
如果您需要让某个partition的policy和别的不同,则可以使用上文中对一个已存在的partition,关联storage policy的方式修改.
具体可以参考docs目录下[resource](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-RESOURCE.md)、[policy](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md)、[create table](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)、[alter table](../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.md)等文档,里面有详细介绍
+
### 一些限制
- 单表或单partition只能关联一个storage policy,关联后不能drop掉storage policy,需要先解除二者的关联。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 4bfb55e3a89..efca6c7f950 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -306,6 +306,7 @@ public class ReportHandler extends Daemon {
// do the diff. find out (intersection) / (be - meta) / (meta - be)
        List<Policy> policiesInFe = Env.getCurrentEnv().getPolicyMgr().getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
        List<Resource> resourcesInFe = Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.S3);
+        resourcesInFe.addAll(Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.HDFS));
List<Resource> resourceToPush = new ArrayList<>();
List<Policy> policyToPush = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
index 4f1cbd96b7d..b86b3dc4903 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
@@ -19,6 +19,7 @@ package org.apache.doris.policy;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.ScalarType;
@@ -177,29 +178,38 @@ public class StoragePolicy extends Policy {
            this.cooldownTtl = getSecondsByCooldownTtl(props.get(COOLDOWN_TTL));
}
- checkIsS3ResourceAndExist(this.storageResource);
+ checkResourceIsExist(this.storageResource);
if (!addResourceReference() && !ifNotExists) {
-            throw new AnalysisException("this policy has been added to s3 resource once, policy has been created.");
+            throw new AnalysisException("this policy has been added to s3 or hdfs resource, policy has been created.");
}
}
-    private static Resource checkIsS3ResourceAndExist(final String storageResource) throws AnalysisException {
-        // check storage_resource type is S3, current just support S3
+    private static Resource checkResourceIsExist(final String storageResource) throws AnalysisException {
        Resource resource = Optional.ofNullable(Env.getCurrentEnv().getResourceMgr().getResource(storageResource))
                .orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource));
- if (resource.getType() != Resource.ResourceType.S3) {
-            throw new AnalysisException("current storage policy just support resource type S3_COOLDOWN");
- }
Map<String, String> properties = resource.getCopiedProperties();
- if (!properties.containsKey(S3Properties.ROOT_PATH)) {
- throw new AnalysisException(String.format(
-                    "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
- }
- if (!properties.containsKey(S3Properties.BUCKET)) {
- throw new AnalysisException(String.format(
-                    "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
+ switch (resource.getType()) {
+ case S3:
+ if (!properties.containsKey(S3Properties.ROOT_PATH)) {
+ throw new AnalysisException(String.format(
+                            "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
+ }
+ if (!properties.containsKey(S3Properties.BUCKET)) {
+ throw new AnalysisException(String.format(
+                            "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
+ }
+ break;
+ case HDFS:
+ if (!properties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+ throw new AnalysisException(String.format(
+                            "Missing [%s] in '%s' resource", HdfsResource.HADOOP_FS_NAME, storageResource));
+ }
+ break;
+ default:
+ throw new AnalysisException(
+                        "current storage policy just support resource type S3_COOLDOWN or HDFS_COOLDOWN");
}
return resource;
}
@@ -343,7 +353,7 @@ public class StoragePolicy extends Policy {
String storageResource = properties.get(STORAGE_RESOURCE);
if (storageResource != null) {
- checkIsS3ResourceAndExist(storageResource);
+ checkResourceIsExist(storageResource);
}
        if (this.policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && this.storageResource == null
&& storageResource == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java
index e422a409e17..4f1bec5d870 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java
@@ -18,6 +18,7 @@
package org.apache.doris.task;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.datasource.property.constants.S3Properties;
@@ -62,8 +63,9 @@ public class PushStoragePolicyTask extends AgentTask {
StoragePolicy storagePolicy = (StoragePolicy) p;
String resourceName = storagePolicy.getStorageResource();
            Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
-            if (resource == null || resource.getType() != ResourceType.S3) {
-                LOG.warn("can't find s3 resource by name {}", resourceName);
+ if (resource == null || (resource.getType() != ResourceType.S3
+ && resource.getType() != ResourceType.HDFS)) {
+                LOG.warn("can't find s3 resource or hdfs resource by name {}", resourceName);
return;
}
item.setResourceId(resource.getId());
@@ -85,7 +87,11 @@ public class PushStoragePolicyTask extends AgentTask {
item.setId(r.getId());
item.setName(r.getName());
item.setVersion(r.getVersion());
-            item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
+            if (r.getType() == ResourceType.S3) {
+                item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
+            } else if (r.getType() == ResourceType.HDFS) {
+                item.setHdfsStorageParam(HdfsResource.generateHdfsParam(r.getCopiedProperties()));
+            }
+ }
r.readUnlock();
tStorageResources.add(item);
});
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index a61c9512564..4b4c260b473 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -60,6 +60,7 @@ enum TTabletType {
TABLET_TYPE_MEMORY = 1
}
+
struct TS3StorageParam {
1: optional string endpoint
2: optional string region
@@ -87,6 +88,7 @@ struct TStorageResource {
2: optional string name
3: optional i64 version // alter version
4: optional TS3StorageParam s3_storage_param
+ 5: optional PlanNodes.THdfsParams hdfs_storage_param
// more storage resource type
}
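
Since both storage params are optional, a TStorageResource pushed by the FE now carries at most one of s3_storage_param / hdfs_storage_param, and consumers tell them apart through the thrift-generated __isset flags, exactly as the task_worker_pool.cpp hunk earlier in this commit does. A minimal dispatch sketch, for illustration only (the function name is made up; the field checks and the fallback warning come from the diff):

    #include <gen_cpp/AgentService_types.h>  // TStorageResource

    #include "common/logging.h"              // LOG(WARNING) (assumed path)

    // Illustrative only: mirrors the dispatch in _push_storage_policy_worker_thread_callback().
    void dispatch_storage_resource(const TStorageResource& resource) {
        if (resource.__isset.s3_storage_param) {
            // Existing S3 path: field 4, handled by the S3 branch of the worker.
        } else if (resource.__isset.hdfs_storage_param) {
            // New HDFS path: field 5 added by this commit.
        } else {
            LOG(WARNING) << "unknown resource=" << resource;
        }
    }
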
diff --git a/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy b/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy
new file mode 100644
index 00000000000..14c5939dfc8
--- /dev/null
+++ b/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("load_colddata_to_hdfs") {
+ if (!enableHdfs()) {
+ logger.info("skip this case because hdfs is not enabled");
+ }
+
+ def fetchBeHttp = { check_func, meta_url ->
+ def i = meta_url.indexOf("/api")
+ String endPoint = meta_url.substring(0, i)
+ String metaUri = meta_url.substring(i)
+ i = endPoint.lastIndexOf('/')
+ endPoint = endPoint.substring(i + 1)
+ httpTest {
+ endpoint endPoint
+ uri metaUri
+ op "get"
+ check check_func
+ }
+ }
+    // data_sizes is an ArrayList<Long>, t is a tablet row
+ def fetchDataSize = { data_sizes, t ->
+ def tabletId = t[0]
+ String meta_url = t[17]
+ def clos = { respCode, body ->
+            logger.info("test ttl expired resp Code {}", "${respCode}".toString())
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def obj = new JsonSlurper().parseText(out)
+ data_sizes[0] = obj.local_data_size
+ data_sizes[1] = obj.remote_data_size
+ }
+ fetchBeHttp(clos, meta_url.replace("header", "data_size"))
+ }
+    // used as an out parameter passed to fetchDataSize
+ List<Long> sizes = [-1, -1]
+ def tableName = "lineitem2"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ def stream_load_one_part = { partnum ->
+ streamLoad {
+ table tableName
+ // a default db 'regression_test' is specified in
+ // ${DORIS_HOME}/conf/regression-conf.groovy
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+            // default column_separator is specified in doris fe config, usually is '\t'.
+            // this line changes it to '|'
+            set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+
+            // relates to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+            // also, you can stream load an http stream, e.g. http://xxx/some.csv
+            file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split${partnum}.gz"""
+ time 10000 // limit inflight 10s
+
+            // stream load action will check the result, including Success status and NumberTotalRows == NumberLoadedRows
+
+            // if a check callback is declared, the default check condition is ignored,
+            // so you must check all conditions yourself
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def load_lineitem_table = {
+ stream_load_one_part("00")
+ stream_load_one_part("01")
+ def tablets = sql """
+ SHOW TABLETS FROM ${tableName}
+ """
+ while (tablets[0][8] == "0") {
+ log.info( "test local size is zero, sleep 10s")
+ sleep(10000)
+ tablets = sql """
+ SHOW TABLETS FROM ${tableName}
+ """
+ }
+ }
+
+ def check_storage_policy_exist = { name->
+ def polices = sql"""
+ show storage policy;
+ """
+ for (p in polices) {
+ if (name == p[0]) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ def resource_name = "test_table_with_data_resource"
+ def policy_name= "test_table_with_data_policy"
+
+ if (check_storage_policy_exist(policy_name)) {
+ sql """
+ DROP STORAGE POLICY ${policy_name}
+ """
+ }
+
+ def has_resouce = sql """
+ SHOW RESOURCES WHERE NAME = "${resource_name}";
+ """
+ if (has_resouce.size() > 0) {
+ sql """
+ DROP RESOURCE ${resource_name}
+ """
+ }
+
+ sql """
+ CREATE RESOURCE IF NOT EXISTS ${resource_name}
+ PROPERTIES (
+ "type"="hdfs",
+ "fs.defaultFS"="127.0.0.1:8120",
+ "hadoop.username"="hive",
+ "hadoop.password"="hive",
+ "dfs.nameservices" = "my_ha",
+ "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+ "dfs.namenode.rpc-address.my_ha.my_namenode1" = "127.0.0.1:10000",
+ "dfs.namenode.rpc-address.my_ha.my_namenode2" = "127.0.0.1:10000",
+        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ );
+ """
+
+ sql """
+ CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
+ PROPERTIES(
+ "storage_resource" = "${resource_name}",
+ "cooldown_ttl" = "300"
+ )
+ """
+
+ // test one replica
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ L_ORDERKEY INTEGER NOT NULL,
+ L_PARTKEY INTEGER NOT NULL,
+ L_SUPPKEY INTEGER NOT NULL,
+ L_LINENUMBER INTEGER NOT NULL,
+ L_QUANTITY DECIMAL(15,2) NOT NULL,
+ L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+ L_DISCOUNT DECIMAL(15,2) NOT NULL,
+ L_TAX DECIMAL(15,2) NOT NULL,
+ L_RETURNFLAG CHAR(1) NOT NULL,
+ L_LINESTATUS CHAR(1) NOT NULL,
+ L_SHIPDATE DATE NOT NULL,
+ L_COMMITDATE DATE NOT NULL,
+ L_RECEIPTDATE DATE NOT NULL,
+ L_SHIPINSTRUCT CHAR(25) NOT NULL,
+ L_SHIPMODE CHAR(10) NOT NULL,
+ L_COMMENT VARCHAR(44) NOT NULL
+ )
+ DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+ DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "storage_policy" = "${policy_name}"
+ )
+ """
+
+ load_lineitem_table()
+
+    // show tablets from table, get LocalDataSize1 of the first tablet
+ tablets = sql """
+ SHOW TABLETS FROM ${tableName}
+ """
+ log.info( "test tablets not empty")
+ assertTrue(tablets.size() > 0)
+ fetchDataSize(sizes, tablets[0])
+ def LocalDataSize1 = sizes[0]
+
+    // wait 10 minutes, then show tablets from table; expect RemoteDataSize to be non-zero and equal to LocalDataSize1, and LocalDataSize to be 0
+ sleep(600000)
+
+
+ tablets = sql """
+ SHOW TABLETS FROM ${tableName}
+ """
+ log.info( "test tablets not empty")
+ fetchDataSize(sizes, tablets[0])
+ while (sizes[1] == 0) {
+ log.info( "test remote size is zero, sleep 10s")
+ sleep(10000)
+ tablets = sql """
+ SHOW TABLETS FROM ${tableName}
+ """
+ fetchDataSize(sizes, tablets[0])
+ }
+ assertTrue(tablets.size() > 0)
+ log.info( "test remote size not zero")
+ assertEquals(LocalDataSize1, sizes[1])
+ log.info( "test local size is zero")
+ assertEquals(0, sizes[0])
+
+
+ sql """
+ DROP TABLE ${tableName}
+ """
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]