This is an automated email from the ASF dual-hosted git repository.
kamille pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new e47d9ae7 feat: support horaedb submit compaction task to remote (#1563)
e47d9ae7 is described below
commit e47d9ae7cbf240d50d009317562bc4247c92b8c4
Author: Leslie Su <[email protected]>
AuthorDate: Wed Oct 30 08:48:52 2024 +0800
feat: support horaedb submit compaction task to remote (#1563)
## Rationale
A subtask of the compaction offloading work. See #1545.
## Detailed Changes
**Compaction node supports remote compaction service**
- Define `CompactionServiceImpl` to support the compaction RPC service.
- Introduce `NodeType` to distinguish compaction nodes from horaedb nodes,
enabling the deployment of dedicated compaction nodes (see the config
sketch below).
- Implement `compaction_client` for the horaedb node to access a remote
compaction node.
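A minimal sketch of how a dedicated compaction node might be declared,
assuming `node_type` sits in the `[cluster_deployment]` section of the node
config (the section name is an assumption; `node_type` defaults to
`HoraeDB`):
```toml
[cluster_deployment]
# "HoraeDB" (the default) marks a regular horaedb node;
# "CompactionServer" marks a dedicated compaction node.
node_type = "CompactionServer"
```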
**Horaedb node supports compaction offload**
- Introduce `compaction_mode` in the analytic engine's `Config` to determine
whether to offload compaction or not (see the config sketch below).
- Define the `CompactionNodePicker` trait, which supports fetching remote
compaction node info.
- Implement `RemoteCompactionRunner`, which picks a remote node and passes
the compaction task to it.
- Add docs (e.g. `example-cluster-n.toml`) explaining how to deploy a
cluster that supports compaction offload.
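A minimal sketch of the offload config, inferred from the serde tags on
`CompactionMode` and `NodePicker` (the `[analytic]` section name is an
assumption; see the `example-cluster-n.toml` docs for the authoritative
layout):
```toml
# Offload compaction to a fixed endpoint (mainly for testing).
[analytic]
compaction_mode = "Offload"
node_picker = "Local"
endpoint = "127.0.0.1:7878"

# Offload to a node picked via the meta service instead:
# compaction_mode = "Offload"
# node_picker = "Remote"
```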
## Test Plan
---------
Co-authored-by: kamille <[email protected]>
---
Cargo.lock | 9 +-
Cargo.toml | 2 +-
src/analytic_engine/Cargo.toml | 6 +
src/analytic_engine/src/compaction/mod.rs | 91 +++++++-
.../src/compaction/runner/local_runner.rs | 1 +
src/analytic_engine/src/compaction/runner/mod.rs | 244 ++++++++++++++++++++-
.../src/compaction/runner/node_picker.rs | 88 ++++++++
.../src/compaction/runner/remote_client.rs | 148 +++++++++++++
.../src/compaction/runner/remote_runner.rs | 116 ++++++++++
src/analytic_engine/src/instance/engine.rs | 7 +
.../src/instance/flush_compaction.rs | 20 ++
src/analytic_engine/src/instance/open.rs | 56 ++++-
src/analytic_engine/src/lib.rs | 21 +-
src/analytic_engine/src/setup.rs | 6 +
src/analytic_engine/src/sst/factory.rs | 65 +++++-
src/analytic_engine/src/sst/file.rs | 98 ++++++++-
src/analytic_engine/src/sst/writer.rs | 95 +++++++-
src/analytic_engine/src/table_options.rs | 54 +++++
src/analytic_engine/src/tests/util.rs | 1 +
src/benchmarks/src/util.rs | 1 +
src/cluster/src/cluster_impl.rs | 8 +-
src/cluster/src/config.rs | 3 +
src/cluster/src/lib.rs | 6 +-
src/common_types/src/cluster.rs | 26 +++
src/common_types/src/lib.rs | 1 +
src/horaedb/Cargo.toml | 53 ++---
src/horaedb/src/config.rs | 4 +-
src/horaedb/src/setup.rs | 16 +-
src/meta_client/src/lib.rs | 14 +-
src/meta_client/src/meta_impl.rs | 14 +-
src/meta_client/src/types.rs | 9 +
src/router/src/cluster_based.rs | 6 +-
src/server/src/grpc/compaction_service/error.rs | 96 ++++++++
src/server/src/grpc/compaction_service/mod.rs | 113 ++++++++++
src/server/src/grpc/mod.rs | 65 +++++-
src/server/src/server.rs | 9 +
src/table_engine/src/predicate.rs | 6 +-
src/table_engine/src/table.rs | 6 +
38 files changed, 1505 insertions(+), 79 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 33ed54bc..1830a3fb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -89,6 +89,7 @@ dependencies = [
"atomic_enum",
"base64 0.13.1",
"bytes_ext",
+ "cluster",
"codec",
"common_types",
"datafusion",
@@ -107,6 +108,7 @@ dependencies = [
"lru 0.7.8",
"macros",
"message_queue",
+ "meta_client",
"metric_ext",
"object_store 2.1.0",
"parquet",
@@ -116,10 +118,12 @@ dependencies = [
"prost 0.11.8",
"rand 0.8.5",
"remote_engine_client",
+ "reqwest 0.12.4",
"router",
"runtime",
"sampling_cache",
"serde",
+ "serde_json",
"size_ext",
"skiplist",
"smallvec",
@@ -131,7 +135,9 @@ dependencies = [
"thiserror",
"time_ext",
"tokio",
+ "tonic 0.8.3",
"trace_metric",
+ "url",
"wal",
"xorfilter-rs",
]
@@ -3150,6 +3156,7 @@ dependencies = [
"catalog_impls",
"clap",
"cluster",
+ "common_types",
"datafusion",
"df_operator",
"etcd-client",
@@ -3223,7 +3230,7 @@ dependencies = [
[[package]]
name = "horaedbproto"
version = "2.0.0"
-source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=a5874d9fedee32ab1292252c4eb6defc4f6e245a#a5874d9fedee32ab1292252c4eb6defc4f6e245a"
+source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=fac8564e6e3d50e51daa2af6eb905e747f3191b0#fac8564e6e3d50e51daa2af6eb905e747f3191b0"
dependencies = [
"prost 0.11.8",
"protoc-bin-vendored",
diff --git a/Cargo.toml b/Cargo.toml
index d2d73fd0..b6ca6273 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -103,7 +103,7 @@ thiserror = "1"
bytes_ext = { path = "src/components/bytes_ext" }
catalog = { path = "src/catalog" }
catalog_impls = { path = "src/catalog_impls" }
-horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "a5874d9fedee32ab1292252c4eb6defc4f6e245a" }
+horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "fac8564e6e3d50e51daa2af6eb905e747f3191b0" }
codec = { path = "src/components/codec" }
chrono = "0.4"
clap = { version = "4.5.1", features = ["derive"] }
diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml
index 09ff47af..d6c642eb 100644
--- a/src/analytic_engine/Cargo.toml
+++ b/src/analytic_engine/Cargo.toml
@@ -49,6 +49,7 @@ async-trait = { workspace = true }
atomic_enum = { workspace = true }
base64 = { workspace = true }
bytes_ext = { workspace = true }
+cluster = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
datafusion = { workspace = true }
@@ -66,6 +67,7 @@ logger = { workspace = true }
lru = { workspace = true }
macros = { workspace = true }
message_queue = { workspace = true }
+meta_client = { workspace = true }
metric_ext = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
@@ -73,10 +75,12 @@ parquet_ext = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
remote_engine_client = { workspace = true }
+reqwest = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
sampling_cache = { workspace = true }
serde = { workspace = true }
+serde_json = { workspace = true }
size_ext = { workspace = true }
skiplist = { path = "../components/skiplist" }
smallvec = { workspace = true }
@@ -87,7 +91,9 @@ tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
+tonic = { workspace = true }
trace_metric = { workspace = true }
+url = "2.2"
wal = { workspace = true }
xorfilter-rs = { workspace = true }
diff --git a/src/analytic_engine/src/compaction/mod.rs b/src/analytic_engine/src/compaction/mod.rs
index 34048d6b..8f63c93e 100644
--- a/src/analytic_engine/src/compaction/mod.rs
+++ b/src/analytic_engine/src/compaction/mod.rs
@@ -20,15 +20,17 @@
use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};
use common_types::COMPACTION_STRATEGY;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
-use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
+use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu};
use time_ext::TimeUnit;
use tokio::sync::oneshot;
use crate::{
compaction::picker::{CommonCompactionPicker, CompactionPickerRef},
- sst::file::{FileHandle, Level},
+ sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level},
table::data::TableDataRef,
};
@@ -72,8 +74,22 @@ pub enum Error {
},
#[snafu(display("Invalid compaction option value, err: {}", error))]
InvalidOption { error: String, backtrace: Backtrace },
+
+ #[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))]
+ EmptyFileMeta { backtrace: Backtrace },
+
+ #[snafu(display("Failed to convert file meta, err:{}", source))]
+ ConvertFileMeta { source: GenericError },
+
+ #[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))]
+ EmptyPurgeQueue { backtrace: Backtrace },
+
+ #[snafu(display("Failed to convert level, err:{}", source))]
+ ConvertLevel { source: GenericError },
}
+define_result!(Error);
+
#[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)]
pub enum CompactionStrategy {
#[default]
@@ -145,7 +161,7 @@ impl CompactionStrategy {
pub(crate) fn parse_from(
value: &str,
options: &HashMap<String, String>,
- ) -> Result<CompactionStrategy, Error> {
+ ) -> Result<CompactionStrategy> {
match value.trim().to_lowercase().as_str() {
DEFAULT_STRATEGY => Ok(CompactionStrategy::Default),
STC_STRATEGY => Ok(CompactionStrategy::SizeTiered(
@@ -182,7 +198,7 @@ impl CompactionStrategy {
}
impl SizeTieredCompactionOptions {
- pub(crate) fn validate(&self) -> Result<(), Error> {
+ pub(crate) fn validate(&self) -> Result<()> {
ensure!(
self.bucket_high > self.bucket_low,
InvalidOption {
@@ -215,7 +231,7 @@ impl SizeTieredCompactionOptions {
pub(crate) fn parse_from(
options: &HashMap<String, String>,
- ) -> Result<SizeTieredCompactionOptions, Error> {
+ ) -> Result<SizeTieredCompactionOptions> {
let mut opts = SizeTieredCompactionOptions::default();
if let Some(v) = options.get(BUCKET_LOW_KEY) {
opts.bucket_low = v.parse().context(ParseFloat {
@@ -278,7 +294,7 @@ impl TimeWindowCompactionOptions {
);
}
- pub(crate) fn validate(&self) -> Result<(), Error> {
+ pub(crate) fn validate(&self) -> Result<()> {
if !Self::valid_timestamp_unit(self.timestamp_resolution) {
return InvalidOption {
error: format!(
@@ -294,7 +310,7 @@ impl TimeWindowCompactionOptions {
pub(crate) fn parse_from(
options: &HashMap<String, String>,
- ) -> Result<TimeWindowCompactionOptions, Error> {
+ ) -> Result<TimeWindowCompactionOptions> {
let mut opts = TimeWindowCompactionOptions {
size_tiered: SizeTieredCompactionOptions::parse_from(options)?,
..Default::default()
@@ -326,6 +342,67 @@ pub struct CompactionInputFiles {
pub output_level: Level,
}
+impl TryFrom<horaedbproto::compaction_service::CompactionInputFiles> for CompactionInputFiles {
+ type Error = Error;
+
+ fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) -> Result<Self> {
+ let level: Level = value.level.try_into().box_err().context(ConvertLevel)?;
+ let output_level: Level = value
+ .output_level
+ .try_into()
+ .box_err()
+ .context(ConvertLevel)?;
+
+ let mut files: Vec<FileHandle> = Vec::with_capacity(value.files.len());
+ for file in value.files {
+ let meta: FileMeta = file
+ .meta
+ .context(EmptyFileMeta)?
+ .try_into()
+ .box_err()
+ .context(ConvertFileMeta)?;
+
+ let purge_queue: FilePurgeQueue = file.purge_queue.context(EmptyPurgeQueue)?.into();
+
+ files.push({
+ let handle = FileHandle::new(meta, purge_queue);
+ handle.set_being_compacted(file.being_compacted);
+ handle
+ });
+ }
+
+ Ok(CompactionInputFiles {
+ level,
+ files,
+ output_level,
+ })
+ }
+}
+
+impl From<CompactionInputFiles> for horaedbproto::compaction_service::CompactionInputFiles {
+ fn from(value: CompactionInputFiles) -> Self {
+ let mut files = Vec::with_capacity(value.files.len());
+ for file in value.files {
+ let handle = horaedbproto::compaction_service::FileHandle {
+ meta: Some(file.meta().into()),
+ purge_queue: Some(horaedbproto::compaction_service::FilePurgeQueue {
+ space_id: file.space_id(),
+ table_id: file.table_id().into(),
+ }),
+ being_compacted: file.being_compacted(),
+ metrics: Some(horaedbproto::compaction_service::SstMetrics {}),
+ };
+ files.push(handle);
+ }
+
+ Self {
+ level: value.level.as_u32(),
+ files,
+ output_level: value.output_level.as_u32(),
+ }
+ }
+}
+
#[derive(Debug, Default, Clone)]
pub struct ExpiredFiles {
/// Level of the expired files.
diff --git a/src/analytic_engine/src/compaction/runner/local_runner.rs b/src/analytic_engine/src/compaction/runner/local_runner.rs
index fc34b2bf..e379d785 100644
--- a/src/analytic_engine/src/compaction/runner/local_runner.rs
+++ b/src/analytic_engine/src/compaction/runner/local_runner.rs
@@ -45,6 +45,7 @@ use crate::{
const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;
/// Executor carrying for actual compaction work
+#[derive(Clone)]
pub struct LocalCompactionRunner {
runtime: Arc<Runtime>,
scan_options: ScanOptions,
diff --git a/src/analytic_engine/src/compaction/runner/mod.rs b/src/analytic_engine/src/compaction/runner/mod.rs
index 12f333ea..c8e34484 100644
--- a/src/analytic_engine/src/compaction/runner/mod.rs
+++ b/src/analytic_engine/src/compaction/runner/mod.rs
@@ -16,17 +16,23 @@
// under the License.
pub mod local_runner;
+pub mod node_picker;
+mod remote_client;
+pub mod remote_runner;
use std::sync::Arc;
use async_trait::async_trait;
use common_types::{request_id::RequestId, schema::Schema, SequenceNumber};
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
use object_store::Path;
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table::TableId;
use crate::{
compaction::CompactionInputFiles,
- instance::flush_compaction::Result,
+ instance::flush_compaction,
row_iter::IterOptions,
space::SpaceId,
sst::{
@@ -39,12 +45,87 @@ use crate::{
/// Compaction runner
#[async_trait]
pub trait CompactionRunner: Send + Sync + 'static {
- async fn run(&self, task: CompactionRunnerTask) -> Result<CompactionRunnerResult>;
+ async fn run(
+ &self,
+ task: CompactionRunnerTask,
+ ) -> flush_compaction::Result<CompactionRunnerResult>;
}
pub type CompactionRunnerPtr = Box<dyn CompactionRunner>;
pub type CompactionRunnerRef = Arc<dyn CompactionRunner>;
+#[derive(Debug, Snafu)]
+#[snafu(visibility = "pub")]
+pub enum Error {
+ #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
+ EmptyTableSchema { backtrace: Backtrace },
+
+ #[snafu(display("Empty input context.\nBacktrace:\n{}", backtrace))]
+ EmptyInputContext { backtrace: Backtrace },
+
+ #[snafu(display("Empty ouput context.\nBacktrace:\n{}", backtrace))]
+ EmptyOuputContext { backtrace: Backtrace },
+
+ #[snafu(display("Empty compaction input files.\nBacktrace:\n{}",
backtrace))]
+ EmptyCompactionInputFiles { backtrace: Backtrace },
+
+ #[snafu(display("Empty write options.\nBacktrace:\n{}", backtrace))]
+ EmptySstWriteOptions { backtrace: Backtrace },
+
+ #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))]
+ EmptySstMeta { backtrace: Backtrace },
+
+ #[snafu(display("Empty sst info.\nBacktrace:\n{}", backtrace))]
+ EmptySstInfo { backtrace: Backtrace },
+
+ #[snafu(display("Empty compaction task exec result.\nBacktrace:\n{}",
backtrace))]
+ EmptyExecResult { backtrace: Backtrace },
+
+ #[snafu(display("Failed to convert table schema, err:{}", source))]
+ ConvertTableSchema { source: GenericError },
+
+ #[snafu(display("Failed to convert input context, err:{}", source))]
+ ConvertInputContext { source: GenericError },
+
+ #[snafu(display("Failed to convert ouput context, err:{}", source))]
+ ConvertOuputContext { source: GenericError },
+
+ #[snafu(display("Failed to convert compaction input files, err:{}",
source))]
+ ConvertCompactionInputFiles { source: GenericError },
+
+ #[snafu(display("Failed to convert write options, err:{}", source))]
+ ConvertSstWriteOptions { source: GenericError },
+
+ #[snafu(display("Failed to convert sst info, err:{}", source))]
+ ConvertSstInfo { source: GenericError },
+
+ #[snafu(display("Failed to convert sst meta, err:{}", source))]
+ ConvertSstMeta { source: GenericError },
+
+ #[snafu(display("Failed to connect the service endpoint:{}, err:{}", addr,
source,))]
+ FailConnect { addr: String, source: GenericError },
+
+ #[snafu(display("Failed to execute compaction task, err:{}", source))]
+ FailExecuteCompactionTask { source: GenericError },
+
+ #[snafu(display("Missing header in rpc response.\nBacktrace:\n{}",
backtrace))]
+ MissingHeader { backtrace: Backtrace },
+
+ #[snafu(display(
+ "Bad response, resp code:{}, msg:{}.\nBacktrace:\n{}",
+ code,
+ msg,
+ backtrace
+ ))]
+ BadResponse {
+ code: u32,
+ msg: String,
+ backtrace: Backtrace,
+ },
+}
+
+define_result!(Error);
+
/// Compaction runner task
#[derive(Debug, Clone)]
pub struct CompactionRunnerTask {
@@ -113,12 +194,106 @@ impl CompactionRunnerTask {
}
}
+impl TryFrom<horaedbproto::compaction_service::ExecuteCompactionTaskRequest>
+ for CompactionRunnerTask
+{
+ type Error = Error;
+
+ fn try_from(
+ request: horaedbproto::compaction_service::ExecuteCompactionTaskRequest,
+ ) -> Result<Self> {
+ let task_key = request.task_key;
+ let request_id: RequestId = request.request_id.into();
+
+ let schema: Schema = request
+ .schema
+ .context(EmptyTableSchema)?
+ .try_into()
+ .box_err()
+ .context(ConvertTableSchema)?;
+
+ let space_id: SpaceId = request.space_id;
+ let table_id: TableId = request.table_id.into();
+ let sequence: SequenceNumber = request.sequence;
+
+ let input_ctx: InputContext = request
+ .input_ctx
+ .context(EmptyInputContext)?
+ .try_into()
+ .box_err()
+ .context(ConvertInputContext)?;
+
+ let output_ctx: OutputContext = request
+ .output_ctx
+ .context(EmptyOutputContext)?
+ .try_into()
+ .box_err()
+ .context(ConvertOutputContext)?;
+
+ Ok(Self {
+ task_key,
+ request_id,
+ schema,
+ space_id,
+ table_id,
+ sequence,
+ input_ctx,
+ output_ctx,
+ })
+ }
+}
+
+impl From<CompactionRunnerTask> for horaedbproto::compaction_service::ExecuteCompactionTaskRequest {
+ fn from(task: CompactionRunnerTask) -> Self {
+ Self {
+ task_key: task.task_key,
+ request_id: task.request_id.into(),
+ schema: Some((&(task.schema)).into()),
+ space_id: task.space_id,
+ table_id: task.table_id.into(),
+ sequence: task.sequence,
+ input_ctx: Some(task.input_ctx.into()),
+ output_ctx: Some(task.output_ctx.into()),
+ }
+ }
+}
+
pub struct CompactionRunnerResult {
pub output_file_path: Path,
pub sst_info: SstInfo,
pub sst_meta: MetaData,
}
+impl TryFrom<horaedbproto::compaction_service::ExecuteCompactionTaskResponse>
+ for CompactionRunnerResult
+{
+ type Error = Error;
+
+ fn try_from(
+ resp: horaedbproto::compaction_service::ExecuteCompactionTaskResponse,
+ ) -> Result<Self> {
+ let res = resp.result.context(EmptyExecResult)?;
+ let sst_info = res
+ .sst_info
+ .context(EmptySstInfo)?
+ .try_into()
+ .box_err()
+ .context(ConvertSstInfo)?;
+ let sst_meta = res
+ .sst_meta
+ .context(EmptySstMeta)?
+ .try_into()
+ .box_err()
+ .context(ConvertSstMeta)?;
+
+ Ok(Self {
+ output_file_path: res.output_file_path.into(),
+ sst_info,
+ sst_meta,
+ })
+ }
+}
+
#[derive(Debug, Clone)]
pub struct InputContext {
/// Input sst files in this compaction
@@ -128,6 +303,43 @@ pub struct InputContext {
pub need_dedup: bool,
}
+impl TryFrom<horaedbproto::compaction_service::InputContext> for InputContext {
+ type Error = Error;
+
+ fn try_from(value: horaedbproto::compaction_service::InputContext) -> Result<Self> {
+ let num_rows_per_row_group: usize = value.num_rows_per_row_group as usize;
+ let merge_iter_options = IterOptions {
+ batch_size: value.merge_iter_options as usize,
+ };
+ let need_dedup = value.need_dedup;
+
+ let files: CompactionInputFiles = value
+ .files
+ .context(EmptyCompactionInputFiles)?
+ .try_into()
+ .box_err()
+ .context(ConvertCompactionInputFiles)?;
+
+ Ok(InputContext {
+ files,
+ num_rows_per_row_group,
+ merge_iter_options,
+ need_dedup,
+ })
+ }
+}
+
+impl From<InputContext> for horaedbproto::compaction_service::InputContext {
+ fn from(value: InputContext) -> Self {
+ Self {
+ files: Some(value.files.into()),
+ num_rows_per_row_group: value.num_rows_per_row_group as u64,
+ merge_iter_options: value.merge_iter_options.batch_size as u64,
+ need_dedup: value.need_dedup,
+ }
+ }
+}
+
#[derive(Debug, Clone)]
pub struct OutputContext {
/// Output sst file path
@@ -135,3 +347,31 @@ pub struct OutputContext {
/// Output sst write context
pub write_options: SstWriteOptions,
}
+
+impl TryFrom<horaedbproto::compaction_service::OutputContext> for OutputContext {
+ type Error = Error;
+
+ fn try_from(value: horaedbproto::compaction_service::OutputContext) -> Result<Self> {
+ let file_path: Path = value.file_path.into();
+ let write_options: SstWriteOptions = value
+ .write_options
+ .context(EmptySstWriteOptions)?
+ .try_into()
+ .box_err()
+ .context(ConvertSstWriteOptions)?;
+
+ Ok(OutputContext {
+ file_path,
+ write_options,
+ })
+ }
+}
+
+impl From<OutputContext> for horaedbproto::compaction_service::OutputContext {
+ fn from(value: OutputContext) -> Self {
+ Self {
+ file_path: value.file_path.into(),
+ write_options: Some(value.write_options.into()),
+ }
+ }
+}
diff --git a/src/analytic_engine/src/compaction/runner/node_picker.rs b/src/analytic_engine/src/compaction/runner/node_picker.rs
new file mode 100644
index 00000000..bf21787c
--- /dev/null
+++ b/src/analytic_engine/src/compaction/runner/node_picker.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Remote compaction node picker.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use macros::define_result;
+use meta_client::{types::FetchCompactionNodeRequest, MetaClientRef};
+use serde::{Deserialize, Serialize};
+use snafu::{ResultExt, Snafu};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(tag = "node_picker", content = "endpoint")]
+pub enum NodePicker {
+ // Local node picker that specifies the local endpoint.
+ // The endpoint is in the form `addr:port`.
+ Local(String),
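+ // Remote node picker that fetches a compaction node from the meta service.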
+ Remote,
+}
+
+#[async_trait]
+pub trait CompactionNodePicker: Send + Sync {
+ /// Get the addr of the remote compaction node.
+ async fn get_compaction_node(&self) -> Result<String>;
+}
+
+pub type RemoteCompactionNodePickerRef = Arc<dyn CompactionNodePicker>;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("Meta client fetch compaciton node failed,
err:{source}."))]
+ FetchCompactionNodeFailure { source: meta_client::Error },
+}
+
+define_result!(Error);
+
+/// RemoteCompactionNodePickerImpl is an implementation of
+/// [`CompactionNodePicker`] based on [`MetaClient`].
+pub struct RemoteCompactionNodePickerImpl {
+ pub meta_client: MetaClientRef,
+}
+
+#[async_trait]
+impl CompactionNodePicker for RemoteCompactionNodePickerImpl {
+ /// Get proper remote compaction node info for compaction offload with meta
+ /// client.
+ async fn get_compaction_node(&self) -> Result<String> {
+ let req = FetchCompactionNodeRequest::default();
+ let resp = self
+ .meta_client
+ .fetch_compaction_node(req)
+ .await
+ .context(FetchCompactionNodeFailure)?;
+
+ let compaction_node_addr = resp.endpoint;
+ Ok(compaction_node_addr)
+ }
+}
+
+/// LocalCompactionNodePickerImpl is an implementation of
+/// [`CompactionNodePicker`] mainly used for testing.
+pub struct LocalCompactionNodePickerImpl {
+ pub endpoint: String,
+}
+
+#[async_trait]
+impl CompactionNodePicker for LocalCompactionNodePickerImpl {
+ /// Return the local addr and port of grpc service.
+ async fn get_compaction_node(&self) -> Result<String> {
+ Ok(self.endpoint.clone())
+ }
+}
diff --git a/src/analytic_engine/src/compaction/runner/remote_client.rs b/src/analytic_engine/src/compaction/runner/remote_client.rs
new file mode 100644
index 00000000..cf1f69be
--- /dev/null
+++ b/src/analytic_engine/src/compaction/runner/remote_client.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use generic_error::BoxError;
+use horaedbproto::{
+ common::ResponseHeader, compaction_service::compaction_service_client::CompactionServiceClient,
+};
+use logger::info;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt};
+use time_ext::ReadableDuration;
+
+use crate::compaction::runner::{
+ BadResponse, FailConnect, FailExecuteCompactionTask, MissingHeader, Result,
+};
+
+type CompactionServiceGrpcClient = CompactionServiceClient<tonic::transport::Channel>;
+
+#[derive(Debug, Deserialize, Clone, Serialize)]
+#[serde(default)]
+pub struct CompactionClientConfig {
+ pub compaction_server_addr: String,
+ pub timeout: ReadableDuration,
+}
+
+impl Default for CompactionClientConfig {
+ fn default() -> Self {
+ Self {
+ compaction_server_addr: "127.0.0.1:7878".to_string(),
+ timeout: ReadableDuration::secs(5),
+ }
+ }
+}
+
+/// CompactionClient is the abstraction of the client used by HoraeDB to
+/// communicate with the CompactionServer cluster.
+#[async_trait]
+pub trait CompactionClient: Send + Sync {
+ async fn execute_compaction_task(
+ &self,
+ req: horaedbproto::compaction_service::ExecuteCompactionTaskRequest,
+ ) -> Result<horaedbproto::compaction_service::ExecuteCompactionTaskResponse>;
+}
+
+pub type CompactionClientRef = Arc<dyn CompactionClient>;
+
+/// Default compaction client impl, will interact with the remote compaction
+/// node.
+pub struct CompactionClientImpl {
+ client: CompactionServiceGrpcClient,
+}
+
+impl CompactionClientImpl {
+ pub async fn connect(config: CompactionClientConfig) -> Result<Self> {
+ let client = {
+ let endpoint =
+ tonic::transport::Endpoint::from_shared(config.compaction_server_addr.to_string())
+ .box_err()
+ .context(FailConnect {
+ addr: &config.compaction_server_addr,
+ })?
+ .timeout(config.timeout.0);
+ CompactionServiceGrpcClient::connect(endpoint)
+ .await
+ .box_err()
+ .context(FailConnect {
+ addr: &config.compaction_server_addr,
+ })?
+ };
+
+ Ok(Self { client })
+ }
+
+ #[inline]
+ fn client(&self) -> CompactionServiceGrpcClient {
+ self.client.clone()
+ }
+}
+
+#[async_trait]
+impl CompactionClient for CompactionClientImpl {
+ async fn execute_compaction_task(
+ &self,
+ pb_req: horaedbproto::compaction_service::ExecuteCompactionTaskRequest,
+ ) -> Result<horaedbproto::compaction_service::ExecuteCompactionTaskResponse> {
+ // TODO(leslie): Add request header for ExecuteCompactionTaskRequest.
+
+ info!(
+ "Compaction client try to execute compaction task in remote
compaction node, req:{:?}",
+ pb_req
+ );
+
+ let pb_resp = self
+ .client()
+ .execute_compaction_task(pb_req)
+ .await
+ .box_err()
+ .context(FailExecuteCompactionTask)?
+ .into_inner();
+
+ info!(
+ "Compaction client finish executing compaction task in remote
compaction node, req:{:?}",
+ pb_resp
+ );
+
+ check_response_header(&pb_resp.header)?;
+ Ok(pb_resp)
+ }
+}
+
+// TODO(leslie): Consider refactoring to reuse the similar function in meta_client.
+fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
+ let header = header.as_ref().context(MissingHeader)?;
+ if header.code == 0 {
+ Ok(())
+ } else {
+ BadResponse {
+ code: header.code,
+ msg: header.error.clone(),
+ }
+ .fail()
+ }
+}
+
+pub async fn build_compaction_client(
+ config: CompactionClientConfig,
+) -> Result<CompactionClientRef> {
+ let compaction_client = CompactionClientImpl::connect(config).await?;
+ Ok(Arc::new(compaction_client))
+}
diff --git a/src/analytic_engine/src/compaction/runner/remote_runner.rs b/src/analytic_engine/src/compaction/runner/remote_runner.rs
new file mode 100644
index 00000000..59a70c2f
--- /dev/null
+++ b/src/analytic_engine/src/compaction/runner/remote_runner.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use generic_error::BoxError;
+use logger::info;
+use snafu::ResultExt;
+
+use super::{local_runner::LocalCompactionRunner, node_picker::RemoteCompactionNodePickerRef};
+use crate::{
+ compaction::runner::{
+ remote_client::{build_compaction_client, CompactionClientConfig, CompactionClientRef},
+ CompactionRunner, CompactionRunnerResult, CompactionRunnerTask,
+ },
+ instance::flush_compaction::{
+ self, BuildCompactionClientFailed, ConvertCompactionTaskResponse,
+ GetCompactionClientFailed, PickCompactionNodeFailed, Result,
+ },
+};
+
+pub struct RemoteCompactionRunner {
+ pub node_picker: RemoteCompactionNodePickerRef,
+
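+ /// Whether to fall back to local compaction when remote compaction fails.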
+ pub fallback_local_when_failed: bool,
+ /// Responsible for executing the compaction task locally if remote
+ /// compaction fails and `fallback_local_when_failed` is true, used for
+ /// better fault tolerance.
+ pub local_compaction_runner: LocalCompactionRunner,
+}
+
+impl RemoteCompactionRunner {
+ async fn get_compaction_client(&self) -> Result<CompactionClientRef> {
+ let mut config = CompactionClientConfig::default();
+ let endpoint = self
+ .node_picker
+ .get_compaction_node()
+ .await
+ .context(PickCompactionNodeFailed)?;
+ config.compaction_server_addr = make_formatted_endpoint(&endpoint);
+
+ let client = build_compaction_client(config)
+ .await
+ .context(BuildCompactionClientFailed)?;
+ Ok(client)
+ }
+
+ async fn local_compact(&self, task: CompactionRunnerTask) -> Result<CompactionRunnerResult> {
+ self.local_compaction_runner.run(task).await
+ }
+}
+
+#[async_trait]
+impl CompactionRunner for RemoteCompactionRunner {
+ /// Run the compaction task either on a remote node or fall back to local
+ /// compaction.
+ async fn run(&self, task: CompactionRunnerTask) -> Result<CompactionRunnerResult> {
+ let client = self
+ .get_compaction_client()
+ .await
+ .box_err()
+ .context(GetCompactionClientFailed);
+
+ let pb_resp = match client {
+ Ok(client) => match client.execute_compaction_task(task.clone().into()).await {
+ Ok(resp) => resp,
+ Err(e) => {
+ if !self.fallback_local_when_failed {
+ return Err(flush_compaction::Error::RemoteCompactFailed { source: e });
+ }
+
+ info!(
+ "The compaction task falls back to local because of
error:{}",
+ e
+ );
+ return self.local_compact(task).await;
+ }
+ },
+ Err(e) => {
+ if !self.fallback_local_when_failed {
+ return Err(e);
+ }
+
+ info!(
+ "The compaction task falls back to local because of
error:{}",
+ e
+ );
+ return self.local_compact(task).await;
+ }
+ };
+
+ let resp = pb_resp
+ .try_into()
+ .box_err()
+ .context(ConvertCompactionTaskResponse)?;
+
+ Ok(resp)
+ }
+}
+
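+// The picker returns a bare `addr:port`; prefix a scheme so tonic can parse it as a URI.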
+fn make_formatted_endpoint(endpoint: &str) -> String {
+ format!("http://{endpoint}")
+}
diff --git a/src/analytic_engine/src/instance/engine.rs b/src/analytic_engine/src/instance/engine.rs
index 8c29ab1c..537b8331 100644
--- a/src/analytic_engine/src/instance/engine.rs
+++ b/src/analytic_engine/src/instance/engine.rs
@@ -259,6 +259,12 @@ pub enum Error {
sequence: SequenceNumber,
source: wal::manager::Error,
},
+
+ #[snafu(display(
+ "Failed to find meta client to construct remote compaction
runner.\nBacktrace:\n{}",
+ backtrace
+ ))]
+ MetaClientNotExist { backtrace: Backtrace },
}
define_result!(Error);
@@ -293,6 +299,7 @@ impl From<Error> for table_engine::engine::Error {
| Error::DoManifestSnapshot { .. }
| Error::OpenManifest { .. }
| Error::TableNotExist { .. }
+ | Error::MetaClientNotExist { .. }
| Error::OpenTablesOfShard { .. }
| Error::ReplayWalNoCause { .. }
| Error::PurgeWal { .. }
diff --git a/src/analytic_engine/src/instance/flush_compaction.rs b/src/analytic_engine/src/instance/flush_compaction.rs
index da1647eb..9deceff5 100644
--- a/src/analytic_engine/src/instance/flush_compaction.rs
+++ b/src/analytic_engine/src/instance/flush_compaction.rs
@@ -41,6 +41,7 @@ use tokio::{sync::oneshot, time::Instant};
use wal::manager::WalLocation;
use crate::{
+ compaction::runner::node_picker,
instance::{
self, reorder_memtable::Reorder, serial_executor::TableFlushScheduler,
SpaceStoreRef,
},
@@ -158,6 +159,25 @@ pub enum Error {
#[snafu(display("Failed to alloc file id, err:{}", source))]
AllocFileId { source: data::Error },
+
+ #[snafu(display("Failed to convert compaction task response, err:{}",
source))]
+ ConvertCompactionTaskResponse { source: GenericError },
+
+ #[snafu(display("Failed to pick remote compaction node, err:{}", source))]
+ PickCompactionNodeFailed { source: node_picker::Error },
+
+ #[snafu(display("Failed to build compaction client, err:{}", source))]
+ BuildCompactionClientFailed {
+ source: crate::compaction::runner::Error,
+ },
+
+ #[snafu(display("Failed to get compaction client, err:{}", source))]
+ GetCompactionClientFailed { source: GenericError },
+
+ #[snafu(display("Failed to execute compaction task remotely, err:{}",
source))]
+ RemoteCompactFailed {
+ source: crate::compaction::runner::Error,
+ },
}
define_result!(Error);
diff --git a/src/analytic_engine/src/instance/open.rs b/src/analytic_engine/src/instance/open.rs
index 220fa84c..97717c5a 100644
--- a/src/analytic_engine/src/instance/open.rs
+++ b/src/analytic_engine/src/instance/open.rs
@@ -24,20 +24,28 @@ use std::{
use common_types::table::ShardId;
use logger::{error, info};
+use meta_client::MetaClientRef;
use object_store::ObjectStoreRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
use table_engine::{engine::TableDef, table::TableId};
use wal::manager::WalManagerRef;
use crate::{
compaction::{
- runner::{local_runner::LocalCompactionRunner, CompactionRunnerPtr, CompactionRunnerRef},
+ runner::{
+ local_runner::LocalCompactionRunner,
+ node_picker::{
+ LocalCompactionNodePickerImpl, NodePicker, RemoteCompactionNodePickerImpl,
+ },
+ remote_runner::RemoteCompactionRunner,
+ CompactionRunnerPtr, CompactionRunnerRef,
+ },
scheduler::SchedulerImpl,
},
context::OpenContext,
engine,
instance::{
- engine::{OpenManifest, OpenTablesOfShard, ReadMetaUpdate, Result},
+ engine::{MetaClientNotExist, OpenManifest, OpenTablesOfShard, ReadMetaUpdate, Result},
flush_compaction::Flusher,
mem_collector::MemUsageCollector,
wal_replayer::{ReplayMode, WalReplayer},
@@ -52,7 +60,7 @@ use crate::{
},
table::data::{TableCatalogInfo, TableDataRef},
table_meta_set_impl::TableMetaSetImpl,
- RecoverMode,
+ CompactionMode, RecoverMode,
};
pub(crate) struct InstanceContext {
@@ -68,14 +76,48 @@ impl InstanceContext {
wal_manager: WalManagerRef,
store_picker: ObjectStorePickerRef,
sst_factory: SstFactoryRef,
+ meta_client: Option<MetaClientRef>,
) -> Result<Self> {
- let compaction_runner = Box::new(LocalCompactionRunner::new(
+ info!(
+ "Construct compaction runner with compaction_mode:{:?}",
+ ctx.config.compaction_mode
+ );
+
+ let local_compaction_runner = LocalCompactionRunner::new(
ctx.runtimes.compact_runtime.clone(),
&ctx.config,
sst_factory.clone(),
store_picker.clone(),
ctx.meta_cache.clone(),
- ));
+ );
+
+ let compaction_runner: CompactionRunnerPtr = match &ctx.config.compaction_mode {
+ CompactionMode::Offload(NodePicker::Local(endpoint)) => {
+ Box::new(RemoteCompactionRunner {
+ node_picker: Arc::new(LocalCompactionNodePickerImpl {
+ endpoint: endpoint.clone(),
+ }),
+ // This field is set to false here for testing.
+ fallback_local_when_failed: false,
+ local_compaction_runner: local_compaction_runner.clone(),
+ })
+ }
+ CompactionMode::Offload(NodePicker::Remote) => Box::new(RemoteCompactionRunner {
+ node_picker: Arc::new(RemoteCompactionNodePickerImpl {
+ meta_client: meta_client.context(MetaClientNotExist)?,
+ }),
+ fallback_local_when_failed: true,
+ local_compaction_runner: local_compaction_runner.clone(),
+ }),
+
+ CompactionMode::Local => Box::new(LocalCompactionRunner::new(
+ ctx.runtimes.compact_runtime.clone(),
+ &ctx.config,
+ sst_factory.clone(),
+ store_picker.clone(),
+ ctx.meta_cache.clone(),
+ )),
+ };
let instance = Instance::open(
ctx,
@@ -89,7 +131,7 @@ impl InstanceContext {
Ok(Self {
instance,
- local_compaction_runner: None,
+ local_compaction_runner: Some(Arc::new(local_compaction_runner)),
})
}
}
diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs
index 4b80741f..687bcf63 100644
--- a/src/analytic_engine/src/lib.rs
+++ b/src/analytic_engine/src/lib.rs
@@ -19,7 +19,7 @@
#![feature(option_get_or_insert_default)]
-mod compaction;
+pub mod compaction;
mod context;
mod engine;
pub mod error;
@@ -40,6 +40,7 @@ pub mod table_meta_set_impl;
#[cfg(any(test, feature = "test"))]
pub mod tests;
+use compaction::runner::node_picker::NodePicker;
use error::ErrorKind;
use manifest::details::Options as ManifestOptions;
use object_store::config::StorageOptions;
@@ -54,6 +55,20 @@ pub use crate::{
table_options::TableOptions,
};
+/// The compaction mode decides whether to offload compaction or not.
+///
+/// [CompactionMode::Offload] means offload the compaction task
+/// to a local or remote node.
+///
+/// [CompactionMode::Local] means local compaction, no offloading.
+#[derive(Clone, Default, Debug, Deserialize, Serialize)]
+#[serde(tag = "compaction_mode")]
+pub enum CompactionMode {
+ #[default]
+ Local,
+ Offload(NodePicker),
+}
+
/// Config of analytic engine
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
@@ -77,6 +92,9 @@ pub struct Config {
pub compaction: SchedulerConfig,
+ /// Whether to offload the compaction task or not.
+ pub compaction_mode: CompactionMode,
+
/// sst meta cache capacity
pub sst_meta_cache_cap: Option<usize>,
/// sst data cache capacity
@@ -187,6 +205,7 @@ impl Default for Config {
table_opts: TableOptions::default(),
try_compat_old_layered_memtable_opts: false,
compaction: SchedulerConfig::default(),
+ compaction_mode: CompactionMode::Local,
sst_meta_cache_cap: Some(1000),
sst_data_cache_cap: Some(1000),
manifest: ManifestOptions::default(),
diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs
index ee167729..4075e250 100644
--- a/src/analytic_engine/src/setup.rs
+++ b/src/analytic_engine/src/setup.rs
@@ -21,6 +21,7 @@ use std::{num::NonZeroUsize, path::Path, pin::Pin, sync::Arc};
use futures::Future;
use macros::define_result;
+use meta_client::MetaClientRef;
use object_store::{
aliyun,
config::{ObjectStoreOptions, StorageOptions},
@@ -96,6 +97,8 @@ pub struct EngineBuilder<'a> {
pub config: &'a Config,
pub engine_runtimes: Arc<EngineRuntimes>,
pub opened_wals: OpenedWals,
+ // The meta client is needed when compaction is offloaded via the remote node picker.
+ pub meta_client: Option<MetaClientRef>,
}
impl<'a> EngineBuilder<'a> {
@@ -116,6 +119,7 @@ impl<'a> EngineBuilder<'a> {
self.opened_wals.data_wal,
manifest_storages,
Arc::new(opened_storages),
+ self.meta_client,
)
.await?;
@@ -134,6 +138,7 @@ async fn build_instance_context(
wal_manager: WalManagerRef,
manifest_storages: ManifestStorages,
store_picker: ObjectStorePickerRef,
+ meta_client: Option<MetaClientRef>,
) -> Result<InstanceContext> {
let meta_cache: Option<MetaCacheRef> = config
.sst_meta_cache_cap
@@ -151,6 +156,7 @@ async fn build_instance_context(
wal_manager,
store_picker,
Arc::new(FactoryImpl),
+ meta_client.clone(),
)
.await
.context(OpenInstance)?;
diff --git a/src/analytic_engine/src/sst/factory.rs b/src/analytic_engine/src/sst/factory.rs
index 2ddeb246..1f17b8df 100644
--- a/src/analytic_engine/src/sst/factory.rs
+++ b/src/analytic_engine/src/sst/factory.rs
@@ -21,10 +21,11 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use common_types::projected_schema::RowProjectorBuilder;
+use generic_error::{BoxError, GenericError};
use macros::define_result;
use object_store::{ObjectStoreRef, Path};
use runtime::Runtime;
-use snafu::{ResultExt, Snafu};
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::predicate::PredicateRef;
use trace_metric::MetricsCollector;
@@ -50,6 +51,15 @@ use crate::{
pub enum Error {
#[snafu(display("Failed to parse sst header, err:{}", source,))]
ParseHeader { source: header::Error },
+
+ #[snafu(display("Empty storage format hint.\nBacktrace:\n{}", backtrace))]
+ EmptyStorageFormatHint { backtrace: Backtrace },
+
+ #[snafu(display("Failed to convert storage format hint, err:{}", source))]
+ ConvertStorageFormatHint { source: GenericError },
+
+ #[snafu(display("Failed to convert compression, err:{}", source))]
+ ConvertCompression { source: GenericError },
}
define_result!(Error);
@@ -164,6 +174,59 @@ pub struct SstWriteOptions {
pub column_stats: HashMap<String, ColumnStats>,
}
+impl TryFrom<horaedbproto::compaction_service::SstWriteOptions> for SstWriteOptions {
+ type Error = Error;
+
+ fn try_from(value: horaedbproto::compaction_service::SstWriteOptions) -> Result<Self> {
+ let storage_format_hint: StorageFormatHint = value
+ .storage_format_hint
+ .context(EmptyStorageFormatHint)?
+ .try_into()
+ .box_err()
+ .context(ConvertStorageFormatHint)?;
+
+ let num_rows_per_row_group = value.num_rows_per_row_group as usize;
+ let compression: Compression = value
+ .compression
+ .try_into()
+ .box_err()
+ .context(ConvertCompression)?;
+ let max_buffer_size = value.max_buffer_size as usize;
+
+ let column_stats: HashMap<String, ColumnStats> = value
+ .column_stats
+ .into_iter()
+ .map(|(k, v)| (k, ColumnStats { low_cardinality: v }))
+ .collect();
+
+ Ok(SstWriteOptions {
+ storage_format_hint,
+ num_rows_per_row_group,
+ compression,
+ max_buffer_size,
+ column_stats,
+ })
+ }
+}
+
+impl From<SstWriteOptions> for horaedbproto::compaction_service::SstWriteOptions {
+ fn from(value: SstWriteOptions) -> Self {
+ let column_stats = value
+ .column_stats
+ .into_iter()
+ .map(|(k, v)| (k, v.low_cardinality))
+ .collect();
+
+ Self {
+ storage_format_hint: Some(value.storage_format_hint.into()),
+ num_rows_per_row_group: value.num_rows_per_row_group as u64,
+ compression: value.compression.into(),
+ max_buffer_size: value.max_buffer_size as u64,
+ column_stats,
+ }
+ }
+}
+
impl From<&ColumnStats> for ColumnEncoding {
fn from(value: &ColumnStats) -> Self {
ColumnEncoding {
diff --git a/src/analytic_engine/src/sst/file.rs b/src/analytic_engine/src/sst/file.rs
index 39cdc7c7..a6cc336a 100644
--- a/src/analytic_engine/src/sst/file.rs
+++ b/src/analytic_engine/src/sst/file.rs
@@ -35,12 +35,13 @@ use common_types::{
SequenceNumber,
};
use future_ext::{retry_async, BackoffConfig, RetryConfig};
+use generic_error::{BoxError, GenericError};
use logger::{error, info, trace, warn};
use macros::define_result;
use metric_ext::Meter;
use object_store::{ObjectStoreRef, Path};
use runtime::{JoinHandle, Runtime};
-use snafu::{ResultExt, Snafu};
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table::TableId;
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
@@ -54,6 +55,18 @@ use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options
pub enum Error {
#[snafu(display("Failed to join purger, err:{}", source))]
StopPurger { source: runtime::Error },
+
+ #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))]
+ EmptyTimeRange { backtrace: Backtrace },
+
+ #[snafu(display("Failed to convert time range, err:{}", source))]
+ ConvertTimeRange { source: GenericError },
+
+ #[snafu(display("Failed to convert storage format, err:{}", source))]
+ ConvertStorageFormat { source: GenericError },
+
+ #[snafu(display("Converted overflow, err:{}", source))]
+ ConvertOverflow { source: GenericError },
}
define_result!(Error);
@@ -95,6 +108,15 @@ impl From<u16> for Level {
}
}
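+// The pb field is a u32 while `Level` wraps a u16, so the conversion must be checked for overflow.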
+impl TryFrom<u32> for Level {
+ type Error = Error;
+
+ fn try_from(value: u32) -> Result<Self> {
+ let value: u16 = value.try_into().box_err().context(ConvertOverflow)?;
+ Ok(value.into())
+ }
+}
+
impl fmt::Display for Level {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
@@ -197,6 +219,16 @@ impl FileHandle {
}
}
+ #[inline]
+ pub fn space_id(&self) -> SpaceId {
+ self.inner.purge_queue.space_id()
+ }
+
+ #[inline]
+ pub fn table_id(&self) -> TableId {
+ self.inner.purge_queue.table_id()
+ }
+
#[inline]
pub fn read_meter(&self) -> Arc<Meter> {
self.inner.metrics.read_meter.clone()
@@ -460,6 +492,53 @@ impl FileMeta {
}
}
+impl TryFrom<horaedbproto::compaction_service::FileMeta> for FileMeta {
+ type Error = Error;
+
+ fn try_from(value: horaedbproto::compaction_service::FileMeta) -> Result<Self> {
+ let time_range: TimeRange = value
+ .time_range
+ .context(EmptyTimeRange)?
+ .try_into()
+ .box_err()
+ .context(ConvertTimeRange)?;
+
+ let storage_format: StorageFormat = value
+ .storage_format
+ .try_into()
+ .box_err()
+ .context(ConvertStorageFormat)?;
+ let mut associated_files: Vec<String> = Vec::with_capacity(value.associated_files.len());
+ for file in value.associated_files {
+ associated_files.push(file);
+ }
+
+ Ok(FileMeta {
+ id: value.file_id,
+ size: value.size,
+ row_num: value.row_num,
+ time_range,
+ max_seq: value.max_seq,
+ storage_format,
+ associated_files,
+ })
+ }
+}
+
+impl From<FileMeta> for horaedbproto::compaction_service::FileMeta {
+ fn from(value: FileMeta) -> Self {
+ Self {
+ file_id: value.id,
+ max_seq: value.max_seq,
+ time_range: Some(value.time_range.into()),
+ size: value.size,
+ row_num: value.row_num,
+ storage_format: value.storage_format.into(),
+ associated_files: value.associated_files,
+ }
+ }
+}
+
// Queue to store files to be deleted for a table.
#[derive(Clone)]
pub struct FilePurgeQueue {
@@ -508,6 +587,23 @@ impl FilePurgeQueue {
);
}
}
+
+ #[inline]
+ pub fn space_id(&self) -> SpaceId {
+ self.inner.space_id
+ }
+
+ #[inline]
+ pub fn table_id(&self) -> TableId {
+ self.inner.table_id
+ }
+}
+
+impl From<horaedbproto::compaction_service::FilePurgeQueue> for FilePurgeQueue {
+ fn from(value: horaedbproto::compaction_service::FilePurgeQueue) -> Self {
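+ // The receiver end is dropped immediately, so a queue rebuilt from the pb
+ // message only carries the space/table ids and cannot deliver purge requests.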
+ let (tx, _rx) = mpsc::unbounded_channel();
+ FilePurgeQueue::new(value.space_id, value.table_id.into(), tx)
+ }
}
struct FilePurgeQueueInner {
diff --git a/src/analytic_engine/src/sst/writer.rs b/src/analytic_engine/src/sst/writer.rs
index e424e8af..577f4993 100644
--- a/src/analytic_engine/src/sst/writer.rs
+++ b/src/analytic_engine/src/sst/writer.rs
@@ -26,7 +26,8 @@ use common_types::{
SequenceNumber,
};
use futures::Stream;
-use generic_error::GenericError;
+use generic_error::{BoxError, GenericError};
+use snafu::{OptionExt, ResultExt};
use crate::table_options::StorageFormat;
@@ -96,6 +97,21 @@ pub mod error {
#[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg,
backtrace))]
OtherNoCause { msg: String, backtrace: Backtrace },
+
+ #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))]
+ EmptyTimeRange { backtrace: Backtrace },
+
+ #[snafu(display("Empty schema.\nBacktrace:\n{}", backtrace))]
+ EmptySchema { backtrace: Backtrace },
+
+ #[snafu(display("Failed to convert time range, err:{}", source))]
+ ConvertTimeRange { source: GenericError },
+
+ #[snafu(display("Failed to convert sst info, err:{}", source))]
+ ConvertSstInfo { source: GenericError },
+
+ #[snafu(display("Failed to convert schema, err:{}", source))]
+ ConvertSchema { source: GenericError },
}
define_result!(Error);
@@ -117,6 +133,44 @@ pub struct SstInfo {
pub time_range: TimeRange,
}
+impl TryFrom<horaedbproto::compaction_service::SstInfo> for SstInfo {
+ type Error = Error;
+
+ fn try_from(value: horaedbproto::compaction_service::SstInfo) -> Result<Self> {
+ let storage_format = value
+ .storage_format
+ .try_into()
+ .box_err()
+ .context(ConvertSstInfo)?;
+ let time_range = value
+ .time_range
+ .context(EmptyTimeRange)?
+ .try_into()
+ .box_err()
+ .context(ConvertTimeRange)?;
+
+ Ok(Self {
+ file_size: value.file_size as usize,
+ row_num: value.row_num as usize,
+ storage_format,
+ meta_path: value.meta_path,
+ time_range,
+ })
+ }
+}
+
+impl From<SstInfo> for horaedbproto::compaction_service::SstInfo {
+ fn from(value: SstInfo) -> Self {
+ Self {
+ file_size: value.file_size as u64,
+ row_num: value.row_num as u64,
+ storage_format: value.storage_format.into(),
+ meta_path: value.meta_path,
+ time_range: Some(value.time_range.into()),
+ }
+ }
+}
+
#[derive(Debug, Clone)]
pub struct MetaData {
/// Min key of the sst.
@@ -131,6 +185,45 @@ pub struct MetaData {
pub schema: Schema,
}
+impl TryFrom<horaedbproto::compaction_service::MetaData> for MetaData {
+ type Error = Error;
+
+ fn try_from(meta: horaedbproto::compaction_service::MetaData) -> Result<Self> {
+ let time_range = meta
+ .time_range
+ .context(EmptyTimeRange)?
+ .try_into()
+ .box_err()
+ .context(ConvertTimeRange)?;
+ let schema = meta
+ .schema
+ .context(EmptySchema)?
+ .try_into()
+ .box_err()
+ .context(ConvertSchema)?;
+
+ Ok(Self {
+ min_key: Bytes::from(meta.min_key),
+ max_key: Bytes::from(meta.max_key),
+ time_range,
+ max_sequence: meta.max_sequence,
+ schema,
+ })
+ }
+}
+
+impl From<MetaData> for horaedbproto::compaction_service::MetaData {
+ fn from(meta: MetaData) -> Self {
+ Self {
+ min_key: meta.min_key.to_vec(),
+ max_key: meta.max_key.to_vec(),
+ max_sequence: meta.max_sequence,
+ time_range: Some(meta.time_range.into()),
+ schema: Some((&meta.schema).into()),
+ }
+ }
+}
+
/// The writer for sst.
///
/// The caller provides a stream of [RecordBatch] and the writer takes
diff --git a/src/analytic_engine/src/table_options.rs b/src/analytic_engine/src/table_options.rs
index 4c1823ee..0ecabb95 100644
--- a/src/analytic_engine/src/table_options.rs
+++ b/src/analytic_engine/src/table_options.rs
@@ -130,6 +130,13 @@ pub enum Error {
))]
UnknownStorageFormatHint { value: String, backtrace: Backtrace },
+ #[snafu(display(
+ "Unknown compression type. value:{:?}.\nBacktrace:\n{}",
+ value,
+ backtrace
+ ))]
+ UnknownCompressionType { value: i32, backtrace: Backtrace },
+
#[snafu(display("Storage format hint is missing.\nBacktrace:\n{}",
backtrace))]
MissingStorageFormatHint { backtrace: Backtrace },
@@ -234,6 +241,33 @@ impl From<manifest_pb::Compression> for Compression {
}
}
+impl TryFrom<i32> for Compression {
+ type Error = Error;
+
+ fn try_from(compression: i32) -> Result<Self> {
+ let compression = match compression {
+ 0 => Compression::Uncompressed,
+ 1 => Compression::Lz4,
+ 2 => Compression::Snappy,
+ 3 => Compression::Zstd,
+ _ => return UnknownCompressionType { value: compression }.fail(),
+ };
+
+ Ok(compression)
+ }
+}
+
+impl From<Compression> for i32 {
+ fn from(value: Compression) -> Self {
+ match value {
+ Compression::Uncompressed => 0,
+ Compression::Lz4 => 1,
+ Compression::Snappy => 2,
+ Compression::Zstd => 3,
+ }
+ }
+}
+
impl From<Compression> for ParquetCompression {
fn from(compression: Compression) -> Self {
match compression {
@@ -340,6 +374,14 @@ impl From<StorageFormat> for manifest_pb::StorageFormat {
}
}
+impl From<StorageFormat> for i32 {
+ fn from(value: StorageFormat) -> Self {
+ match value {
+ StorageFormat::Columnar => 0,
+ }
+ }
+}
+
impl TryFrom<manifest_pb::StorageFormat> for StorageFormat {
type Error = Error;
@@ -363,6 +405,18 @@ impl TryFrom<&str> for StorageFormat {
}
}
+impl TryFrom<i32> for StorageFormat {
+ type Error = Error;
+
+ fn try_from(value: i32) -> Result<Self> {
+ let format = match value {
+ 0 => Self::Columnar,
+ _ => return UnknownStorageFormatType { value }.fail(),
+ };
+ Ok(format)
+ }
+}
+
impl ToString for StorageFormat {
fn to_string(&self) -> String {
match self {
diff --git a/src/analytic_engine/src/tests/util.rs b/src/analytic_engine/src/tests/util.rs
index 8fe07106..04bc09f7 100644
--- a/src/analytic_engine/src/tests/util.rs
+++ b/src/analytic_engine/src/tests/util.rs
@@ -141,6 +141,7 @@ impl<T: WalsOpener> TestContext<T> {
config: &self.config,
engine_runtimes: self.runtimes.clone(),
opened_wals: opened_wals.clone(),
+ meta_client: None,
};
self.opened_wals = Some(opened_wals);
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index a7f86f08..97c8457b 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -522,6 +522,7 @@ impl<T: WalsOpener> TestContext<T> {
config: &self.config,
engine_runtimes: self.runtimes.clone(),
opened_wals: opened_wals.clone(),
+ meta_client: None,
};
self.opened_wals = Some(opened_wals);
diff --git a/src/cluster/src/cluster_impl.rs b/src/cluster/src/cluster_impl.rs
index aee54e42..d79eda04 100644
--- a/src/cluster/src/cluster_impl.rs
+++ b/src/cluster/src/cluster_impl.rs
@@ -46,8 +46,8 @@ use crate::{
shard_set::{Shard, ShardRef, ShardSet},
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause,
- InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause,
- Result, ShardNotFound, TableStatus,
+ InitEtcdClientConfig, InvalidArguments, MetaClientFailure, NodeType, OpenShard,
+ OpenShardWithCause, Result, ShardNotFound, TableStatus,
};
/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
@@ -376,6 +376,10 @@ impl Cluster for ClusterImpl {
Ok(())
}
+ fn node_type(&self) -> NodeType {
+ self.config.node_type.clone()
+ }
+
async fn open_shard(&self, shard_info: &ShardInfo) -> Result<ShardRef> {
self.inner.open_shard(shard_info).await
}
diff --git a/src/cluster/src/config.rs b/src/cluster/src/config.rs
index 29e0da97..d0b1c694 100644
--- a/src/cluster/src/config.rs
+++ b/src/cluster/src/config.rs
@@ -23,6 +23,8 @@ use serde::{Deserialize, Serialize};
use table_engine::ANALYTIC_ENGINE_TYPE;
use time_ext::ReadableDuration;
+use crate::NodeType;
+
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
// TODO: move this to table_engine crates
@@ -133,6 +135,7 @@ impl Default for TlsConfig {
#[serde(default)]
pub struct ClusterConfig {
pub cmd_channel_buffer_size: usize,
+ pub node_type: NodeType,
pub meta_client: MetaClientConfig,
pub etcd_client: EtcdClientConfig,
}
diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs
index a97c945a..ddda6c46 100644
--- a/src/cluster/src/lib.rs
+++ b/src/cluster/src/lib.rs
@@ -28,7 +28,7 @@
use std::sync::Arc;
use async_trait::async_trait;
-use common_types::schema::SchemaName;
+use common_types::{cluster::NodeType, schema::SchemaName};
use generic_error::GenericError;
use macros::define_result;
use meta_client::types::{
@@ -190,12 +190,14 @@ pub struct ClusterNodesResp {
pub cluster_nodes: ClusterNodesRef,
}
-/// Cluster manages tables and shard infos in cluster mode.
#[async_trait]
pub trait Cluster {
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
+ /// Get the node type.
+ fn node_type(&self) -> NodeType;
+
/// Fetch related information and open shard.
async fn open_shard(&self, shard_info: &ShardInfo) -> Result<ShardRef>;
diff --git a/src/common_types/src/cluster.rs b/src/common_types/src/cluster.rs
new file mode 100644
index 00000000..ad302023
--- /dev/null
+++ b/src/common_types/src/cluster.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+/// Type to distinguish different node types in cluster mode.
+#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub enum NodeType {
+ #[default]
+ HoraeDB,
+ CompactionServer,
+}
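
Since `NodeType` derives `Serialize`/`Deserialize` and defaults to `HoraeDB`, a node's role is selected by a single string in the cluster config. A minimal round-trip sketch; `PartialClusterConfig` is a hypothetical stand-in for the real `ClusterConfig`:

    use serde::Deserialize;

    #[derive(Debug, Default, Deserialize, PartialEq)]
    enum NodeType {
        #[default]
        HoraeDB,
        CompactionServer,
    }

    // Hypothetical mirror of the relevant slice of `ClusterConfig`.
    #[derive(Debug, Deserialize)]
    struct PartialClusterConfig {
        #[serde(default)]
        node_type: NodeType,
    }

    fn main() {
        // Explicitly configured compaction server.
        let cfg: PartialClusterConfig =
            toml::from_str(r#"node_type = "CompactionServer""#).unwrap();
        assert_eq!(cfg.node_type, NodeType::CompactionServer);

        // Omitting the field falls back to the default horaedb role.
        let cfg: PartialClusterConfig = toml::from_str("").unwrap();
        assert_eq!(cfg.node_type, NodeType::HoraeDB);
    }
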
diff --git a/src/common_types/src/lib.rs b/src/common_types/src/lib.rs
index 0b6cda17..334bd42f 100644
--- a/src/common_types/src/lib.rs
+++ b/src/common_types/src/lib.rs
@@ -18,6 +18,7 @@
//! Contains common types
pub mod bitset;
+pub mod cluster;
pub mod column;
pub mod column_block;
pub mod column_schema;
diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml
index ce505105..5a6144d3 100644
--- a/src/horaedb/Cargo.toml
+++ b/src/horaedb/Cargo.toml
@@ -38,32 +38,33 @@ wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]
wal-local-storage = ["wal/wal-local-storage", "analytic_engine/wal-local-storage"]
[dependencies]
-analytic_engine = { workspace = true }
-catalog = { workspace = true }
-catalog_impls = { workspace = true }
-clap = { workspace = true }
-cluster = { workspace = true }
-datafusion = { workspace = true }
-df_operator = { workspace = true }
-etcd-client = { workspace = true }
-interpreters = { workspace = true }
-logger = { workspace = true }
-meta_client = { workspace = true }
-moka = { version = "0.10", features = ["future"] }
-panic_ext = { workspace = true }
-proxy = { workspace = true }
-query_engine = { workspace = true }
-router = { workspace = true }
-runtime = { workspace = true }
-serde = { workspace = true }
-server = { workspace = true }
-signal-hook = "0.3"
-size_ext = { workspace = true }
-table_engine = { workspace = true }
-toml = { workspace = true }
-toml_ext = { workspace = true }
-tracing_util = { workspace = true }
-wal = { workspace = true }
+analytic_engine = { workspace = true }
+catalog = { workspace = true }
+catalog_impls = { workspace = true }
+clap = { workspace = true }
+cluster = { workspace = true }
+common_types = { workspace = true }
+datafusion = { workspace = true }
+df_operator = { workspace = true }
+etcd-client = { workspace = true }
+interpreters = { workspace = true }
+logger = { workspace = true }
+meta_client = { workspace = true }
+moka = { version = "0.10", features = ["future"] }
+panic_ext = { workspace = true }
+proxy = { workspace = true }
+query_engine = { workspace = true }
+router = { workspace = true }
+runtime = { workspace = true }
+serde = { workspace = true }
+server = { workspace = true }
+signal-hook = "0.3"
+size_ext = { workspace = true }
+table_engine = { workspace = true }
+toml = { workspace = true }
+toml_ext = { workspace = true }
+tracing_util = { workspace = true }
+wal = { workspace = true }
[build-dependencies]
vergen = { version = "8", default-features = false, features = [
diff --git a/src/horaedb/src/config.rs b/src/horaedb/src/config.rs
index b9f8932f..e7f19233 100644
--- a/src/horaedb/src/config.rs
+++ b/src/horaedb/src/config.rs
@@ -26,8 +26,8 @@ use size_ext::ReadableSize;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct NodeInfo {
- /// The address of the horaedb node. It can be a domain name or an IP
- /// address without port followed.
+ /// The address of the horaedb (or compaction server) node. It can be a
+ /// domain name or an IP address without a trailing port.
pub addr: String,
pub zone: String,
pub idc: String,
diff --git a/src/horaedb/src/setup.rs b/src/horaedb/src/setup.rs
index 9bdb46da..33632b55 100644
--- a/src/horaedb/src/setup.rs
+++ b/src/horaedb/src/setup.rs
@@ -313,6 +313,7 @@ async fn build_with_meta<T: WalsOpener>(
zone: config.node.zone.clone(),
idc: config.node.idc.clone(),
binary_version: config.node.binary_version.clone(),
+ node_type: cluster_config.node_type.clone(),
};
info!("Build horaedb with node meta info:{node_meta_info:?}");
@@ -349,8 +350,12 @@ async fn build_with_meta<T: WalsOpener>(
config: &config.analytic,
engine_runtimes: runtimes.clone(),
opened_wals: opened_wals.clone(),
+ meta_client: Some(meta_client.clone()),
};
- let TableEngineContext { table_engine, .. } = engine_builder
+ let TableEngineContext {
+ table_engine,
+ local_compaction_runner,
+ } = engine_builder
.build()
.await
.expect("Failed to setup analytic engine");
@@ -368,14 +373,18 @@ async fn build_with_meta<T: WalsOpener>(
let table_manipulator = Arc::new(meta_based::TableManipulatorImpl::new(meta_client));
let schema_config_provider = Arc::new(ClusterBasedProvider::new(cluster.clone()));
- builder
+
+ let mut builder = builder
.table_engine(engine_proxy)
.catalog_manager(catalog_manager)
.table_manipulator(table_manipulator)
.cluster(cluster)
.opened_wals(opened_wals)
.router(router)
- .schema_config_provider(schema_config_provider)
+ .schema_config_provider(schema_config_provider);
+ builder = builder.compaction_runner(local_compaction_runner.expect("Empty compaction runner."));
+
+ builder
}
async fn build_without_meta<T: WalsOpener>(
@@ -394,6 +403,7 @@ async fn build_without_meta<T: WalsOpener>(
config: &config.analytic,
engine_runtimes: runtimes.clone(),
opened_wals: opened_wals.clone(),
+ meta_client: None,
};
let TableEngineContext { table_engine, .. } = engine_builder
.build()
diff --git a/src/meta_client/src/lib.rs b/src/meta_client/src/lib.rs
index a6cb8df6..ba933135 100644
--- a/src/meta_client/src/lib.rs
+++ b/src/meta_client/src/lib.rs
@@ -23,9 +23,9 @@ use macros::define_result;
use snafu::{Backtrace, Snafu};
use types::{
AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse,
- DropTableRequest, DropTableResponse, GetNodesRequest, GetNodesResponse,
- GetTablesOfShardsRequest, GetTablesOfShardsResponse, RouteTablesRequest, RouteTablesResponse,
- ShardInfo,
+ DropTableRequest, DropTableResponse, FetchCompactionNodeRequest, FetchCompactionNodeResponse,
+ GetNodesRequest, GetNodesResponse, GetTablesOfShardsRequest, GetTablesOfShardsResponse,
+ RouteTablesRequest, RouteTablesResponse, ShardInfo,
};
pub mod meta_impl;
@@ -76,6 +76,9 @@ pub enum Error {
#[snafu(display("Failed to get tables, err:{}", source))]
FailGetTables { source: GenericError },
+ #[snafu(display("Failed to fetch compaction node, err:{}", source))]
+ FailFetchCompactionNode { source: GenericError },
+
#[snafu(display("Failed to route tables, err:{}", source))]
FailRouteTables { source: GenericError },
@@ -113,6 +116,11 @@ pub trait MetaClient: Send + Sync {
async fn get_nodes(&self, req: GetNodesRequest) -> Result<GetNodesResponse>;
+ async fn fetch_compaction_node(
+ &self,
+ req: FetchCompactionNodeRequest,
+ ) -> Result<FetchCompactionNodeResponse>;
+
async fn send_heartbeat(&self, req: Vec<ShardInfo>) -> Result<()>;
}
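
A sketch of how a horaedb node could resolve a compaction endpoint through the new trait method. The helper is an assumption for illustration; note that `MetaClientImpl` below still answers this call with `todo!()`:

    use meta_client::{types::FetchCompactionNodeRequest, MetaClientRef, Result};

    // Hypothetical helper: ask the meta service which node should take
    // the next remote compaction task.
    async fn pick_compaction_endpoint(meta_client: &MetaClientRef) -> Result<String> {
        let resp = meta_client
            .fetch_compaction_node(FetchCompactionNodeRequest::default())
            .await?;
        Ok(resp.endpoint)
    }
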
diff --git a/src/meta_client/src/meta_impl.rs b/src/meta_client/src/meta_impl.rs
index 5ba98de5..ffe32fae 100644
--- a/src/meta_client/src/meta_impl.rs
+++ b/src/meta_client/src/meta_impl.rs
@@ -31,9 +31,10 @@ use time_ext::ReadableDuration;
use crate::{
types::{
AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse,
- DropTableRequest, DropTableResponse, GetNodesRequest, GetNodesResponse,
- GetTablesOfShardsRequest, GetTablesOfShardsResponse, NodeInfo, NodeMetaInfo, RequestHeader,
- RouteTablesRequest, RouteTablesResponse, ShardInfo,
+ DropTableRequest, DropTableResponse, FetchCompactionNodeRequest,
+ FetchCompactionNodeResponse, GetNodesRequest, GetNodesResponse, GetTablesOfShardsRequest,
+ GetTablesOfShardsResponse, NodeInfo, NodeMetaInfo, RequestHeader, RouteTablesRequest,
+ RouteTablesResponse, ShardInfo,
},
BadResponse, FailAllocSchemaId, FailConnect, FailCreateTable, FailDropTable, FailGetTables,
FailRouteTables, FailSendHeartbeat, MetaClient, MetaClientRef, MissingHeader, Result,
@@ -236,6 +237,13 @@ impl MetaClient for MetaClientImpl {
GetNodesResponse::try_from(pb_resp)
}
+ async fn fetch_compaction_node(
+ &self,
+ _req: FetchCompactionNodeRequest,
+ ) -> Result<FetchCompactionNodeResponse> {
+ todo!()
+ }
+
async fn send_heartbeat(&self, shard_infos: Vec<ShardInfo>) -> Result<()> {
let node_info = NodeInfo {
node_meta_info: self.node_meta_info.clone(),
diff --git a/src/meta_client/src/types.rs b/src/meta_client/src/types.rs
index 6a6aba69..52484362 100644
--- a/src/meta_client/src/types.rs
+++ b/src/meta_client/src/types.rs
@@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, sync::Arc};
pub use common_types::table::{ShardId, ShardVersion};
use common_types::{
+ cluster::NodeType,
schema::{SchemaId, SchemaName},
table::{TableId, TableName},
};
@@ -163,6 +164,7 @@ pub struct NodeMetaInfo {
pub zone: String,
pub idc: String,
pub binary_version: String,
+ pub node_type: NodeType,
}
impl NodeMetaInfo {
@@ -589,3 +591,10 @@ impl TryFrom<meta_service_pb::GetNodesResponse> for GetNodesResponse {
})
}
}
+
+#[derive(Debug, Clone, Default)]
+pub struct FetchCompactionNodeRequest {}
+
+pub struct FetchCompactionNodeResponse {
+ pub endpoint: String,
+}
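
These two types are the payload for the `CompactionNodePicker` trait added in `analytic_engine` (its file is listed in the diffstat but not shown in this excerpt). A plausible minimal shape, written as an assumption rather than a copy of the actual definition:

    use async_trait::async_trait;
    use generic_error::GenericError;

    // Assumed sketch; the real trait lives in
    // src/analytic_engine/src/compaction/runner/node_picker.rs.
    #[async_trait]
    pub trait CompactionNodePicker: Send + Sync {
        /// Yield the endpoint of a compaction node able to take a task.
        async fn get_compaction_node(&self) -> Result<String, GenericError>;
    }
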
diff --git a/src/router/src/cluster_based.rs b/src/router/src/cluster_based.rs
index d9291044..bd5efd8b 100644
--- a/src/router/src/cluster_based.rs
+++ b/src/router/src/cluster_based.rs
@@ -205,7 +205,7 @@ mod tests {
shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster,
ClusterNodesResp,
TableStatus,
};
- use common_types::table::ShardId;
+ use common_types::{cluster::NodeType, table::ShardId};
use horaedbproto::storage::{RequestContext, RouteRequest as RouteRequestPb};
use meta_client::types::{
NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, ShardRole::Leader, TableInfo,
@@ -226,6 +226,10 @@ mod tests {
unimplemented!();
}
+ fn node_type(&self) -> NodeType {
+ unimplemented!()
+ }
+
async fn open_shard(&self, _: &ShardInfo) -> cluster::Result<ShardRef> {
unimplemented!();
}
diff --git a/src/server/src/grpc/compaction_service/error.rs b/src/server/src/grpc/compaction_service/error.rs
new file mode 100644
index 00000000..eadb3f24
--- /dev/null
+++ b/src/server/src/grpc/compaction_service/error.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Error definitions for compaction service.
+
+use generic_error::GenericError;
+use horaedbproto::common::ResponseHeader;
+use macros::define_result;
+use snafu::Snafu;
+
+use crate::error_util;
+
+define_result!(Error);
+
+#[derive(Snafu, Debug)]
+#[snafu(visibility(pub))]
+pub enum Error {
+ #[snafu(display("Server error, code:{:?}, message:{}", code, msg))]
+ ErrNoCause { code: StatusCode, msg: String },
+
+ #[snafu(display("Server error, code:{:?}, message:{}, cause:{}", code,
msg, source))]
+ ErrWithCause {
+ code: StatusCode,
+ msg: String,
+ source: GenericError,
+ },
+}
+
+impl Error {
+ pub fn code(&self) -> StatusCode {
+ match *self {
+ Error::ErrNoCause { code, .. } => code,
+ Error::ErrWithCause { code, .. } => code,
+ }
+ }
+
+ /// Get the error message returned to the user.
+ pub fn error_message(&self) -> String {
+ match self {
+ Error::ErrNoCause { msg, .. } => msg.clone(),
+
+ Error::ErrWithCause { msg, source, .. } => {
+ let err_string = source.to_string();
+ let first_line = error_util::remove_backtrace_from_err(&err_string);
+ format!("{msg}. Caused by: {first_line}")
+ }
+ }
+ }
+}
+
+/// A set of codes for compaction service.
+///
+/// Note that such a set of codes is different from the codes (alias to http
+/// status code) used by storage service.
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum StatusCode {
+ #[default]
+ Ok = 0,
+ BadRequest = 401,
+ Internal = 500,
+}
+
+impl StatusCode {
+ #[inline]
+ pub fn as_u32(self) -> u32 {
+ self as u32
+ }
+}
+
+pub fn build_err_header(err: Error) -> ResponseHeader {
+ ResponseHeader {
+ code: err.code().as_u32(),
+ error: err.error_message(),
+ }
+}
+
+pub fn build_ok_header() -> ResponseHeader {
+ ResponseHeader {
+ code: StatusCode::Ok.as_u32(),
+ ..Default::default()
+ }
+}
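
The helpers are meant to fold any failure into the response header instead of surfacing a transport-level error. A minimal sketch of a handler tail, assuming it sits in this module so the `Result` alias and both builders are in scope:

    use horaedbproto::common::ResponseHeader;

    // Ok becomes code 0; errors carry their status code and message.
    fn to_header(res: Result<()>) -> ResponseHeader {
        match res {
            Ok(()) => build_ok_header(),
            Err(e) => build_err_header(e),
        }
    }
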
diff --git a/src/server/src/grpc/compaction_service/mod.rs b/src/server/src/grpc/compaction_service/mod.rs
new file mode 100644
index 00000000..3954b78a
--- /dev/null
+++ b/src/server/src/grpc/compaction_service/mod.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Compaction rpc service implementation.
+
+use std::sync::Arc;
+
+use analytic_engine::compaction::runner::{CompactionRunnerRef, CompactionRunnerTask};
+use async_trait::async_trait;
+use error::{build_err_header, build_ok_header, ErrWithCause, StatusCode};
+use generic_error::BoxError;
+use horaedbproto::compaction_service::{
+ compaction_service_server::CompactionService, ExecResult, ExecuteCompactionTaskRequest,
+ ExecuteCompactionTaskResponse,
+};
+use runtime::Runtime;
+use snafu::ResultExt;
+use tonic::{Request, Response, Status};
+
+mod error;
+
+/// Builder for [CompactionServiceImpl]
+pub struct Builder {
+ pub runtime: Arc<Runtime>,
+ pub compaction_runner: CompactionRunnerRef,
+}
+
+impl Builder {
+ pub fn build(self) -> CompactionServiceImpl {
+ let Self {
+ runtime,
+ compaction_runner,
+ } = self;
+
+ CompactionServiceImpl {
+ runtime,
+ compaction_runner,
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct CompactionServiceImpl {
+ pub runtime: Arc<Runtime>,
+ pub compaction_runner: CompactionRunnerRef,
+}
+
+#[async_trait]
+impl CompactionService for CompactionServiceImpl {
+ async fn execute_compaction_task(
+ &self,
+ request: Request<ExecuteCompactionTaskRequest>,
+ ) -> Result<Response<ExecuteCompactionTaskResponse>, Status> {
+ let request: Result<CompactionRunnerTask, error::Error> = request
+ .into_inner()
+ .try_into()
+ .box_err()
+ .context(ErrWithCause {
+ code: StatusCode::BadRequest,
+ msg: "fail to convert the execute compaction task request",
+ });
+
+ let mut resp: ExecuteCompactionTaskResponse = ExecuteCompactionTaskResponse::default();
+ match request {
+ Ok(task) => {
+ let request_id = task.request_id.clone();
+ let res = self
+ .compaction_runner
+ .run(task)
+ .await
+ .box_err()
+ .with_context(|| ErrWithCause {
+ code: StatusCode::Internal,
+ msg: format!("fail to compact task,
request:{request_id}"),
+ });
+
+ match res {
+ Ok(res) => {
+ resp.header = Some(build_ok_header());
+ resp.result = Some(ExecResult {
+ output_file_path: res.output_file_path.into(),
+ sst_info: Some(res.sst_info.into()),
+ sst_meta: Some(res.sst_meta.into()),
+ });
+ // TODO(leslie): Add status.
+ }
+ Err(e) => {
+ resp.header = Some(build_err_header(e));
+ }
+ }
+ }
+ Err(e) => {
+ resp.header = Some(build_err_header(e));
+ }
+ }
+
+ Ok(Response::new(resp))
+ }
+}
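
Serving this implementation is a single `add_service` call; the grpc/mod.rs hunk below does exactly that inside `RpcServices`. A condensed sketch of the same wiring on a standalone tonic server, assuming it lives in this module (the entry function itself is hypothetical):

    use std::{net::SocketAddr, sync::Arc};

    use analytic_engine::compaction::runner::CompactionRunnerRef;
    use horaedbproto::compaction_service::compaction_service_server::CompactionServiceServer;
    use runtime::Runtime;
    use tonic::transport::Server;

    // Hypothetical entry point for a dedicated compaction node.
    async fn serve_compaction(
        addr: SocketAddr,
        runtime: Arc<Runtime>,
        compaction_runner: CompactionRunnerRef,
    ) -> Result<(), tonic::transport::Error> {
        let service = Builder { runtime, compaction_runner }.build();
        Server::builder()
            .add_service(CompactionServiceServer::new(service))
            .serve(addr)
            .await
    }
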
diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs
index 7b02a3a2..24a18168 100644
--- a/src/server/src/grpc/mod.rs
+++ b/src/server/src/grpc/mod.rs
@@ -24,11 +24,14 @@ use std::{
time::Duration,
};
+use analytic_engine::compaction::runner::CompactionRunnerRef;
use cluster::ClusterRef;
use common_types::column_schema;
+use compaction_service::CompactionServiceImpl;
use futures::FutureExt;
use generic_error::GenericError;
use horaedbproto::{
+ compaction_service::compaction_service_server::CompactionServiceServer,
meta_event::meta_event_service_server::MetaEventServiceServer,
remote_engine::remote_engine_service_server::RemoteEngineServiceServer,
storage::storage_service_server::StorageServiceServer,
@@ -60,6 +63,7 @@ use crate::{
},
};
+mod compaction_service;
mod meta_event_service;
mod metrics;
mod remote_engine_service;
@@ -105,6 +109,9 @@ pub enum Error {
#[snafu(display("Missing wals.\nBacktrace:\n{}", backtrace))]
MissingWals { backtrace: Backtrace },
+ #[snafu(display("Missing compaction runner.\nBacktrace:\n{}", backtrace))]
+ MissingCompactionRunner { backtrace: Backtrace },
+
#[snafu(display("Missing timeout.\nBacktrace:\n{}", backtrace))]
MissingTimeout { backtrace: Backtrace },
@@ -163,6 +170,7 @@ define_result!(Error);
pub struct RpcServices {
serve_addr: SocketAddr,
rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, AuthWithFile>,
+ compaction_rpc_server: Option<CompactionServiceServer<CompactionServiceImpl>>,
meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>,
remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>,
runtime: Arc<Runtime>,
@@ -173,6 +181,7 @@ pub struct RpcServices {
impl RpcServices {
pub async fn start(&mut self) -> Result<()> {
let rpc_server = self.rpc_server.clone();
+ let compaction_rpc_server = self.compaction_rpc_server.clone();
let meta_rpc_server = self.meta_rpc_server.clone();
let remote_engine_server = self.remote_engine_server.clone();
let serve_addr = self.serve_addr;
@@ -182,6 +191,11 @@ impl RpcServices {
let mut router = Server::builder().add_service(rpc_server);
+ if let Some(s) = compaction_rpc_server {
+ info!("Grpc server serves compaction service");
+ router = router.add_service(s);
+ };
+
if let Some(s) = meta_rpc_server {
info!("Grpc server serves meta rpc service");
router = router.add_service(s);
@@ -226,6 +240,7 @@ pub struct Builder {
proxy: Option<Arc<Proxy>>,
query_dedup_config: Option<QueryDedupConfig>,
hotspot_recorder: Option<Arc<HotspotRecorder>>,
+ compaction_runner: Option<CompactionRunnerRef>,
}
impl Builder {
@@ -241,6 +256,7 @@ impl Builder {
proxy: None,
query_dedup_config: None,
hotspot_recorder: None,
+ compaction_runner: None,
}
}
@@ -294,6 +310,12 @@ impl Builder {
self.query_dedup_config = Some(config);
self
}
+
+ // Compaction runner is an optional field for building [RpcServices].
+ pub fn compaction_runner(mut self, runner: Option<CompactionRunnerRef>) -> Self {
+ self.compaction_runner = runner;
+ self
+ }
}
impl Builder {
@@ -301,19 +323,39 @@ impl Builder {
let auth = self.auth.context(MissingAuth)?;
let runtimes = self.runtimes.context(MissingRuntimes)?;
let instance = self.instance.context(MissingInstance)?;
- let opened_wals = self.opened_wals.context(MissingWals)?;
let proxy = self.proxy.context(MissingProxy)?;
let hotspot_recorder = self.hotspot_recorder.context(MissingHotspotRecorder)?;
-
- let meta_rpc_server = self.cluster.map(|v| {
- let builder = meta_event_service::Builder {
- cluster: v,
- instance: instance.clone(),
- runtime: runtimes.meta_runtime.clone(),
- opened_wals,
- };
- MetaEventServiceServer::new(builder.build())
- });
+ let mut meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>> = None;
+ let mut compaction_rpc_server: Option<CompactionServiceServer<CompactionServiceImpl>> =
+ None;
+
+ self.cluster
+ .map(|v| {
+ let result: Result<()> = (|| {
+ // Support meta rpc service.
+ let opened_wals = self.opened_wals.context(MissingWals)?;
+ let builder = meta_event_service::Builder {
+ cluster: v.clone(),
+ instance: instance.clone(),
+ runtime: runtimes.meta_runtime.clone(),
+ opened_wals,
+ };
+ meta_rpc_server = Some(MetaEventServiceServer::new(builder.build()));
+
+ // Support remote compaction rpc service.
+ let compaction_runner =
+ self.compaction_runner.context(MissingCompactionRunner)?;
+ let builder = compaction_service::Builder {
+ runtime: runtimes.compact_runtime.clone(),
+ compaction_runner,
+ };
+ compaction_rpc_server = Some(CompactionServiceServer::new(builder.build()));
+
+ Ok(())
+ })();
+ result
+ })
+ .transpose()?;
let remote_engine_server = {
let query_dedup = self
@@ -349,6 +391,7 @@ impl Builder {
Ok(RpcServices {
serve_addr,
rpc_server,
+ compaction_rpc_server,
meta_rpc_server,
remote_engine_server,
runtime,
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index f7cd72ec..bca6c8d1 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -19,6 +19,7 @@
use std::sync::Arc;
+use analytic_engine::compaction::runner::CompactionRunnerRef;
use catalog::manager::ManagerRef;
use cluster::ClusterRef;
use datafusion::execution::{runtime_env::RuntimeConfig, FunctionRegistry};
@@ -251,6 +252,7 @@ pub struct Builder {
opened_wals: Option<OpenedWals>,
remote_engine: Option<RemoteEngineRef>,
datatfusion_context: Option<DatafusionContext>,
+ compaction_runner: Option<CompactionRunnerRef>,
}
impl Builder {
@@ -274,6 +276,7 @@ impl Builder {
opened_wals: None,
remote_engine: None,
datatfusion_context: None,
+ compaction_runner: None,
}
}
@@ -368,6 +371,11 @@ impl Builder {
self
}
+ pub fn compaction_runner(mut self, runner: CompactionRunnerRef) -> Self {
+ self.compaction_runner = Some(runner);
+ self
+ }
+
/// Build and run the server
pub fn build(self) -> Result<Server> {
// Build instance
@@ -527,6 +535,7 @@ impl Builder {
.proxy(proxy)
.hotspot_recorder(hotspot_recorder)
.query_dedup(self.server_config.query_dedup)
+ .compaction_runner(self.compaction_runner.clone())
.build()
.context(BuildGrpcService)?;
diff --git a/src/table_engine/src/predicate.rs b/src/table_engine/src/predicate.rs
index b316b99e..3a3294fc 100644
--- a/src/table_engine/src/predicate.rs
+++ b/src/table_engine/src/predicate.rs
@@ -112,7 +112,7 @@ impl Predicate {
impl TryFrom<&Predicate> for horaedbproto::remote_engine::Predicate {
type Error = Error;
- fn try_from(predicate: &Predicate) -> std::result::Result<Self, Self::Error> {
+ fn try_from(predicate: &Predicate) -> Result<Self> {
let time_range = predicate.time_range;
let mut exprs = Vec::with_capacity(predicate.exprs.len());
for expr in &predicate.exprs {
@@ -135,9 +135,7 @@ impl TryFrom<horaedbproto::remote_engine::Predicate> for Predicate {
impl TryFrom<horaedbproto::remote_engine::Predicate> for Predicate {
type Error = Error;
- fn try_from(
- pb: horaedbproto::remote_engine::Predicate,
- ) -> std::result::Result<Self, Self::Error> {
+ fn try_from(pb: horaedbproto::remote_engine::Predicate) -> Result<Self> {
let time_range = pb.time_range.context(EmptyTimeRange)?;
let mut exprs = Vec::with_capacity(pb.exprs.len());
for pb_expr in pb.exprs {
diff --git a/src/table_engine/src/table.rs b/src/table_engine/src/table.rs
index 3c611b43..6526579a 100644
--- a/src/table_engine/src/table.rs
+++ b/src/table_engine/src/table.rs
@@ -307,6 +307,12 @@ impl From<u64> for TableId {
}
}
+impl From<TableId> for u64 {
+ fn from(id: TableId) -> Self {
+ id.0
+ }
+}
+
impl fmt::Display for TableId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
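
With this impl in place, `TableId` and its raw `u64` convert in both directions, presumably for the protobuf conversions elsewhere in this patch:

    use table_engine::table::TableId;

    fn main() {
        let id = TableId::from(42u64);
        let raw: u64 = id.into();
        assert_eq!(raw, 42);
    }
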
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]