This is an automated email from the ASF dual-hosted git repository.
kamille pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 63c4e9bb test: add integration test for compaction offload (#1573)
63c4e9bb is described below
commit 63c4e9bb1c546aad89350c56987fbb4204147622
Author: Leslie Su <[email protected]>
AuthorDate: Wed Oct 30 09:20:36 2024 +0800
test: add integration test for compaction offload (#1573)
## Rationale
Close #1571
## Detailed Changes
- Implement `compact` as a pre-command for sqlness, backed by an HTTP compaction
service.
- Update integration tests to cover compaction offload.
## Test Plan
---------
Co-authored-by: kamille <[email protected]>
---
integration_tests/Makefile | 16 ++-
integration_tests/README.md | 3 +
.../env/compaction_offload/compact/compact.result | 110 +++++++++++++++++++++
.../env/compaction_offload/compact/compact.sql | 76 ++++++++++++++
.../cases/env/compaction_offload/config.toml | 44 +++++++++
integration_tests/config/compaction-offload.toml | 44 +++++++++
integration_tests/src/database.rs | 51 ++++++++++
integration_tests/src/main.rs | 9 +-
src/server/src/http.rs | 49 +++++++++
9 files changed, 398 insertions(+), 4 deletions(-)
diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index fe7fbcdc..505f8380 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -21,6 +21,7 @@ HORAEDB_DATA_DIR = /tmp/horaedb
HORAEDB_DATA_DIR_0 = /tmp/horaedb0
HORAEDB_DATA_DIR_1 = /tmp/horaedb1
HORAEMETA_DATA_DIR = /tmp/horaemeta
+HORAEDB_DATA_DIR_2 = /tmp/compaction-offload
export HORAEDB_TEST_CASE_PATH ?= $(ROOT)/cases/env
export HORAEDB_TEST_BINARY ?= $(ROOT)/../target/$(MODE)/horaedb-test
@@ -42,13 +43,17 @@ export CLUSTER_HORAEDB_STDOUT_FILE_0 ?=
/tmp/horaedb-stdout-0.log
export CLUSTER_HORAEDB_STDOUT_FILE_1 ?= /tmp/horaedb-stdout-1.log
export RUST_BACKTRACE=1
+# Environment variables for compaction offload
+export HORAEDB_STDOUT_FILE_2 ?= /tmp/horaedb-stdout-2.log
+export HORAEDB_CONFIG_FILE_2 ?= $(ROOT)/config/compaction-offload.toml
+
# Whether update related repos
# We don't want to rebuild the binaries and data on sometimes(e.g. debugging
in local),
# and we can set it to false.
export UPDATE_REPOS_TO_LATEST ?= true
clean:
- rm -rf $(HORAEDB_DATA_DIR) $(HORAEDB_DATA_DIR_0) $(HORAEDB_DATA_DIR_1)
$(HORAEMETA_DATA_DIR)
+ rm -rf $(HORAEDB_DATA_DIR) $(HORAEDB_DATA_DIR_0) $(HORAEDB_DATA_DIR_1)
$(HORAEMETA_DATA_DIR) $(HORAEDB_DATA_DIR_2)
build-meta:
./build_meta.sh
@@ -80,8 +85,10 @@ run-horaedb-cluster: build-horaedb
nohup ${HORAEDB_BINARY_PATH} --config ${HORAEDB_CONFIG_FILE_1} >
${CLUSTER_HORAEDB_STDOUT_FILE_1} 2>&1 &
sleep 30
-run: prepare build-meta
- $(HORAEDB_TEST_BINARY)
+run:
+ make run-local
+ make run-cluster
+ make run-compaction-offload
run-local: prepare
HORAEDB_ENV_FILTER=local $(HORAEDB_TEST_BINARY)
@@ -89,6 +96,9 @@ run-local: prepare
run-cluster: prepare build-meta
HORAEDB_ENV_FILTER=cluster $(HORAEDB_TEST_BINARY)
+run-compaction-offload: prepare
+ HORAEDB_ENV_FILTER=compaction_offload $(HORAEDB_TEST_BINARY)
+
run-java:
java -version
cd sdk/java && MAVEN_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED"
mvn clean compile exec:java
diff --git a/integration_tests/README.md b/integration_tests/README.md
index 3c87cda7..a3dc7583 100644
--- a/integration_tests/README.md
+++ b/integration_tests/README.md
@@ -12,6 +12,9 @@ make run-local
# Only cluster env
make run-cluster
+
+# Only compaction offload env
+make run-compaction-offload
```
`horaedb-test` will recursively find all the files end with `.sql` and run it.
Each file will be treated as a case. A file can contain multiple SQLs. When
finished it will tell how many cases it run, and display the diff set if there
is any. An example with one case:
diff --git
a/integration_tests/cases/env/compaction_offload/compact/compact.result
b/integration_tests/cases/env/compaction_offload/compact/compact.result
new file mode 100644
index 00000000..9f4d91b4
--- /dev/null
+++ b/integration_tests/cases/env/compaction_offload/compact/compact.result
@@ -0,0 +1,110 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+DROP TABLE IF EXISTS `compact_table1`;
+
+affected_rows: 0
+
+CREATE TABLE `compact_table1` (
+ `timestamp` timestamp NOT NULL,
+ `value` double,
+ `dic` string dictionary,
+ timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+ enable_ttl='false',
+ update_mode='OVERWRITE'
+);
+
+affected_rows: 0
+
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (1, 100, "d1"), (2, 200, "d2"), (3, 300, "d3");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (1, 100, "update_d1"), (2, 200, "update_d2"), (3, 300, "update_d3");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (4, 400, "d4"), (5, 500, "d5"), (6, 600, "d6");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (4, 400, "update_d4"), (5, 500, "update_d5"), (6, 600, "update_d6");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (7, 700, "d7"), (8, 800, "d8"), (9, 900, "d9");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (7, 700, "update_d7"), (8, 800, "update_d8"), (9, 900, "update_d9");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (10, 1000, "d10"), (11, 1100, "d11"), (12, 1200, "d12");
+
+affected_rows: 3
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (10, 1000, "update_d10"), (11, 1100, "update_d11"), (12, 1200,
"update_d12");
+
+affected_rows: 3
+
+-- trigger manual compaction after flush memtable
+-- SQLNESS ARG pre_cmd=flush
+-- SQLNESS ARG pre_cmd=compact
+SELECT
+ *
+FROM
+ `compact_table1`
+ORDER BY
+ `value` ASC;
+
+tsid,timestamp,value,dic,
+UInt64(0),Timestamp(1),Double(100.0),String("update_d1"),
+UInt64(0),Timestamp(2),Double(200.0),String("update_d2"),
+UInt64(0),Timestamp(3),Double(300.0),String("update_d3"),
+UInt64(0),Timestamp(4),Double(400.0),String("update_d4"),
+UInt64(0),Timestamp(5),Double(500.0),String("update_d5"),
+UInt64(0),Timestamp(6),Double(600.0),String("update_d6"),
+UInt64(0),Timestamp(7),Double(700.0),String("update_d7"),
+UInt64(0),Timestamp(8),Double(800.0),String("update_d8"),
+UInt64(0),Timestamp(9),Double(900.0),String("update_d9"),
+UInt64(0),Timestamp(10),Double(1000.0),String("update_d10"),
+UInt64(0),Timestamp(11),Double(1100.0),String("update_d11"),
+UInt64(0),Timestamp(12),Double(1200.0),String("update_d12"),
+
+
+DROP TABLE `compact_table1`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/env/compaction_offload/compact/compact.sql
b/integration_tests/cases/env/compaction_offload/compact/compact.sql
new file mode 100644
index 00000000..f0aa46fb
--- /dev/null
+++ b/integration_tests/cases/env/compaction_offload/compact/compact.sql
@@ -0,0 +1,76 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+DROP TABLE IF EXISTS `compact_table1`;
+
+CREATE TABLE `compact_table1` (
+ `timestamp` timestamp NOT NULL,
+ `value` double,
+ `dic` string dictionary,
+ timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+ enable_ttl='false',
+ update_mode='OVERWRITE'
+);
+
+
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (1, 100, "d1"), (2, 200, "d2"), (3, 300, "d3");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (1, 100, "update_d1"), (2, 200, "update_d2"), (3, 300, "update_d3");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (4, 400, "d4"), (5, 500, "d5"), (6, 600, "d6");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (4, 400, "update_d4"), (5, 500, "update_d5"), (6, 600, "update_d6");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (7, 700, "d7"), (8, 800, "d8"), (9, 900, "d9");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (7, 700, "update_d7"), (8, 800, "update_d8"), (9, 900, "update_d9");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (10, 1000, "d10"), (11, 1100, "d11"), (12, 1200, "d12");
+
+-- SQLNESS ARG pre_cmd=flush
+INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
+ VALUES (10, 1000, "update_d10"), (11, 1100, "update_d11"), (12, 1200,
"update_d12");
+
+
+-- trigger manual compaction after flush memtable
+-- SQLNESS ARG pre_cmd=flush
+-- SQLNESS ARG pre_cmd=compact
+SELECT
+ *
+FROM
+ `compact_table1`
+ORDER BY
+ `value` ASC;
+
+
+DROP TABLE `compact_table1`;
diff --git a/integration_tests/cases/env/compaction_offload/config.toml
b/integration_tests/cases/env/compaction_offload/config.toml
new file mode 100644
index 00000000..044e6af1
--- /dev/null
+++ b/integration_tests/cases/env/compaction_offload/config.toml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[server]
+bind_addr = "127.0.0.1"
+http_port = 5440
+grpc_port = 8831
+
+[query_engine]
+read_parallelism = 8
+
+[analytic.wal]
+type = "RocksDB"
+data_dir = "/tmp/horaedb"
+
+[analytic.storage]
+mem_cache_capacity = '1G'
+mem_cache_partition_bits = 0
+disk_cache_dir = "/tmp/horaedb"
+disk_cache_capacity = '2G'
+disk_cache_page_size = '1M'
+
+[analytic.storage.object_store]
+type = "Local"
+data_dir = "/tmp/horaedb"
+
+[analytic.compaction_mode]
+compaction_mode = "Offload"
+node_picker = "Local"
+endpoint = "127.0.0.1:8831"
diff --git a/integration_tests/config/compaction-offload.toml
b/integration_tests/config/compaction-offload.toml
new file mode 100644
index 00000000..1cb8fbb2
--- /dev/null
+++ b/integration_tests/config/compaction-offload.toml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[server]
+bind_addr = "0.0.0.0"
+http_port = 5440
+grpc_port = 8831
+postgresql_port = 5433
+
+[logger]
+level = "info"
+
+[tracing]
+dir = "/tmp/compaction-offload"
+
+[analytic.storage.object_store]
+type = "Local"
+data_dir = "/tmp/compaction-offload"
+
+[analytic.wal]
+type = "Local"
+data_dir = "/tmp/compaction-offload"
+
+[analytic.compaction_mode]
+compaction_mode = "Offload"
+node_picker = "Local"
+endpoint = "127.0.0.1:8831"
+
+[analytic]
+enable_primary_key_sampling = true
diff --git a/integration_tests/src/database.rs
b/integration_tests/src/database.rs
index 2020cd84..e598a46a 100644
--- a/integration_tests/src/database.rs
+++ b/integration_tests/src/database.rs
@@ -43,6 +43,9 @@ const CLUSTER_HORAEDB_STDOUT_FILE_0_ENV: &str =
"CLUSTER_HORAEDB_STDOUT_FILE_0";
const CLUSTER_HORAEDB_STDOUT_FILE_1_ENV: &str =
"CLUSTER_HORAEDB_STDOUT_FILE_1";
const CLUSTER_HORAEDB_HEALTH_CHECK_INTERVAL_SECONDS: usize = 5;
+const HORAEDB_STDOUT_FILE_2_ENV: &str = "HORAEDB_STDOUT_FILE_2";
+const HORAEDB_CONFIG_FILE_2_ENV: &str = "HORAEDB_CONFIG_FILE_2";
+
const HORAEDB_SERVER_ADDR: &str = "HORAEDB_SERVER_ADDR";
// Used to access HoraeDB by http service.
@@ -82,6 +85,10 @@ pub struct HoraeDBCluster {
meta_stable_check_sql: String,
}
+pub struct HoraeDBCompactionOffload {
+ server: HoraeDBServer,
+}
+
impl HoraeDBServer {
fn spawn(bin: String, config: String, stdout: String) -> Self {
let local_ip = local_ip_address::local_ip()
@@ -231,6 +238,29 @@ impl Backend for HoraeDBCluster {
}
}
+#[async_trait]
+impl Backend for HoraeDBCompactionOffload {
+ fn start() -> Self {
+ let config = env::var(HORAEDB_CONFIG_FILE_2_ENV).expect("Cannot parse
horaedb2 config env");
+ let bin = env::var(HORAEDB_BINARY_PATH_ENV).expect("Cannot parse
binary path env");
+ let stdout = env::var(HORAEDB_STDOUT_FILE_2_ENV).expect("Cannot parse
stdout2 env");
+ Self {
+ server: HoraeDBServer::spawn(bin, config, stdout),
+ }
+ }
+
+ async fn wait_for_ready(&self) {
+ tokio::time::sleep(Duration::from_secs(10)).await
+ }
+
+ fn stop(&mut self) {
+ self.server
+ .server_process
+ .kill()
+ .expect("Failed to kill server");
+ }
+}
+
pub struct HoraeDB<T> {
backend: T,
db_client: Arc<dyn DbClient>,
@@ -264,6 +294,7 @@ impl TryFrom<&str> for Protocol {
#[derive(Debug, Clone, Copy)]
enum Command {
Flush,
+ Compact,
}
impl TryFrom<&str> for Command {
@@ -272,6 +303,7 @@ impl TryFrom<&str> for Command {
fn try_from(s: &str) -> Result<Self, Self::Error> {
let cmd = match s {
"flush" => Self::Flush,
+ "compact" => Self::Compact,
_ => return Err(format!("Unknown command:{s}")),
};
@@ -305,6 +337,12 @@ impl<T: Send + Sync> Database for HoraeDB<T> {
panic!("Execute flush command failed, err:{e}");
}
}
+ Command::Compact => {
+ println!("Compact table...");
+ if let Err(e) = self.execute_compact().await {
+ panic!("Execute compact command failed, err:{e}");
+ }
+ }
}
}
@@ -363,6 +401,19 @@ impl<T> HoraeDB<T> {
Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
}
+ async fn execute_compact(&self) -> Result<(), String> {
+ // TODO(leslie): Improve code reusability. The following code is
similar to
+ // `execute_flush()`.
+ let url = format!("http://{}/debug/compact_table",
self.http_client.endpoint);
+ let resp = self.http_client.client.post(url).send().await.unwrap();
+
+ if resp.status() == StatusCode::OK {
+ return Ok(());
+ }
+
+ Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
+ }
+
async fn execute_influxql(
query: String,
http_client: HttpClient,
diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs
index 70987127..e2c63f10 100644
--- a/integration_tests/src/main.rs
+++ b/integration_tests/src/main.rs
@@ -21,7 +21,7 @@ use std::{env, fmt::Display, path::Path};
use anyhow::Result;
use async_trait::async_trait;
-use database::{Backend, HoraeDB};
+use database::{Backend, HoraeDB, HoraeDBCompactionOffload};
use sqlness::{Database, EnvController, QueryContext, Runner};
use crate::database::{HoraeDBCluster, HoraeDBServer};
@@ -65,6 +65,9 @@ impl EnvController for HoraeDBController {
let db = match env {
"local" => Box::new(HoraeDB::<HoraeDBServer>::create().await) as
DbRef,
"cluster" => Box::new(HoraeDB::<HoraeDBCluster>::create().await)
as DbRef,
+ "compaction_offload" => {
+ Box::new(HoraeDB::<HoraeDBCompactionOffload>::create().await)
as DbRef
+ }
_ => panic!("invalid env {env}"),
};
@@ -103,6 +106,10 @@ async fn main() -> Result<()> {
"build_local" => {
let _ = controller.start("local", None).await;
}
+ // Just build the compaction offload testing env.
+ "build_compaction_offload" => {
+ let _ = controller.start("compaction_offload", None).await;
+ }
other => {
panic!("Unknown run mode:{other}")
}
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 83dad878..d31f5ade 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -247,6 +247,7 @@ impl Service {
.or(self.admin_block())
// debug APIs
.or(self.flush_memtable())
+ .or(self.compact_table())
.or(self.update_log_level())
.or(self.profile_cpu())
.or(self.profile_heap())
@@ -524,6 +525,54 @@ impl Service {
})
}
+ // POST /debug/compact_table
+ fn compact_table(
+ &self,
+ ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> +
Clone {
+ warp::path!("debug" / "compact_table")
+ .and(warp::post())
+ .and(self.with_instance())
+ .and_then(|instance: InstanceRef| async move {
+ let get_all_tables = || {
+ let mut tables = Vec::new();
+ for catalog in instance
+ .catalog_manager
+ .all_catalogs()
+ .box_err()
+ .context(Internal)?
+ {
+ for schema in
catalog.all_schemas().box_err().context(Internal)? {
+ for table in
schema.all_tables().box_err().context(Internal)? {
+ tables.push(table);
+ }
+ }
+ }
+ Result::Ok(tables)
+ };
+ match get_all_tables() {
+ Ok(tables) => {
+ let mut failed_tables = Vec::new();
+ let mut success_tables = Vec::new();
+
+ for table in tables {
+ let table_name = table.name().to_string();
+ if let Err(e) = table.compact().await {
+ error!("compact {} failed, err:{}",
&table_name, e);
+ failed_tables.push(table_name);
+ } else {
+ success_tables.push(table_name);
+ }
+ }
+ let mut result = HashMap::new();
+ result.insert("success", success_tables);
+ result.insert("failed", failed_tables);
+ Ok(reply::json(&result))
+ }
+ Err(e) => Err(reject::custom(e)),
+ }
+ })
+ }
+
// GET /metrics
fn metrics(
&self,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]