This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a050513c913 [Fix](clean trash) Fix clean trash use agent task (#33912) (#33972)
a050513c913 is described below
commit a050513c9139a5cb7513d46ea1272aea9d4e0816
Author: deardeng <[email protected]>
AuthorDate: Mon Apr 22 17:14:21 2024 +0800
[Fix](clean trash) Fix clean trash use agent task (#33912) (#33972)
* [Fix](clean trash) Fix clean trash use agent task (#33912)
* add .h
---
be/src/agent/agent_server.cpp | 3 +
be/src/agent/agent_server.h | 1 +
be/src/agent/task_worker_pool.cpp | 9 +
be/src/agent/task_worker_pool.h | 2 +
be/src/service/backend_service.cpp | 5 -
be/src/service/backend_service.h | 2 -
.../main/java/org/apache/doris/catalog/Env.java | 25 +-
.../java/org/apache/doris/task/AgentBatchTask.java | 10 +
.../java/org/apache/doris/task/CleanTrashTask.java | 37 ++
.../org/apache/doris/common/GenericPoolTest.java | 5 -
.../apache/doris/utframe/MockedBackendFactory.java | 5 -
gensrc/thrift/AgentService.thrift | 3 +
gensrc/thrift/BackendService.thrift | 2 -
gensrc/thrift/Types.thrift | 3 +-
.../test_admin_clean_trash.groovy | 687 +++++++++++++++++++++
15 files changed, 760 insertions(+), 39 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index a3b18c53567..e94d4c349a9 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -160,6 +160,9 @@ void AgentServer::start_workers(ExecEnv* exec_env) {
_report_tablet_workers = std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLE", _master_info,
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info); });
+
+ _clean_trash_binlog_workers = std::make_unique<TaskWorkerPool>(
+ "CLEAN_TRASH", 1, [&engine](auto&& task) {return
clean_trash_callback(engine, task); });
// clang-format on
}
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index b3889d894f9..283d2118ce3 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -96,6 +96,7 @@ private:
std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
std::unique_ptr<TopicSubscriber> _topic_subscriber;
std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
+ std::unique_ptr<TaskWorkerPool> _clean_trash_binlog_workers;
};
} // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 058312227bf..1a53ec3b5ec 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -75,6 +75,7 @@
#include "runtime/exec_env.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/random.h"
@@ -1764,4 +1765,12 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}
+// Worker callback for CLEAN_TRASH agent tasks; AgentServer::start_workers runs it on the
+// single-threaded "CLEAN_TRASH" TaskWorkerPool.
+//
+// `req` is not read: the task's payload, TCleanTrashReq, declares no fields.
+// The sweep is best-effort: a failing status is logged and the callback still returns normally.
+void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
+    LOG(INFO) << "clean trash start";
+    // Fault-injection hook: regression tests enable this debug point to make the sweep slow.
+    DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
+    if (auto st = engine.start_trash_sweep(nullptr, true); !st.ok()) {
+        LOG(WARNING) << "clean trash failed: " << st;
+    }
+    // NOTE(review): presumably triggers an immediate disk-state report so the FE sees the
+    // freed space — confirm against StorageEngine::notify_listener.
+    if (auto st = engine.notify_listener("REPORT_DISK_STATE"); !st.ok()) {
+        LOG(WARNING) << "failed to notify REPORT_DISK_STATE listener after clean trash: " << st;
+    }
+    LOG(INFO) << "clean trash finish";
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 1c9327b42d7..f95a866a57a 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -161,6 +161,8 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);
+void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);
+
void report_task_callback(const TMasterInfo& master_info);
void report_disk_callback(StorageEngine& engine, const TMasterInfo&
master_info);
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 77137ba169c..6a46cf38408 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -843,11 +843,6 @@ void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
}
}
-void BackendService::clean_trash() {
- static_cast<void>(StorageEngine::instance()->start_trash_sweep(nullptr,
true));
-
static_cast<void>(StorageEngine::instance()->notify_listener("REPORT_DISK_STATE"));
-}
-
void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
StorageEngine::instance()->tablet_manager()->get_all_tablets_storage_format(&result);
}
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 4ee200796a6..9d53ec4bc45 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -133,8 +133,6 @@ public:
void get_stream_load_record(TStreamLoadRecordResult& result,
const int64_t last_stream_record_time)
override;
- void clean_trash() override;
-
void check_storage_format(TCheckStorageFormatResult& result) override;
void ingest_binlog(TIngestBinlogResult& result, const
TIngestBinlogRequest& request) override;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 366ec9a094d..352583362bb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -94,7 +94,6 @@ import org.apache.doris.clone.TabletChecker;
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.clone.TabletSchedulerStat;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigException;
@@ -256,11 +255,11 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.CleanTrashTask;
import org.apache.doris.task.CompactionTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
-import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TGetMetaDBMeta;
@@ -5812,25 +5811,13 @@ public class Env {
+ /**
+ * Handles ADMIN CLEAN TRASH: creates one CLEAN_TRASH agent task per backend returned by
+ * {@code stmt.getBackends()} and hands them to AgentTaskExecutor as a single batch.
+ * The method does not wait for the batch to run and reports no per-backend result.
+ *
+ * @param stmt the analyzed ADMIN CLEAN TRASH statement naming the target backends
+ */
public void cleanTrash(AdminCleanTrashStmt stmt) {
List<Backend> backends = stmt.getBackends();
+ // All per-backend tasks go into one batch so they are dispatched together.
+ AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backends) {
- BackendService.Client client = null;
- TNetworkAddress address = null;
- boolean ok = false;
- try {
- address = new TNetworkAddress(backend.getHost(),
backend.getBePort());
- client = ClientPool.backendPool.borrowObject(address);
- client.cleanTrash(); // async
- ok = true;
- } catch (Exception e) {
- LOG.warn("trash clean exec error. backend[{}]",
backend.getId(), e);
- } finally {
- if (ok) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address, client);
- }
- }
+ CleanTrashTask cleanTrashTask = new
CleanTrashTask(backend.getId());
+ batchTask.addTask(cleanTrashTask);
+ LOG.info("clean trash in be {}, beId {}", backend.getHost(),
backend.getId());
}
+ // Submitted without waiting; the backends run the sweep on their own.
+ AgentTaskExecutor.submit(batchTask);
}
public void setPartitionVersion(AdminSetPartitionVersionStmt stmt) throws
DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 1f2e662c757..03a82cbb56b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TAlterInvertedIndexReq;
import org.apache.doris.thrift.TAlterTabletReqV2;
import org.apache.doris.thrift.TCheckConsistencyReq;
+import org.apache.doris.thrift.TCleanTrashReq;
import org.apache.doris.thrift.TClearAlterTaskRequest;
import org.apache.doris.thrift.TClearTransactionTaskRequest;
import org.apache.doris.thrift.TCloneReq;
@@ -392,6 +393,15 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setGcBinlogReq(request);
return tAgentTaskRequest;
}
+ case CLEAN_TRASH: {
+ CleanTrashTask cleanTrashTask = (CleanTrashTask) task;
+ TCleanTrashReq request = cleanTrashTask.toThrift();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request.toString());
+ }
+ tAgentTaskRequest.setCleanTrashReq(request);
+ return tAgentTaskRequest;
+ }
default:
if (LOG.isDebugEnabled()) {
LOG.debug("could not find task type for task [{}]", task);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CleanTrashTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CleanTrashTask.java
new file mode 100644
index 00000000000..4e9c9a27a3c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CleanTrashTask.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TCleanTrashReq;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+
+/**
+ * FE-side agent task asking a single backend to clean its trash (task type CLEAN_TRASH).
+ * Every id passed to AgentTask other than the backend id is -1; presumably this marks the
+ * task as not tied to any table, partition or tablet (verify against AgentTask's constructor).
+ */
+public class CleanTrashTask extends AgentTask {
+ // NOTE(review): LOG is not referenced anywhere in this class.
+ private static final Logger LOG =
LogManager.getLogger(CleanTrashTask.class);
+
+ /**
+ * @param backendId id of the backend whose trash should be cleaned
+ */
+ public CleanTrashTask(long backendId) {
+ super(null, backendId, TTaskType.CLEAN_TRASH, -1, -1, -1, -1, -1, -1,
-1);
+ }
+
+ /**
+ * Builds the thrift payload for this task. TCleanTrashReq declares no fields, so the
+ * returned request is always empty.
+ *
+ * @return a new, empty TCleanTrashReq
+ */
+ public TCleanTrashReq toThrift() {
+ return new TCleanTrashReq();
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index ba66d07ec6b..d03d3595682 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -217,11 +217,6 @@ public class GenericPoolTest {
return null;
}
- @Override
- public void cleanTrash() throws TException {
- // TODO Auto-generated method stub
- }
-
@Override
public TCheckStorageFormatResult checkStorageFormat() throws
TException {
return new TCheckStorageFormatResult();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index df346f5ed58..735c46c70be 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -401,11 +401,6 @@ public class MockedBackendFactory {
return new TStreamLoadRecordResult(Maps.newHashMap());
}
- @Override
- public void cleanTrash() throws TException {
- return;
- }
-
@Override
public TCheckStorageFormatResult checkStorageFormat() throws
TException {
return new TCheckStorageFormatResult();
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 1543dc8a787..a59717cfa4a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -99,6 +99,8 @@ struct TPushStoragePolicyReq {
3: optional list<i64> dropped_storage_policy
}
+// Payload of a CLEAN_TRASH agent task (TAgentTaskRequest.clean_trash_req).
+// It currently declares no fields: the task needs no parameters beyond its type.
+struct TCleanTrashReq {}
+
enum TCompressionType {
UNKNOWN_COMPRESSION = 0,
DEFAULT_COMPRESSION = 1,
@@ -492,6 +494,7 @@ struct TAgentTaskRequest {
31: optional TPushStoragePolicyReq push_storage_policy_req
32: optional TAlterInvertedIndexReq alter_inverted_index_req
33: optional TGcBinlogReq gc_binlog_req
+ 34: optional TCleanTrashReq clean_trash_req
}
struct TAgentResult {
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index a8504fd4f15..d93520206c5 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -286,8 +286,6 @@ service BackendService {
TStreamLoadRecordResult get_stream_load_record(1: i64
last_stream_record_time);
- oneway void clean_trash();
-
// check tablet rowset type
TCheckStorageFormatResult check_storage_format();
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 529527a8838..d8953e7dd7a 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -222,7 +222,8 @@ enum TTaskType {
PUSH_COOLDOWN_CONF,
PUSH_STORAGE_POLICY,
ALTER_INVERTED_INDEX,
- GC_BINLOG
+ GC_BINLOG,
+ CLEAN_TRASH
}
enum TStmtType {
diff --git a/regression-test/suites/fault_injection_p0/test_admin_clean_trash.groovy b/regression-test/suites/fault_injection_p0/test_admin_clean_trash.groovy
new file mode 100644
index 00000000000..4e4e4532a03
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_admin_clean_trash.groovy
@@ -0,0 +1,687 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+// Checks that ADMIN CLEAN TRASH, now dispatched to backends as a CLEAN_TRASH agent task,
+// does not hold up other DDL while the backends' clean-trash callback is still running.
+// The callback is made artificially slow through the "clean_trash_callback_sleep" debug point.
+suite("test_admin_clean_trash", "nonConcurrent") {
+    // Not run in cloud mode.
+    if (isCloudMode()) {
+        return
+    }
+
+    // Start from a clean slate: a previous run aborted before its `finally` block ran could
+    // have left these databases behind, and the unconditional CREATE DATABASE statements
+    // below would then fail.
+    sql """DROP database if EXISTS clean_trash_db1 force"""
+    sql """DROP database if EXISTS clean_trash_db2 force"""
+
+    try {
+        // Populate the first database with the TPC-DS table definitions.
+        sql """create database clean_trash_db1"""
+        sql """
+            CREATE TABLE IF NOT EXISTS clean_trash_db1.customer_demographics (
+                cd_demo_sk bigint not null,
+                cd_gender char(1),
+                cd_marital_status char(1),
+                cd_education_status char(20),
+                cd_purchase_estimate integer,
+                cd_credit_rating char(10),
+                cd_dep_count integer,
+                cd_dep_employed_count integer,
+                cd_dep_college_count integer
+            )
+            DUPLICATE KEY(cd_demo_sk)
+            DISTRIBUTED BY HASH(cd_gender) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS clean_trash_db1.reason (
+                r_reason_sk bigint not null,
+                r_reason_id char(16) not null,
+                r_reason_desc char(100)
+            )
+            DUPLICATE KEY(r_reason_sk)
+            DISTRIBUTED BY HASH(r_reason_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.date_dim (
+                d_date_sk bigint not null,
+                d_date_id char(16) not null,
+                d_date date,
+                d_month_seq integer,
+                d_week_seq integer,
+                d_quarter_seq integer,
+                d_year integer,
+                d_dow integer,
+                d_moy integer,
+                d_dom integer,
+                d_qoy integer,
+                d_fy_year integer,
+                d_fy_quarter_seq integer,
+                d_fy_week_seq integer,
+                d_day_name char(9),
+                d_quarter_name char(6),
+                d_holiday char(1),
+                d_weekend char(1),
+                d_following_holiday char(1),
+                d_first_dom integer,
+                d_last_dom integer,
+                d_same_day_ly integer,
+                d_same_day_lq integer,
+                d_current_day char(1),
+                d_current_week char(1),
+                d_current_month char(1),
+                d_current_quarter char(1),
+                d_current_year char(1)
+            )
+            DUPLICATE KEY(d_date_sk)
+            DISTRIBUTED BY HASH(d_date_sk) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.warehouse (
+                w_warehouse_sk bigint not null,
+                w_warehouse_id char(16) not null,
+                w_warehouse_name varchar(20),
+                w_warehouse_sq_ft integer,
+                w_street_number char(10),
+                w_street_name varchar(60),
+                w_street_type char(15),
+                w_suite_number char(10),
+                w_city varchar(60),
+                w_county varchar(30),
+                w_state char(2),
+                w_zip char(10),
+                w_country varchar(20),
+                w_gmt_offset decimal(5,2)
+            )
+            DUPLICATE KEY(w_warehouse_sk)
+            DISTRIBUTED BY HASH(w_warehouse_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.catalog_sales (
+                cs_item_sk bigint not null,
+                cs_order_number bigint not null,
+                cs_sold_date_sk bigint,
+                cs_sold_time_sk bigint,
+                cs_ship_date_sk bigint,
+                cs_bill_customer_sk bigint,
+                cs_bill_cdemo_sk bigint,
+                cs_bill_hdemo_sk bigint,
+                cs_bill_addr_sk bigint,
+                cs_ship_customer_sk bigint,
+                cs_ship_cdemo_sk bigint,
+                cs_ship_hdemo_sk bigint,
+                cs_ship_addr_sk bigint,
+                cs_call_center_sk bigint,
+                cs_catalog_page_sk bigint,
+                cs_ship_mode_sk bigint,
+                cs_warehouse_sk bigint,
+                cs_promo_sk bigint,
+                cs_quantity integer,
+                cs_wholesale_cost decimal(7,2),
+                cs_list_price decimal(7,2),
+                cs_sales_price decimal(7,2),
+                cs_ext_discount_amt decimal(7,2),
+                cs_ext_sales_price decimal(7,2),
+                cs_ext_wholesale_cost decimal(7,2),
+                cs_ext_list_price decimal(7,2),
+                cs_ext_tax decimal(7,2),
+                cs_coupon_amt decimal(7,2),
+                cs_ext_ship_cost decimal(7,2),
+                cs_net_paid decimal(7,2),
+                cs_net_paid_inc_tax decimal(7,2),
+                cs_net_paid_inc_ship decimal(7,2),
+                cs_net_paid_inc_ship_tax decimal(7,2),
+                cs_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(cs_item_sk, cs_order_number)
+            DISTRIBUTED BY HASH(cs_item_sk, cs_order_number) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "colocate_with" = "catalog"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.call_center (
+                cc_call_center_sk bigint not null,
+                cc_call_center_id char(16) not null,
+                cc_rec_start_date date,
+                cc_rec_end_date date,
+                cc_closed_date_sk integer,
+                cc_open_date_sk integer,
+                cc_name varchar(50),
+                cc_class varchar(50),
+                cc_employees integer,
+                cc_sq_ft integer,
+                cc_hours char(20),
+                cc_manager varchar(40),
+                cc_mkt_id integer,
+                cc_mkt_class char(50),
+                cc_mkt_desc varchar(100),
+                cc_market_manager varchar(40),
+                cc_division integer,
+                cc_division_name varchar(50),
+                cc_company integer,
+                cc_company_name char(50),
+                cc_street_number char(10),
+                cc_street_name varchar(60),
+                cc_street_type char(15),
+                cc_suite_number char(10),
+                cc_city varchar(60),
+                cc_county varchar(30),
+                cc_state char(2),
+                cc_zip char(10),
+                cc_country varchar(20),
+                cc_gmt_offset decimal(5,2),
+                cc_tax_percentage decimal(5,2)
+            )
+            DUPLICATE KEY(cc_call_center_sk)
+            DISTRIBUTED BY HASH(cc_call_center_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.inventory (
+                inv_date_sk bigint not null,
+                inv_item_sk bigint not null,
+                inv_warehouse_sk bigint,
+                inv_quantity_on_hand integer
+            )
+            DUPLICATE KEY(inv_date_sk, inv_item_sk, inv_warehouse_sk)
+            DISTRIBUTED BY HASH(inv_date_sk, inv_item_sk, inv_warehouse_sk) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS clean_trash_db1.catalog_returns (
+                cr_item_sk bigint not null,
+                cr_order_number bigint not null,
+                cr_returned_date_sk bigint,
+                cr_returned_time_sk bigint,
+                cr_refunded_customer_sk bigint,
+                cr_refunded_cdemo_sk bigint,
+                cr_refunded_hdemo_sk bigint,
+                cr_refunded_addr_sk bigint,
+                cr_returning_customer_sk bigint,
+                cr_returning_cdemo_sk bigint,
+                cr_returning_hdemo_sk bigint,
+                cr_returning_addr_sk bigint,
+                cr_call_center_sk bigint,
+                cr_catalog_page_sk bigint,
+                cr_ship_mode_sk bigint,
+                cr_warehouse_sk bigint,
+                cr_reason_sk bigint,
+                cr_return_quantity integer,
+                cr_return_amount decimal(7,2),
+                cr_return_tax decimal(7,2),
+                cr_return_amt_inc_tax decimal(7,2),
+                cr_fee decimal(7,2),
+                cr_return_ship_cost decimal(7,2),
+                cr_refunded_cash decimal(7,2),
+                cr_reversed_charge decimal(7,2),
+                cr_store_credit decimal(7,2),
+                cr_net_loss decimal(7,2)
+            )
+            DUPLICATE KEY(cr_item_sk, cr_order_number)
+            DISTRIBUTED BY HASH(cr_item_sk, cr_order_number) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "colocate_with" = "catalog"
+            );
+        """
+
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.household_demographics (
+                hd_demo_sk bigint not null,
+                hd_income_band_sk bigint,
+                hd_buy_potential char(15),
+                hd_dep_count integer,
+                hd_vehicle_count integer
+            )
+            DUPLICATE KEY(hd_demo_sk)
+            DISTRIBUTED BY HASH(hd_demo_sk) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.customer_address (
+                ca_address_sk bigint not null,
+                ca_address_id char(16) not null,
+                ca_street_number char(10),
+                ca_street_name varchar(60),
+                ca_street_type char(15),
+                ca_suite_number char(10),
+                ca_city varchar(60),
+                ca_county varchar(30),
+                ca_state char(2),
+                ca_zip char(10),
+                ca_country varchar(20),
+                ca_gmt_offset decimal(5,2),
+                ca_location_type char(20)
+            )
+            DUPLICATE KEY(ca_address_sk)
+            DISTRIBUTED BY HASH(ca_address_sk) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.income_band (
+                ib_income_band_sk bigint not null,
+                ib_lower_bound integer,
+                ib_upper_bound integer
+            )
+            DUPLICATE KEY(ib_income_band_sk)
+            DISTRIBUTED BY HASH(ib_income_band_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.catalog_page (
+                cp_catalog_page_sk bigint not null,
+                cp_catalog_page_id char(16) not null,
+                cp_start_date_sk integer,
+                cp_end_date_sk integer,
+                cp_department varchar(50),
+                cp_catalog_number integer,
+                cp_catalog_page_number integer,
+                cp_description varchar(100),
+                cp_type varchar(100)
+            )
+            DUPLICATE KEY(cp_catalog_page_sk)
+            DISTRIBUTED BY HASH(cp_catalog_page_sk) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.item (
+                i_item_sk bigint not null,
+                i_item_id char(16) not null,
+                i_rec_start_date date,
+                i_rec_end_date date,
+                i_item_desc varchar(200),
+                i_current_price decimal(7,2),
+                i_wholesale_cost decimal(7,2),
+                i_brand_id integer,
+                i_brand char(50),
+                i_class_id integer,
+                i_class char(50),
+                i_category_id integer,
+                i_category char(50),
+                i_manufact_id integer,
+                i_manufact char(50),
+                i_size char(20),
+                i_formulation char(20),
+                i_color char(20),
+                i_units char(10),
+                i_container char(10),
+                i_manager_id integer,
+                i_product_name char(50)
+            )
+            DUPLICATE KEY(i_item_sk)
+            DISTRIBUTED BY HASH(i_item_sk) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1"
+            ); """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.web_returns (
+                wr_item_sk bigint not null,
+                wr_order_number bigint not null,
+                wr_returned_date_sk bigint,
+                wr_returned_time_sk bigint,
+                wr_refunded_customer_sk bigint,
+                wr_refunded_cdemo_sk bigint,
+                wr_refunded_hdemo_sk bigint,
+                wr_refunded_addr_sk bigint,
+                wr_returning_customer_sk bigint,
+                wr_returning_cdemo_sk bigint,
+                wr_returning_hdemo_sk bigint,
+                wr_returning_addr_sk bigint,
+                wr_web_page_sk bigint,
+                wr_reason_sk bigint,
+                wr_return_quantity integer,
+                wr_return_amt decimal(7,2),
+                wr_return_tax decimal(7,2),
+                wr_return_amt_inc_tax decimal(7,2),
+                wr_fee decimal(7,2),
+                wr_return_ship_cost decimal(7,2),
+                wr_refunded_cash decimal(7,2),
+                wr_reversed_charge decimal(7,2),
+                wr_account_credit decimal(7,2),
+                wr_net_loss decimal(7,2)
+            )
+            DUPLICATE KEY(wr_item_sk, wr_order_number)
+            DISTRIBUTED BY HASH(wr_item_sk, wr_order_number) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "colocate_with" = "web"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.web_site (
+                web_site_sk bigint not null,
+                web_site_id char(16) not null,
+                web_rec_start_date date,
+                web_rec_end_date date,
+                web_name varchar(50),
+                web_open_date_sk bigint,
+                web_close_date_sk bigint,
+                web_class varchar(50),
+                web_manager varchar(40),
+                web_mkt_id integer,
+                web_mkt_class varchar(50),
+                web_mkt_desc varchar(100),
+                web_market_manager varchar(40),
+                web_company_id integer,
+                web_company_name char(50),
+                web_street_number char(10),
+                web_street_name varchar(60),
+                web_street_type char(15),
+                web_suite_number char(10),
+                web_city varchar(60),
+                web_county varchar(30),
+                web_state char(2),
+                web_zip char(10),
+                web_country varchar(20),
+                web_gmt_offset decimal(5,2),
+                web_tax_percentage decimal(5,2)
+            )
+            DUPLICATE KEY(web_site_sk)
+            DISTRIBUTED BY HASH(web_site_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.promotion (
+                p_promo_sk bigint not null,
+                p_promo_id char(16) not null,
+                p_start_date_sk bigint,
+                p_end_date_sk bigint,
+                p_item_sk bigint,
+                p_cost decimal(15,2),
+                p_response_targe integer,
+                p_promo_name char(50),
+                p_channel_dmail char(1),
+                p_channel_email char(1),
+                p_channel_catalog char(1),
+                p_channel_tv char(1),
+                p_channel_radio char(1),
+                p_channel_press char(1),
+                p_channel_event char(1),
+                p_channel_demo char(1),
+                p_channel_details varchar(100),
+                p_purpose char(15),
+                p_discount_active char(1)
+            )
+            DUPLICATE KEY(p_promo_sk)
+            DISTRIBUTED BY HASH(p_promo_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.web_sales (
+                ws_item_sk bigint not null,
+                ws_order_number bigint not null,
+                ws_sold_date_sk bigint,
+                ws_sold_time_sk bigint,
+                ws_ship_date_sk bigint,
+                ws_bill_customer_sk bigint,
+                ws_bill_cdemo_sk bigint,
+                ws_bill_hdemo_sk bigint,
+                ws_bill_addr_sk bigint,
+                ws_ship_customer_sk bigint,
+                ws_ship_cdemo_sk bigint,
+                ws_ship_hdemo_sk bigint,
+                ws_ship_addr_sk bigint,
+                ws_web_page_sk bigint,
+                ws_web_site_sk bigint,
+                ws_ship_mode_sk bigint,
+                ws_warehouse_sk bigint,
+                ws_promo_sk bigint,
+                ws_quantity integer,
+                ws_wholesale_cost decimal(7,2),
+                ws_list_price decimal(7,2),
+                ws_sales_price decimal(7,2),
+                ws_ext_discount_amt decimal(7,2),
+                ws_ext_sales_price decimal(7,2),
+                ws_ext_wholesale_cost decimal(7,2),
+                ws_ext_list_price decimal(7,2),
+                ws_ext_tax decimal(7,2),
+                ws_coupon_amt decimal(7,2),
+                ws_ext_ship_cost decimal(7,2),
+                ws_net_paid decimal(7,2),
+                ws_net_paid_inc_tax decimal(7,2),
+                ws_net_paid_inc_ship decimal(7,2),
+                ws_net_paid_inc_ship_tax decimal(7,2),
+                ws_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(ws_item_sk, ws_order_number)
+            DISTRIBUTED BY HASH(ws_item_sk, ws_order_number) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "colocate_with" = "web"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.store (
+                s_store_sk bigint not null,
+                s_store_id char(16) not null,
+                s_rec_start_date date,
+                s_rec_end_date date,
+                s_closed_date_sk bigint,
+                s_store_name varchar(50),
+                s_number_employees integer,
+                s_floor_space integer,
+                s_hours char(20),
+                s_manager varchar(40),
+                s_market_id integer,
+                s_geography_class varchar(100),
+                s_market_desc varchar(100),
+                s_market_manager varchar(40),
+                s_division_id integer,
+                s_division_name varchar(50),
+                s_company_id integer,
+                s_company_name varchar(50),
+                s_street_number varchar(10),
+                s_street_name varchar(60),
+                s_street_type char(15),
+                s_suite_number char(10),
+                s_city varchar(60),
+                s_county varchar(30),
+                s_state char(2),
+                s_zip char(10),
+                s_country varchar(20),
+                s_gmt_offset decimal(5,2),
+                s_tax_precentage decimal(5,2)
+            )
+            DUPLICATE KEY(s_store_sk)
+            DISTRIBUTED BY HASH(s_store_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.time_dim (
+                t_time_sk bigint not null,
+                t_time_id char(16) not null,
+                t_time integer,
+                t_hour integer,
+                t_minute integer,
+                t_second integer,
+                t_am_pm char(2),
+                t_shift char(20),
+                t_sub_shift char(20),
+                t_meal_time char(20)
+            )
+            DUPLICATE KEY(t_time_sk)
+            DISTRIBUTED BY HASH(t_time_sk) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.web_page (
+                wp_web_page_sk bigint not null,
+                wp_web_page_id char(16) not null,
+                wp_rec_start_date date,
+                wp_rec_end_date date,
+                wp_creation_date_sk bigint,
+                wp_access_date_sk bigint,
+                wp_autogen_flag char(1),
+                wp_customer_sk bigint,
+                wp_url varchar(100),
+                wp_type char(50),
+                wp_char_count integer,
+                wp_link_count integer,
+                wp_image_count integer,
+                wp_max_ad_count integer
+            )
+            DUPLICATE KEY(wp_web_page_sk)
+            DISTRIBUTED BY HASH(wp_web_page_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.store_returns (
+                sr_item_sk bigint not null,
+                sr_ticket_number bigint not null,
+                sr_returned_date_sk bigint,
+                sr_return_time_sk bigint,
+                sr_customer_sk bigint,
+                sr_cdemo_sk bigint,
+                sr_hdemo_sk bigint,
+                sr_addr_sk bigint,
+                sr_store_sk bigint,
+                sr_reason_sk bigint,
+                sr_return_quantity integer,
+                sr_return_amt decimal(7,2),
+                sr_return_tax decimal(7,2),
+                sr_return_amt_inc_tax decimal(7,2),
+                sr_fee decimal(7,2),
+                sr_return_ship_cost decimal(7,2),
+                sr_refunded_cash decimal(7,2),
+                sr_reversed_charge decimal(7,2),
+                sr_store_credit decimal(7,2),
+                sr_net_loss decimal(7,2)
+            )
+            duplicate key(sr_item_sk, sr_ticket_number)
+            distributed by hash (sr_item_sk, sr_ticket_number) buckets 32
+            properties (
+                "replication_num" = "1",
+                "colocate_with" = "store"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.store_sales (
+                ss_item_sk bigint not null,
+                ss_ticket_number bigint not null,
+                ss_sold_date_sk bigint,
+                ss_sold_time_sk bigint,
+                ss_customer_sk bigint,
+                ss_cdemo_sk bigint,
+                ss_hdemo_sk bigint,
+                ss_addr_sk bigint,
+                ss_store_sk bigint,
+                ss_promo_sk bigint,
+                ss_quantity integer,
+                ss_wholesale_cost decimal(7,2),
+                ss_list_price decimal(7,2),
+                ss_sales_price decimal(7,2),
+                ss_ext_discount_amt decimal(7,2),
+                ss_ext_sales_price decimal(7,2),
+                ss_ext_wholesale_cost decimal(7,2),
+                ss_ext_list_price decimal(7,2),
+                ss_ext_tax decimal(7,2),
+                ss_coupon_amt decimal(7,2),
+                ss_net_paid decimal(7,2),
+                ss_net_paid_inc_tax decimal(7,2),
+                ss_net_profit decimal(7,2)
+            )
+            DUPLICATE KEY(ss_item_sk, ss_ticket_number)
+            DISTRIBUTED BY HASH(ss_item_sk, ss_ticket_number) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "colocate_with" = "store"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.ship_mode (
+                sm_ship_mode_sk bigint not null,
+                sm_ship_mode_id char(16) not null,
+                sm_type char(30),
+                sm_code char(10),
+                sm_carrier char(20),
+                sm_contract char(20)
+            )
+            DUPLICATE KEY(sm_ship_mode_sk)
+            DISTRIBUTED BY HASH(sm_ship_mode_sk) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.customer (
+                c_customer_sk bigint not null,
+                c_customer_id char(16) not null,
+                c_current_cdemo_sk bigint,
+                c_current_hdemo_sk bigint,
+                c_current_addr_sk bigint,
+                c_first_shipto_date_sk bigint,
+                c_first_sales_date_sk bigint,
+                c_salutation char(10),
+                c_first_name char(20),
+                c_last_name char(30),
+                c_preferred_cust_flag char(1),
+                c_birth_day integer,
+                c_birth_month integer,
+                c_birth_year integer,
+                c_birth_country varchar(20),
+                c_login char(13),
+                c_email_address char(50),
+                c_last_review_date_sk bigint
+            )
+            DUPLICATE KEY(c_customer_sk)
+            DISTRIBUTED BY HASH(c_customer_id) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+        sql """CREATE TABLE IF NOT EXISTS clean_trash_db1.dbgen_version
+            (
+                dv_version varchar(16) ,
+                dv_create_date date ,
+                dv_create_time datetime ,
+                dv_cmdline_args varchar(200)
+            )
+            DUPLICATE KEY(dv_version)
+            DISTRIBUTED BY HASH(dv_version) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );"""
+
+        // Make every BE's CLEAN_TRASH callback sleep, then trigger it. The FE only submits
+        // the agent tasks, so the statement itself is not expected to wait for the sweep.
+        GetDebugPoint().enableDebugPointForAllBEs("clean_trash_callback_sleep")
+        sql """ADMIN CLEAN TRASH"""
+
+        // Presumably the point of the test: DDL issued while the clean-trash callback is still
+        // sleeping (creating this table needs new tablets on the BEs) must not be held up by it.
+        sql """create database clean_trash_db2"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS clean_trash_db2.customer (
+                `c_custkey` int(11) NOT NULL COMMENT "",
+                `c_name` varchar(26) NOT NULL COMMENT "",
+                `c_address` varchar(41) NOT NULL COMMENT "",
+                `c_city` varchar(11) NOT NULL COMMENT "",
+                `c_nation` varchar(16) NOT NULL COMMENT "",
+                `c_region` varchar(13) NOT NULL COMMENT "",
+                `c_phone` varchar(16) NOT NULL COMMENT "",
+                `c_mktsegment` varchar(11) NOT NULL COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c_custkey`)
+            COMMENT "OLAP"
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 12
+            PROPERTIES (
+                "replication_num" = "1",
+                "colocate_with" = "groupa2",
+                "in_memory" = "false",
+                "storage_format" = "DEFAULT"
+            )
+        """
+    } finally {
+        // Disabling the debug point stops future sleeps; a callback already inside
+        // sleep(100) keeps sleeping until it finishes.
+        GetDebugPoint().disableDebugPointForAllBEs("clean_trash_callback_sleep")
+        sql """DROP database if EXISTS clean_trash_db1 force"""
+        sql """DROP database if EXISTS clean_trash_db2 force"""
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]