This is an automated email from the ASF dual-hosted git repository.

kamille pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new e47d9ae7 feat: support horaedb submit compaction task to remote (#1563)
e47d9ae7 is described below

commit e47d9ae7cbf240d50d009317562bc4247c92b8c4
Author: Leslie Su <[email protected]>
AuthorDate: Wed Oct 30 08:48:52 2024 +0800

    feat: support horaedb submit compaction task to remote (#1563)
    
    ## Rationale
    The subtask to support compaction offloading. See #1545
    
    ## Detailed Changes
    **Compaction node support remote compaction service**
    
    - Define `CompactionServiceImpl` to support compaction rpc service.
    - Introduce `NodeType` to distinguish compaction node and horaedb node.
    Enable the deployment of compaction node.
    
    - Impl `compaction_client` for horaedb node to access remote compaction
    node.
    
    **Horaedb node support compaction offload**
    
    - Introduce `compaction_mode` in analytic engine's `Config` to determine
    whether exec compaction offload or not.
    - Define `CompactionNodePicker` trait, supporting get remote compaction
    node info.
    - Impl `RemoteCompactionRunner`, supporting pick remote node and pass
    compaction task to the node.
    - Add docs (e.g. `example-cluster-n.toml`) to explain how to deploy a
    cluster supporting compaction offload.
    
    ## Test Plan
    
    ---------
    
    Co-authored-by: kamille <[email protected]>
---
 Cargo.lock                                         |   9 +-
 Cargo.toml                                         |   2 +-
 src/analytic_engine/Cargo.toml                     |   6 +
 src/analytic_engine/src/compaction/mod.rs          |  91 +++++++-
 .../src/compaction/runner/local_runner.rs          |   1 +
 src/analytic_engine/src/compaction/runner/mod.rs   | 244 ++++++++++++++++++++-
 .../src/compaction/runner/node_picker.rs           |  88 ++++++++
 .../src/compaction/runner/remote_client.rs         | 148 +++++++++++++
 .../src/compaction/runner/remote_runner.rs         | 116 ++++++++++
 src/analytic_engine/src/instance/engine.rs         |   7 +
 .../src/instance/flush_compaction.rs               |  20 ++
 src/analytic_engine/src/instance/open.rs           |  56 ++++-
 src/analytic_engine/src/lib.rs                     |  21 +-
 src/analytic_engine/src/setup.rs                   |   6 +
 src/analytic_engine/src/sst/factory.rs             |  65 +++++-
 src/analytic_engine/src/sst/file.rs                |  98 ++++++++-
 src/analytic_engine/src/sst/writer.rs              |  95 +++++++-
 src/analytic_engine/src/table_options.rs           |  54 +++++
 src/analytic_engine/src/tests/util.rs              |   1 +
 src/benchmarks/src/util.rs                         |   1 +
 src/cluster/src/cluster_impl.rs                    |   8 +-
 src/cluster/src/config.rs                          |   3 +
 src/cluster/src/lib.rs                             |   6 +-
 src/common_types/src/cluster.rs                    |  26 +++
 src/common_types/src/lib.rs                        |   1 +
 src/horaedb/Cargo.toml                             |  53 ++---
 src/horaedb/src/config.rs                          |   4 +-
 src/horaedb/src/setup.rs                           |  16 +-
 src/meta_client/src/lib.rs                         |  14 +-
 src/meta_client/src/meta_impl.rs                   |  14 +-
 src/meta_client/src/types.rs                       |   9 +
 src/router/src/cluster_based.rs                    |   6 +-
 src/server/src/grpc/compaction_service/error.rs    |  96 ++++++++
 src/server/src/grpc/compaction_service/mod.rs      | 113 ++++++++++
 src/server/src/grpc/mod.rs                         |  65 +++++-
 src/server/src/server.rs                           |   9 +
 src/table_engine/src/predicate.rs                  |   6 +-
 src/table_engine/src/table.rs                      |   6 +
 38 files changed, 1505 insertions(+), 79 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 33ed54bc..1830a3fb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -89,6 +89,7 @@ dependencies = [
  "atomic_enum",
  "base64 0.13.1",
  "bytes_ext",
+ "cluster",
  "codec",
  "common_types",
  "datafusion",
@@ -107,6 +108,7 @@ dependencies = [
  "lru 0.7.8",
  "macros",
  "message_queue",
+ "meta_client",
  "metric_ext",
  "object_store 2.1.0",
  "parquet",
@@ -116,10 +118,12 @@ dependencies = [
  "prost 0.11.8",
  "rand 0.8.5",
  "remote_engine_client",
+ "reqwest 0.12.4",
  "router",
  "runtime",
  "sampling_cache",
  "serde",
+ "serde_json",
  "size_ext",
  "skiplist",
  "smallvec",
@@ -131,7 +135,9 @@ dependencies = [
  "thiserror",
  "time_ext",
  "tokio",
+ "tonic 0.8.3",
  "trace_metric",
+ "url",
  "wal",
  "xorfilter-rs",
 ]
@@ -3150,6 +3156,7 @@ dependencies = [
  "catalog_impls",
  "clap",
  "cluster",
+ "common_types",
  "datafusion",
  "df_operator",
  "etcd-client",
@@ -3223,7 +3230,7 @@ dependencies = [
 [[package]]
 name = "horaedbproto"
 version = "2.0.0"
-source = 
"git+https://github.com/apache/incubator-horaedb-proto.git?rev=a5874d9fedee32ab1292252c4eb6defc4f6e245a#a5874d9fedee32ab1292252c4eb6defc4f6e245a";
+source = 
"git+https://github.com/apache/incubator-horaedb-proto.git?rev=fac8564e6e3d50e51daa2af6eb905e747f3191b0#fac8564e6e3d50e51daa2af6eb905e747f3191b0";
 dependencies = [
  "prost 0.11.8",
  "protoc-bin-vendored",
diff --git a/Cargo.toml b/Cargo.toml
index d2d73fd0..b6ca6273 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -103,7 +103,7 @@ thiserror = "1"
 bytes_ext = { path = "src/components/bytes_ext" }
 catalog = { path = "src/catalog" }
 catalog_impls = { path = "src/catalog_impls" }
-horaedbproto = { git = 
"https://github.com/apache/incubator-horaedb-proto.git";, rev = 
"a5874d9fedee32ab1292252c4eb6defc4f6e245a" }
+horaedbproto = { git = 
"https://github.com/apache/incubator-horaedb-proto.git";, rev = 
"fac8564e6e3d50e51daa2af6eb905e747f3191b0" }
 codec = { path = "src/components/codec" }
 chrono = "0.4"
 clap = { version = "4.5.1", features = ["derive"] }
diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml
index 09ff47af..d6c642eb 100644
--- a/src/analytic_engine/Cargo.toml
+++ b/src/analytic_engine/Cargo.toml
@@ -49,6 +49,7 @@ async-trait = { workspace = true }
 atomic_enum = { workspace = true }
 base64 = { workspace = true }
 bytes_ext = { workspace = true }
+cluster = { workspace = true }
 codec = { workspace = true }
 common_types = { workspace = true }
 datafusion = { workspace = true }
@@ -66,6 +67,7 @@ logger = { workspace = true }
 lru = { workspace = true }
 macros = { workspace = true }
 message_queue = { workspace = true }
+meta_client = { workspace = true }
 metric_ext = { workspace = true }
 object_store = { workspace = true }
 parquet = { workspace = true }
@@ -73,10 +75,12 @@ parquet_ext = { workspace = true }
 prometheus = { workspace = true }
 prost = { workspace = true }
 remote_engine_client = { workspace = true }
+reqwest = { workspace = true }
 router = { workspace = true }
 runtime = { workspace = true }
 sampling_cache = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 size_ext = { workspace = true }
 skiplist = { path = "../components/skiplist" }
 smallvec = { workspace = true }
@@ -87,7 +91,9 @@ tempfile = { workspace = true, optional = true }
 thiserror = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
+tonic = { workspace = true }
 trace_metric = { workspace = true }
+url = "2.2"
 wal = { workspace = true }
 xorfilter-rs = { workspace = true }
 
diff --git a/src/analytic_engine/src/compaction/mod.rs 
b/src/analytic_engine/src/compaction/mod.rs
index 34048d6b..8f63c93e 100644
--- a/src/analytic_engine/src/compaction/mod.rs
+++ b/src/analytic_engine/src/compaction/mod.rs
@@ -20,15 +20,17 @@
 use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};
 
 use common_types::COMPACTION_STRATEGY;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
 use serde::{Deserialize, Serialize};
 use size_ext::ReadableSize;
-use snafu::{ensure, Backtrace, GenerateBacktrace, ResultExt, Snafu};
+use snafu::{ensure, Backtrace, GenerateBacktrace, OptionExt, ResultExt, Snafu};
 use time_ext::TimeUnit;
 use tokio::sync::oneshot;
 
 use crate::{
     compaction::picker::{CommonCompactionPicker, CompactionPickerRef},
-    sst::file::{FileHandle, Level},
+    sst::file::{FileHandle, FileMeta, FilePurgeQueue, Level},
     table::data::TableDataRef,
 };
 
@@ -72,8 +74,22 @@ pub enum Error {
     },
     #[snafu(display("Invalid compaction option value, err: {}", error))]
     InvalidOption { error: String, backtrace: Backtrace },
+
+    #[snafu(display("Empty file meta.\nBacktrace:\n{}", backtrace))]
+    EmptyFileMeta { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert file meta, err:{}", source))]
+    ConvertFileMeta { source: GenericError },
+
+    #[snafu(display("Empty purge queue.\nBacktrace:\n{}", backtrace))]
+    EmptyPurgeQueue { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert level, err:{}", source))]
+    ConvertLevel { source: GenericError },
 }
 
+define_result!(Error);
+
 #[derive(Debug, Clone, Copy, Deserialize, Default, PartialEq, Serialize)]
 pub enum CompactionStrategy {
     #[default]
@@ -145,7 +161,7 @@ impl CompactionStrategy {
     pub(crate) fn parse_from(
         value: &str,
         options: &HashMap<String, String>,
-    ) -> Result<CompactionStrategy, Error> {
+    ) -> Result<CompactionStrategy> {
         match value.trim().to_lowercase().as_str() {
             DEFAULT_STRATEGY => Ok(CompactionStrategy::Default),
             STC_STRATEGY => Ok(CompactionStrategy::SizeTiered(
@@ -182,7 +198,7 @@ impl CompactionStrategy {
 }
 
 impl SizeTieredCompactionOptions {
-    pub(crate) fn validate(&self) -> Result<(), Error> {
+    pub(crate) fn validate(&self) -> Result<()> {
         ensure!(
             self.bucket_high > self.bucket_low,
             InvalidOption {
@@ -215,7 +231,7 @@ impl SizeTieredCompactionOptions {
 
     pub(crate) fn parse_from(
         options: &HashMap<String, String>,
-    ) -> Result<SizeTieredCompactionOptions, Error> {
+    ) -> Result<SizeTieredCompactionOptions> {
         let mut opts = SizeTieredCompactionOptions::default();
         if let Some(v) = options.get(BUCKET_LOW_KEY) {
             opts.bucket_low = v.parse().context(ParseFloat {
@@ -278,7 +294,7 @@ impl TimeWindowCompactionOptions {
         );
     }
 
-    pub(crate) fn validate(&self) -> Result<(), Error> {
+    pub(crate) fn validate(&self) -> Result<()> {
         if !Self::valid_timestamp_unit(self.timestamp_resolution) {
             return InvalidOption {
                 error: format!(
@@ -294,7 +310,7 @@ impl TimeWindowCompactionOptions {
 
     pub(crate) fn parse_from(
         options: &HashMap<String, String>,
-    ) -> Result<TimeWindowCompactionOptions, Error> {
+    ) -> Result<TimeWindowCompactionOptions> {
         let mut opts = TimeWindowCompactionOptions {
             size_tiered: SizeTieredCompactionOptions::parse_from(options)?,
             ..Default::default()
@@ -326,6 +342,67 @@ pub struct CompactionInputFiles {
     pub output_level: Level,
 }
 
+impl TryFrom<horaedbproto::compaction_service::CompactionInputFiles> for 
CompactionInputFiles {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::CompactionInputFiles) 
-> Result<Self> {
+        let level: Level = 
value.level.try_into().box_err().context(ConvertLevel)?;
+        let output_level: Level = value
+            .output_level
+            .try_into()
+            .box_err()
+            .context(ConvertLevel)?;
+
+        let mut files: Vec<FileHandle> = Vec::with_capacity(value.files.len());
+        for file in value.files {
+            let meta: FileMeta = file
+                .meta
+                .context(EmptyFileMeta)?
+                .try_into()
+                .box_err()
+                .context(ConvertFileMeta)?;
+
+            let purge_queue: FilePurgeQueue = 
file.purge_queue.context(EmptyPurgeQueue)?.into();
+
+            files.push({
+                let handle = FileHandle::new(meta, purge_queue);
+                handle.set_being_compacted(file.being_compacted);
+                handle
+            });
+        }
+
+        Ok(CompactionInputFiles {
+            level,
+            files,
+            output_level,
+        })
+    }
+}
+
+impl From<CompactionInputFiles> for 
horaedbproto::compaction_service::CompactionInputFiles {
+    fn from(value: CompactionInputFiles) -> Self {
+        let mut files = Vec::with_capacity(value.files.len());
+        for file in value.files {
+            let handle = horaedbproto::compaction_service::FileHandle {
+                meta: Some(file.meta().into()),
+                purge_queue: 
Some(horaedbproto::compaction_service::FilePurgeQueue {
+                    space_id: file.space_id(),
+                    table_id: file.table_id().into(),
+                }),
+                being_compacted: file.being_compacted(),
+                metrics: Some(horaedbproto::compaction_service::SstMetrics {}),
+            };
+            files.push(handle);
+        }
+
+        Self {
+            level: value.level.as_u32(),
+            files,
+            output_level: value.output_level.as_u32(),
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct ExpiredFiles {
     /// Level of the expired files.
diff --git a/src/analytic_engine/src/compaction/runner/local_runner.rs 
b/src/analytic_engine/src/compaction/runner/local_runner.rs
index fc34b2bf..e379d785 100644
--- a/src/analytic_engine/src/compaction/runner/local_runner.rs
+++ b/src/analytic_engine/src/compaction/runner/local_runner.rs
@@ -45,6 +45,7 @@ use crate::{
 const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;
 
 /// Executor carrying for actual compaction work
+#[derive(Clone)]
 pub struct LocalCompactionRunner {
     runtime: Arc<Runtime>,
     scan_options: ScanOptions,
diff --git a/src/analytic_engine/src/compaction/runner/mod.rs 
b/src/analytic_engine/src/compaction/runner/mod.rs
index 12f333ea..c8e34484 100644
--- a/src/analytic_engine/src/compaction/runner/mod.rs
+++ b/src/analytic_engine/src/compaction/runner/mod.rs
@@ -16,17 +16,23 @@
 // under the License.
 
 pub mod local_runner;
+pub mod node_picker;
+mod remote_client;
+pub mod remote_runner;
 
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use common_types::{request_id::RequestId, schema::Schema, SequenceNumber};
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
 use object_store::Path;
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::table::TableId;
 
 use crate::{
     compaction::CompactionInputFiles,
-    instance::flush_compaction::Result,
+    instance::flush_compaction,
     row_iter::IterOptions,
     space::SpaceId,
     sst::{
@@ -39,12 +45,87 @@ use crate::{
 /// Compaction runner
 #[async_trait]
 pub trait CompactionRunner: Send + Sync + 'static {
-    async fn run(&self, task: CompactionRunnerTask) -> 
Result<CompactionRunnerResult>;
+    async fn run(
+        &self,
+        task: CompactionRunnerTask,
+    ) -> flush_compaction::Result<CompactionRunnerResult>;
 }
 
 pub type CompactionRunnerPtr = Box<dyn CompactionRunner>;
 pub type CompactionRunnerRef = Arc<dyn CompactionRunner>;
 
+#[derive(Debug, Snafu)]
+#[snafu(visibility = "pub")]
+pub enum Error {
+    #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
+    EmptyTableSchema { backtrace: Backtrace },
+
+    #[snafu(display("Empty input context.\nBacktrace:\n{}", backtrace))]
+    EmptyInputContext { backtrace: Backtrace },
+
+    #[snafu(display("Empty ouput context.\nBacktrace:\n{}", backtrace))]
+    EmptyOuputContext { backtrace: Backtrace },
+
+    #[snafu(display("Empty compaction input files.\nBacktrace:\n{}", 
backtrace))]
+    EmptyCompactionInputFiles { backtrace: Backtrace },
+
+    #[snafu(display("Empty write options.\nBacktrace:\n{}", backtrace))]
+    EmptySstWriteOptions { backtrace: Backtrace },
+
+    #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))]
+    EmptySstMeta { backtrace: Backtrace },
+
+    #[snafu(display("Empty sst info.\nBacktrace:\n{}", backtrace))]
+    EmptySstInfo { backtrace: Backtrace },
+
+    #[snafu(display("Empty compaction task exec result.\nBacktrace:\n{}", 
backtrace))]
+    EmptyExecResult { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert table schema, err:{}", source))]
+    ConvertTableSchema { source: GenericError },
+
+    #[snafu(display("Failed to convert input context, err:{}", source))]
+    ConvertInputContext { source: GenericError },
+
+    #[snafu(display("Failed to convert ouput context, err:{}", source))]
+    ConvertOuputContext { source: GenericError },
+
+    #[snafu(display("Failed to convert compaction input files, err:{}", 
source))]
+    ConvertCompactionInputFiles { source: GenericError },
+
+    #[snafu(display("Failed to convert write options, err:{}", source))]
+    ConvertSstWriteOptions { source: GenericError },
+
+    #[snafu(display("Failed to convert sst info, err:{}", source))]
+    ConvertSstInfo { source: GenericError },
+
+    #[snafu(display("Failed to convert sst meta, err:{}", source))]
+    ConvertSstMeta { source: GenericError },
+
+    #[snafu(display("Failed to connect the service endpoint:{}, err:{}", addr, 
source,))]
+    FailConnect { addr: String, source: GenericError },
+
+    #[snafu(display("Failed to execute compaction task, err:{}", source))]
+    FailExecuteCompactionTask { source: GenericError },
+
+    #[snafu(display("Missing header in rpc response.\nBacktrace:\n{}", 
backtrace))]
+    MissingHeader { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Bad response, resp code:{}, msg:{}.\nBacktrace:\n{}",
+        code,
+        msg,
+        backtrace
+    ))]
+    BadResponse {
+        code: u32,
+        msg: String,
+        backtrace: Backtrace,
+    },
+}
+
+define_result!(Error);
+
 /// Compaction runner task
 #[derive(Debug, Clone)]
 pub struct CompactionRunnerTask {
@@ -113,12 +194,106 @@ impl CompactionRunnerTask {
     }
 }
 
+impl TryFrom<horaedbproto::compaction_service::ExecuteCompactionTaskRequest>
+    for CompactionRunnerTask
+{
+    type Error = Error;
+
+    fn try_from(
+        request: 
horaedbproto::compaction_service::ExecuteCompactionTaskRequest,
+    ) -> Result<Self> {
+        let task_key = request.task_key;
+        let request_id: RequestId = request.request_id.into();
+
+        let schema: Schema = request
+            .schema
+            .context(EmptyTableSchema)?
+            .try_into()
+            .box_err()
+            .context(ConvertTableSchema)?;
+
+        let space_id: SpaceId = request.space_id;
+        let table_id: TableId = request.table_id.into();
+        let sequence: SequenceNumber = request.sequence;
+
+        let input_ctx: InputContext = request
+            .input_ctx
+            .context(EmptyInputContext)?
+            .try_into()
+            .box_err()
+            .context(ConvertInputContext)?;
+
+        let output_ctx: OutputContext = request
+            .output_ctx
+            .context(EmptyOuputContext)?
+            .try_into()
+            .box_err()
+            .context(ConvertOuputContext)?;
+
+        Ok(Self {
+            task_key,
+            request_id,
+            schema,
+            space_id,
+            table_id,
+            sequence,
+            input_ctx,
+            output_ctx,
+        })
+    }
+}
+
+impl From<CompactionRunnerTask> for 
horaedbproto::compaction_service::ExecuteCompactionTaskRequest {
+    fn from(task: CompactionRunnerTask) -> Self {
+        Self {
+            task_key: task.task_key,
+            request_id: task.request_id.into(),
+            schema: Some((&(task.schema)).into()),
+            space_id: task.space_id,
+            table_id: task.table_id.into(),
+            sequence: task.sequence,
+            input_ctx: Some(task.input_ctx.into()),
+            output_ctx: Some(task.output_ctx.into()),
+        }
+    }
+}
+
 pub struct CompactionRunnerResult {
     pub output_file_path: Path,
     pub sst_info: SstInfo,
     pub sst_meta: MetaData,
 }
 
+impl TryFrom<horaedbproto::compaction_service::ExecuteCompactionTaskResponse>
+    for CompactionRunnerResult
+{
+    type Error = Error;
+
+    fn try_from(
+        resp: horaedbproto::compaction_service::ExecuteCompactionTaskResponse,
+    ) -> Result<Self> {
+        let res = resp.result.context(EmptyExecResult)?;
+        let sst_info = res
+            .sst_info
+            .context(EmptySstInfo)?
+            .try_into()
+            .box_err()
+            .context(ConvertSstInfo)?;
+        let sst_meta = res
+            .sst_meta
+            .context(EmptySstMeta)?
+            .try_into()
+            .box_err()
+            .context(ConvertSstMeta)?;
+
+        Ok(Self {
+            output_file_path: res.output_file_path.into(),
+            sst_info,
+            sst_meta,
+        })
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct InputContext {
     /// Input sst files in this compaction
@@ -128,6 +303,43 @@ pub struct InputContext {
     pub need_dedup: bool,
 }
 
+impl TryFrom<horaedbproto::compaction_service::InputContext> for InputContext {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::InputContext) -> 
Result<Self> {
+        let num_rows_per_row_group: usize = value.num_rows_per_row_group as 
usize;
+        let merge_iter_options = IterOptions {
+            batch_size: value.merge_iter_options as usize,
+        };
+        let need_dedup = value.need_dedup;
+
+        let files: CompactionInputFiles = value
+            .files
+            .context(EmptyCompactionInputFiles)?
+            .try_into()
+            .box_err()
+            .context(ConvertCompactionInputFiles)?;
+
+        Ok(InputContext {
+            files,
+            num_rows_per_row_group,
+            merge_iter_options,
+            need_dedup,
+        })
+    }
+}
+
+impl From<InputContext> for horaedbproto::compaction_service::InputContext {
+    fn from(value: InputContext) -> Self {
+        Self {
+            files: Some(value.files.into()),
+            num_rows_per_row_group: value.num_rows_per_row_group as u64,
+            merge_iter_options: value.merge_iter_options.batch_size as u64,
+            need_dedup: value.need_dedup,
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct OutputContext {
     /// Output sst file path
@@ -135,3 +347,31 @@ pub struct OutputContext {
     /// Output sst write context
     pub write_options: SstWriteOptions,
 }
+
+impl TryFrom<horaedbproto::compaction_service::OutputContext> for 
OutputContext {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::OutputContext) -> 
Result<Self> {
+        let file_path: Path = value.file_path.into();
+        let write_options: SstWriteOptions = value
+            .write_options
+            .context(EmptySstWriteOptions)?
+            .try_into()
+            .box_err()
+            .context(ConvertSstWriteOptions)?;
+
+        Ok(OutputContext {
+            file_path,
+            write_options,
+        })
+    }
+}
+
+impl From<OutputContext> for horaedbproto::compaction_service::OutputContext {
+    fn from(value: OutputContext) -> Self {
+        Self {
+            file_path: value.file_path.into(),
+            write_options: Some(value.write_options.into()),
+        }
+    }
+}
diff --git a/src/analytic_engine/src/compaction/runner/node_picker.rs 
b/src/analytic_engine/src/compaction/runner/node_picker.rs
new file mode 100644
index 00000000..bf21787c
--- /dev/null
+++ b/src/analytic_engine/src/compaction/runner/node_picker.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Remote compaction node picker.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use macros::define_result;
+use meta_client::{types::FetchCompactionNodeRequest, MetaClientRef};
+use serde::{Deserialize, Serialize};
+use snafu::{ResultExt, Snafu};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(tag = "node_picker", content = "endpoint")]
+pub enum NodePicker {
+    // Local node picker that specifies the local endpoint.
+    // The endpoint in the form `addr:port`.
+    Local(String),
+    Remote,
+}
+
+#[async_trait]
+pub trait CompactionNodePicker: Send + Sync {
+    /// Get the addr of the remote compaction node.
+    async fn get_compaction_node(&self) -> Result<String>;
+}
+
+pub type RemoteCompactionNodePickerRef = Arc<dyn CompactionNodePicker>;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Meta client fetch compaciton node failed, 
err:{source}."))]
+    FetchCompactionNodeFailure { source: meta_client::Error },
+}
+
+define_result!(Error);
+
+/// RemoteCompactionNodePickerImpl is an implementation of
+/// [`CompactionNodePicker`] based [`MetaClient`].
+pub struct RemoteCompactionNodePickerImpl {
+    pub meta_client: MetaClientRef,
+}
+
+#[async_trait]
+impl CompactionNodePicker for RemoteCompactionNodePickerImpl {
+    /// Get proper remote compaction node info for compaction offload with meta
+    /// client.
+    async fn get_compaction_node(&self) -> Result<String> {
+        let req = FetchCompactionNodeRequest::default();
+        let resp = self
+            .meta_client
+            .fetch_compaction_node(req)
+            .await
+            .context(FetchCompactionNodeFailure)?;
+
+        let compaction_node_addr = resp.endpoint;
+        Ok(compaction_node_addr)
+    }
+}
+
+/// LocalCompactionNodePickerImpl is an implementation of
+/// [`CompactionNodePicker`] mainly used for testing.
+pub struct LocalCompactionNodePickerImpl {
+    pub endpoint: String,
+}
+
+#[async_trait]
+impl CompactionNodePicker for LocalCompactionNodePickerImpl {
+    /// Return the local addr and port of grpc service.
+    async fn get_compaction_node(&self) -> Result<String> {
+        Ok(self.endpoint.clone())
+    }
+}
diff --git a/src/analytic_engine/src/compaction/runner/remote_client.rs 
b/src/analytic_engine/src/compaction/runner/remote_client.rs
new file mode 100644
index 00000000..cf1f69be
--- /dev/null
+++ b/src/analytic_engine/src/compaction/runner/remote_client.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use generic_error::BoxError;
+use horaedbproto::{
+    common::ResponseHeader, 
compaction_service::compaction_service_client::CompactionServiceClient,
+};
+use logger::info;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt};
+use time_ext::ReadableDuration;
+
+use crate::compaction::runner::{
+    BadResponse, FailConnect, FailExecuteCompactionTask, MissingHeader, Result,
+};
+
+type CompactionServiceGrpcClient = 
CompactionServiceClient<tonic::transport::Channel>;
+
+#[derive(Debug, Deserialize, Clone, Serialize)]
+#[serde(default)]
+pub struct CompactionClientConfig {
+    pub compaction_server_addr: String,
+    pub timeout: ReadableDuration,
+}
+
+impl Default for CompactionClientConfig {
+    fn default() -> Self {
+        Self {
+            compaction_server_addr: "127.0.0.1:7878".to_string(),
+            timeout: ReadableDuration::secs(5),
+        }
+    }
+}
+
+/// CompactionClient is the abstraction of client used for HoraeDB to
+/// communicate with CompactionServer cluster.
+#[async_trait]
+pub trait CompactionClient: Send + Sync {
+    async fn execute_compaction_task(
+        &self,
+        req: horaedbproto::compaction_service::ExecuteCompactionTaskRequest,
+    ) -> 
Result<horaedbproto::compaction_service::ExecuteCompactionTaskResponse>;
+}
+
+pub type CompactionClientRef = Arc<dyn CompactionClient>;
+
+/// Default compaction client impl, will interact with the remote compaction
+/// node.
+pub struct CompactionClientImpl {
+    client: CompactionServiceGrpcClient,
+}
+
+impl CompactionClientImpl {
+    pub async fn connect(config: CompactionClientConfig) -> Result<Self> {
+        let client = {
+            let endpoint =
+                
tonic::transport::Endpoint::from_shared(config.compaction_server_addr.to_string())
+                    .box_err()
+                    .context(FailConnect {
+                        addr: &config.compaction_server_addr,
+                    })?
+                    .timeout(config.timeout.0);
+            CompactionServiceGrpcClient::connect(endpoint)
+                .await
+                .box_err()
+                .context(FailConnect {
+                    addr: &config.compaction_server_addr,
+                })?
+        };
+
+        Ok(Self { client })
+    }
+
+    #[inline]
+    fn client(&self) -> CompactionServiceGrpcClient {
+        self.client.clone()
+    }
+}
+
+#[async_trait]
+impl CompactionClient for CompactionClientImpl {
+    async fn execute_compaction_task(
+        &self,
+        pb_req: horaedbproto::compaction_service::ExecuteCompactionTaskRequest,
+    ) -> 
Result<horaedbproto::compaction_service::ExecuteCompactionTaskResponse> {
+        // TODO(leslie): Add request header for ExecuteCompactionTaskRequest.
+
+        info!(
+            "Compaction client try to execute compaction task in remote 
compaction node, req:{:?}",
+            pb_req
+        );
+
+        let pb_resp = self
+            .client()
+            .execute_compaction_task(pb_req)
+            .await
+            .box_err()
+            .context(FailExecuteCompactionTask)?
+            .into_inner();
+
+        info!(
+            "Compaction client finish executing compaction task in remote 
compaction node, req:{:?}",
+            pb_resp
+        );
+
+        check_response_header(&pb_resp.header)?;
+        Ok(pb_resp)
+    }
+}
+
+// TODO(leslie): Consider to refactor and reuse the similar function in
+// meta_client.
+fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
+    let header = header.as_ref().context(MissingHeader)?;
+    if header.code == 0 {
+        Ok(())
+    } else {
+        BadResponse {
+            code: header.code,
+            msg: header.error.clone(),
+        }
+        .fail()
+    }
+}
+
+pub async fn build_compaction_client(
+    config: CompactionClientConfig,
+) -> Result<CompactionClientRef> {
+    let compaction_client = CompactionClientImpl::connect(config).await?;
+    Ok(Arc::new(compaction_client))
+}
diff --git a/src/analytic_engine/src/compaction/runner/remote_runner.rs 
b/src/analytic_engine/src/compaction/runner/remote_runner.rs
new file mode 100644
index 00000000..59a70c2f
--- /dev/null
+++ b/src/analytic_engine/src/compaction/runner/remote_runner.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use generic_error::BoxError;
+use logger::info;
+use snafu::ResultExt;
+
+use super::{local_runner::LocalCompactionRunner, 
node_picker::RemoteCompactionNodePickerRef};
+use crate::{
+    compaction::runner::{
+        remote_client::{build_compaction_client, CompactionClientConfig, 
CompactionClientRef},
+        CompactionRunner, CompactionRunnerResult, CompactionRunnerTask,
+    },
+    instance::flush_compaction::{
+        self, BuildCompactionClientFailed, ConvertCompactionTaskResponse,
+        GetCompactionClientFailed, PickCompactionNodeFailed, Result,
+    },
+};
+
+pub struct RemoteCompactionRunner {
+    pub node_picker: RemoteCompactionNodePickerRef,
+
+    pub fallback_local_when_failed: bool,
+    /// Responsible for executing compaction task locally if fail to remote
+    /// compact when `fallback_local_when_failed` is true, used for better 
fault
+    /// tolerance.
+    pub local_compaction_runner: LocalCompactionRunner,
+}
+
+impl RemoteCompactionRunner {
+    async fn get_compaction_client(&self) -> Result<CompactionClientRef> {
+        let mut config = CompactionClientConfig::default();
+        let endpoint = self
+            .node_picker
+            .get_compaction_node()
+            .await
+            .context(PickCompactionNodeFailed)?;
+        config.compaction_server_addr = make_formatted_endpoint(&endpoint);
+
+        let client = build_compaction_client(config)
+            .await
+            .context(BuildCompactionClientFailed)?;
+        Ok(client)
+    }
+
+    async fn local_compact(&self, task: CompactionRunnerTask) -> 
Result<CompactionRunnerResult> {
+        self.local_compaction_runner.run(task).await
+    }
+}
+
+#[async_trait]
+impl CompactionRunner for RemoteCompactionRunner {
+    /// Run the compaction task either on a remote node or fall back to local
+    /// compaction.
+    async fn run(&self, task: CompactionRunnerTask) -> 
Result<CompactionRunnerResult> {
+        let client = self
+            .get_compaction_client()
+            .await
+            .box_err()
+            .context(GetCompactionClientFailed);
+
+        let pb_resp = match client {
+            Ok(client) => match 
client.execute_compaction_task(task.clone().into()).await {
+                Ok(resp) => resp,
+                Err(e) => {
+                    if !self.fallback_local_when_failed {
+                        return 
Err(flush_compaction::Error::RemoteCompactFailed { source: e });
+                    }
+
+                    info!(
+                        "The compaction task falls back to local because of 
error:{}",
+                        e
+                    );
+                    return self.local_compact(task).await;
+                }
+            },
+            Err(e) => {
+                if !self.fallback_local_when_failed {
+                    return Err(e);
+                }
+
+                info!(
+                    "The compaction task falls back to local because of 
error:{}",
+                    e
+                );
+                return self.local_compact(task).await;
+            }
+        };
+
+        let resp = pb_resp
+            .try_into()
+            .box_err()
+            .context(ConvertCompactionTaskResponse)?;
+
+        Ok(resp)
+    }
+}
+
+fn make_formatted_endpoint(endpoint: &str) -> String {
+    format!("http://{endpoint}";)
+}
diff --git a/src/analytic_engine/src/instance/engine.rs 
b/src/analytic_engine/src/instance/engine.rs
index 8c29ab1c..537b8331 100644
--- a/src/analytic_engine/src/instance/engine.rs
+++ b/src/analytic_engine/src/instance/engine.rs
@@ -259,6 +259,12 @@ pub enum Error {
         sequence: SequenceNumber,
         source: wal::manager::Error,
     },
+
+    #[snafu(display(
+        "Failed to find meta client to construct remote compaction 
runner.\nBacktrace:\n{}",
+        backtrace
+    ))]
+    MetaClientNotExist { backtrace: Backtrace },
 }
 
 define_result!(Error);
@@ -293,6 +299,7 @@ impl From<Error> for table_engine::engine::Error {
             | Error::DoManifestSnapshot { .. }
             | Error::OpenManifest { .. }
             | Error::TableNotExist { .. }
+            | Error::MetaClientNotExist { .. }
             | Error::OpenTablesOfShard { .. }
             | Error::ReplayWalNoCause { .. }
             | Error::PurgeWal { .. }
diff --git a/src/analytic_engine/src/instance/flush_compaction.rs 
b/src/analytic_engine/src/instance/flush_compaction.rs
index da1647eb..9deceff5 100644
--- a/src/analytic_engine/src/instance/flush_compaction.rs
+++ b/src/analytic_engine/src/instance/flush_compaction.rs
@@ -41,6 +41,7 @@ use tokio::{sync::oneshot, time::Instant};
 use wal::manager::WalLocation;
 
 use crate::{
+    compaction::runner::node_picker,
     instance::{
         self, reorder_memtable::Reorder, serial_executor::TableFlushScheduler, 
SpaceStoreRef,
     },
@@ -158,6 +159,25 @@ pub enum Error {
 
     #[snafu(display("Failed to alloc file id, err:{}", source))]
     AllocFileId { source: data::Error },
+
+    #[snafu(display("Failed to convert compaction task response, err:{}", 
source))]
+    ConvertCompactionTaskResponse { source: GenericError },
+
+    #[snafu(display("Failed to pick remote compaction node, err:{}", source))]
+    PickCompactionNodeFailed { source: node_picker::Error },
+
+    #[snafu(display("Failed to build compaction client, err:{}", source))]
+    BuildCompactionClientFailed {
+        source: crate::compaction::runner::Error,
+    },
+
+    #[snafu(display("Failed to get compaction client, err:{}", source))]
+    GetCompactionClientFailed { source: GenericError },
+
+    #[snafu(display("Failed to execute compaction task remotely, err:{}", 
source))]
+    RemoteCompactFailed {
+        source: crate::compaction::runner::Error,
+    },
 }
 
 define_result!(Error);
diff --git a/src/analytic_engine/src/instance/open.rs 
b/src/analytic_engine/src/instance/open.rs
index 220fa84c..97717c5a 100644
--- a/src/analytic_engine/src/instance/open.rs
+++ b/src/analytic_engine/src/instance/open.rs
@@ -24,20 +24,28 @@ use std::{
 
 use common_types::table::ShardId;
 use logger::{error, info};
+use meta_client::MetaClientRef;
 use object_store::ObjectStoreRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use table_engine::{engine::TableDef, table::TableId};
 use wal::manager::WalManagerRef;
 
 use crate::{
     compaction::{
-        runner::{local_runner::LocalCompactionRunner, CompactionRunnerPtr, 
CompactionRunnerRef},
+        runner::{
+            local_runner::LocalCompactionRunner,
+            node_picker::{
+                LocalCompactionNodePickerImpl, NodePicker, 
RemoteCompactionNodePickerImpl,
+            },
+            remote_runner::RemoteCompactionRunner,
+            CompactionRunnerPtr, CompactionRunnerRef,
+        },
         scheduler::SchedulerImpl,
     },
     context::OpenContext,
     engine,
     instance::{
-        engine::{OpenManifest, OpenTablesOfShard, ReadMetaUpdate, Result},
+        engine::{MetaClientNotExist, OpenManifest, OpenTablesOfShard, 
ReadMetaUpdate, Result},
         flush_compaction::Flusher,
         mem_collector::MemUsageCollector,
         wal_replayer::{ReplayMode, WalReplayer},
@@ -52,7 +60,7 @@ use crate::{
     },
     table::data::{TableCatalogInfo, TableDataRef},
     table_meta_set_impl::TableMetaSetImpl,
-    RecoverMode,
+    CompactionMode, RecoverMode,
 };
 
 pub(crate) struct InstanceContext {
@@ -68,14 +76,48 @@ impl InstanceContext {
         wal_manager: WalManagerRef,
         store_picker: ObjectStorePickerRef,
         sst_factory: SstFactoryRef,
+        meta_client: Option<MetaClientRef>,
     ) -> Result<Self> {
-        let compaction_runner = Box::new(LocalCompactionRunner::new(
+        info!(
+            "Construct compaction runner with compaction_mode:{:?}",
+            ctx.config.compaction_mode
+        );
+
+        let local_compaction_runner = LocalCompactionRunner::new(
             ctx.runtimes.compact_runtime.clone(),
             &ctx.config,
             sst_factory.clone(),
             store_picker.clone(),
             ctx.meta_cache.clone(),
-        ));
+        );
+
+        let compaction_runner: CompactionRunnerPtr = match 
&ctx.config.compaction_mode {
+            CompactionMode::Offload(NodePicker::Local(endpoint)) => {
+                Box::new(RemoteCompactionRunner {
+                    node_picker: Arc::new(LocalCompactionNodePickerImpl {
+                        endpoint: endpoint.clone(),
+                    }),
+                    // This field is set to false here for testing.
+                    fallback_local_when_failed: false,
+                    local_compaction_runner: local_compaction_runner.clone(),
+                })
+            }
+            CompactionMode::Offload(NodePicker::Remote) => 
Box::new(RemoteCompactionRunner {
+                node_picker: Arc::new(RemoteCompactionNodePickerImpl {
+                    meta_client: meta_client.context(MetaClientNotExist)?,
+                }),
+                fallback_local_when_failed: true,
+                local_compaction_runner: local_compaction_runner.clone(),
+            }),
+
+            CompactionMode::Local => Box::new(LocalCompactionRunner::new(
+                ctx.runtimes.compact_runtime.clone(),
+                &ctx.config,
+                sst_factory.clone(),
+                store_picker.clone(),
+                ctx.meta_cache.clone(),
+            )),
+        };
 
         let instance = Instance::open(
             ctx,
@@ -89,7 +131,7 @@ impl InstanceContext {
 
         Ok(Self {
             instance,
-            local_compaction_runner: None,
+            local_compaction_runner: Some(Arc::new(local_compaction_runner)),
         })
     }
 }
diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs
index 4b80741f..687bcf63 100644
--- a/src/analytic_engine/src/lib.rs
+++ b/src/analytic_engine/src/lib.rs
@@ -19,7 +19,7 @@
 
 #![feature(option_get_or_insert_default)]
 
-mod compaction;
+pub mod compaction;
 mod context;
 mod engine;
 pub mod error;
@@ -40,6 +40,7 @@ pub mod table_meta_set_impl;
 #[cfg(any(test, feature = "test"))]
 pub mod tests;
 
+use compaction::runner::node_picker::NodePicker;
 use error::ErrorKind;
 use manifest::details::Options as ManifestOptions;
 use object_store::config::StorageOptions;
@@ -54,6 +55,20 @@ pub use crate::{
     table_options::TableOptions,
 };
 
+/// The compaction mode decides compaction offload or not.
+///
+/// [CompactionMode::Offload] means offload the compaction task
+/// to a local or remote node.
+///
+/// [CompactionMode::Local] means local compaction, no offloading.
+#[derive(Clone, Default, Debug, Deserialize, Serialize)]
+#[serde(tag = "compaction_mode")]
+pub enum CompactionMode {
+    #[default]
+    Local,
+    Offload(NodePicker),
+}
+
 /// Config of analytic engine
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(default)]
@@ -77,6 +92,9 @@ pub struct Config {
 
     pub compaction: SchedulerConfig,
 
+    /// Offload the compaction task or not.
+    pub compaction_mode: CompactionMode,
+
     /// sst meta cache capacity
     pub sst_meta_cache_cap: Option<usize>,
     /// sst data cache capacity
@@ -187,6 +205,7 @@ impl Default for Config {
             table_opts: TableOptions::default(),
             try_compat_old_layered_memtable_opts: false,
             compaction: SchedulerConfig::default(),
+            compaction_mode: CompactionMode::Local,
             sst_meta_cache_cap: Some(1000),
             sst_data_cache_cap: Some(1000),
             manifest: ManifestOptions::default(),
diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs
index ee167729..4075e250 100644
--- a/src/analytic_engine/src/setup.rs
+++ b/src/analytic_engine/src/setup.rs
@@ -21,6 +21,7 @@ use std::{num::NonZeroUsize, path::Path, pin::Pin, sync::Arc};
 
 use futures::Future;
 use macros::define_result;
+use meta_client::MetaClientRef;
 use object_store::{
     aliyun,
     config::{ObjectStoreOptions, StorageOptions},
@@ -96,6 +97,8 @@ pub struct EngineBuilder<'a> {
     pub config: &'a Config,
     pub engine_runtimes: Arc<EngineRuntimes>,
     pub opened_wals: OpenedWals,
+    // Meta client is needed when compaction offload with remote node picker.
+    pub meta_client: Option<MetaClientRef>,
 }
 
 impl<'a> EngineBuilder<'a> {
@@ -116,6 +119,7 @@ impl<'a> EngineBuilder<'a> {
             self.opened_wals.data_wal,
             manifest_storages,
             Arc::new(opened_storages),
+            self.meta_client,
         )
         .await?;
 
@@ -134,6 +138,7 @@ async fn build_instance_context(
     wal_manager: WalManagerRef,
     manifest_storages: ManifestStorages,
     store_picker: ObjectStorePickerRef,
+    meta_client: Option<MetaClientRef>,
 ) -> Result<InstanceContext> {
     let meta_cache: Option<MetaCacheRef> = config
         .sst_meta_cache_cap
@@ -151,6 +156,7 @@ async fn build_instance_context(
         wal_manager,
         store_picker,
         Arc::new(FactoryImpl),
+        meta_client.clone(),
     )
     .await
     .context(OpenInstance)?;
diff --git a/src/analytic_engine/src/sst/factory.rs 
b/src/analytic_engine/src/sst/factory.rs
index 2ddeb246..1f17b8df 100644
--- a/src/analytic_engine/src/sst/factory.rs
+++ b/src/analytic_engine/src/sst/factory.rs
@@ -21,10 +21,11 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use common_types::projected_schema::RowProjectorBuilder;
+use generic_error::{BoxError, GenericError};
 use macros::define_result;
 use object_store::{ObjectStoreRef, Path};
 use runtime::Runtime;
-use snafu::{ResultExt, Snafu};
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::predicate::PredicateRef;
 use trace_metric::MetricsCollector;
 
@@ -50,6 +51,15 @@ use crate::{
 pub enum Error {
     #[snafu(display("Failed to parse sst header, err:{}", source,))]
     ParseHeader { source: header::Error },
+
+    #[snafu(display("Empty storage format hint.\nBacktrace:\n{}", backtrace))]
+    EmptyStorageFormatHint { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert storage format hint, err:{}", source))]
+    ConvertStorageFormatHint { source: GenericError },
+
+    #[snafu(display("Failed to convert compression, err:{}", source))]
+    ConvertCompression { source: GenericError },
 }
 
 define_result!(Error);
@@ -164,6 +174,59 @@ pub struct SstWriteOptions {
     pub column_stats: HashMap<String, ColumnStats>,
 }
 
+impl TryFrom<horaedbproto::compaction_service::SstWriteOptions> for 
SstWriteOptions {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::SstWriteOptions) -> 
Result<Self> {
+        let storage_format_hint: StorageFormatHint = value
+            .storage_format_hint
+            .context(EmptyStorageFormatHint)?
+            .try_into()
+            .box_err()
+            .context(ConvertStorageFormatHint)?;
+
+        let num_rows_per_row_group = value.num_rows_per_row_group as usize;
+        let compression: Compression = value
+            .compression
+            .try_into()
+            .box_err()
+            .context(ConvertCompression)?;
+        let max_buffer_size = value.max_buffer_size as usize;
+
+        let column_stats: HashMap<String, ColumnStats> = value
+            .column_stats
+            .into_iter()
+            .map(|(k, v)| (k, ColumnStats { low_cardinality: v }))
+            .collect();
+
+        Ok(SstWriteOptions {
+            storage_format_hint,
+            num_rows_per_row_group,
+            compression,
+            max_buffer_size,
+            column_stats,
+        })
+    }
+}
+
+impl From<SstWriteOptions> for 
horaedbproto::compaction_service::SstWriteOptions {
+    fn from(value: SstWriteOptions) -> Self {
+        let column_stats = value
+            .column_stats
+            .into_iter()
+            .map(|(k, v)| (k, v.low_cardinality))
+            .collect();
+
+        Self {
+            storage_format_hint: Some(value.storage_format_hint.into()),
+            num_rows_per_row_group: value.num_rows_per_row_group as u64,
+            compression: value.compression.into(),
+            max_buffer_size: value.max_buffer_size as u64,
+            column_stats,
+        }
+    }
+}
+
 impl From<&ColumnStats> for ColumnEncoding {
     fn from(value: &ColumnStats) -> Self {
         ColumnEncoding {
diff --git a/src/analytic_engine/src/sst/file.rs 
b/src/analytic_engine/src/sst/file.rs
index 39cdc7c7..a6cc336a 100644
--- a/src/analytic_engine/src/sst/file.rs
+++ b/src/analytic_engine/src/sst/file.rs
@@ -35,12 +35,13 @@ use common_types::{
     SequenceNumber,
 };
 use future_ext::{retry_async, BackoffConfig, RetryConfig};
+use generic_error::{BoxError, GenericError};
 use logger::{error, info, trace, warn};
 use macros::define_result;
 use metric_ext::Meter;
 use object_store::{ObjectStoreRef, Path};
 use runtime::{JoinHandle, Runtime};
-use snafu::{ResultExt, Snafu};
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::table::TableId;
 use tokio::sync::{
     mpsc::{self, UnboundedReceiver, UnboundedSender},
@@ -54,6 +55,18 @@ use crate::{space::SpaceId, sst::manager::FileId, 
table::sst_util, table_options
 pub enum Error {
     #[snafu(display("Failed to join purger, err:{}", source))]
     StopPurger { source: runtime::Error },
+
+    #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))]
+    EmptyTimeRange { backtrace: Backtrace },
+
+    #[snafu(display("Failed to convert time range, err:{}", source))]
+    ConvertTimeRange { source: GenericError },
+
+    #[snafu(display("Failed to convert storage format, err:{}", source))]
+    ConvertStorageFormat { source: GenericError },
+
+    #[snafu(display("Converted overflow, err:{}", source))]
+    ConvertOverflow { source: GenericError },
 }
 
 define_result!(Error);
@@ -95,6 +108,15 @@ impl From<u16> for Level {
     }
 }
 
+impl TryFrom<u32> for Level {
+    type Error = Error;
+
+    fn try_from(value: u32) -> Result<Self> {
+        let value: u16 = value.try_into().box_err().context(ConvertOverflow)?;
+        Ok(value.into())
+    }
+}
+
 impl fmt::Display for Level {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{}", self.0)
@@ -197,6 +219,16 @@ impl FileHandle {
         }
     }
 
+    #[inline]
+    pub fn space_id(&self) -> SpaceId {
+        self.inner.purge_queue.space_id()
+    }
+
+    #[inline]
+    pub fn table_id(&self) -> TableId {
+        self.inner.purge_queue.table_id()
+    }
+
     #[inline]
     pub fn read_meter(&self) -> Arc<Meter> {
         self.inner.metrics.read_meter.clone()
@@ -460,6 +492,53 @@ impl FileMeta {
     }
 }
 
+impl TryFrom<horaedbproto::compaction_service::FileMeta> for FileMeta {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::FileMeta) -> 
Result<Self> {
+        let time_range: TimeRange = value
+            .time_range
+            .context(EmptyTimeRange)?
+            .try_into()
+            .box_err()
+            .context(ConvertTimeRange)?;
+
+        let storage_format: StorageFormat = value
+            .storage_format
+            .try_into()
+            .box_err()
+            .context(ConvertStorageFormat)?;
+        let mut associated_files: Vec<String> = 
Vec::with_capacity(value.associated_files.len());
+        for file in value.associated_files {
+            associated_files.push(file);
+        }
+
+        Ok(FileMeta {
+            id: value.file_id,
+            size: value.size,
+            row_num: value.row_num,
+            time_range,
+            max_seq: value.max_seq,
+            storage_format,
+            associated_files,
+        })
+    }
+}
+
+impl From<FileMeta> for horaedbproto::compaction_service::FileMeta {
+    fn from(value: FileMeta) -> Self {
+        Self {
+            file_id: value.id,
+            max_seq: value.max_seq,
+            time_range: Some(value.time_range.into()),
+            size: value.size,
+            row_num: value.row_num,
+            storage_format: value.storage_format.into(),
+            associated_files: value.associated_files,
+        }
+    }
+}
+
 // Queue to store files to be deleted for a table.
 #[derive(Clone)]
 pub struct FilePurgeQueue {
@@ -508,6 +587,23 @@ impl FilePurgeQueue {
             );
         }
     }
+
+    #[inline]
+    pub fn space_id(&self) -> SpaceId {
+        self.inner.space_id
+    }
+
+    #[inline]
+    pub fn table_id(&self) -> TableId {
+        self.inner.table_id
+    }
+}
+
+impl From<horaedbproto::compaction_service::FilePurgeQueue> for FilePurgeQueue 
{
+    fn from(value: horaedbproto::compaction_service::FilePurgeQueue) -> Self {
+        let (tx, _rx) = mpsc::unbounded_channel();
+        FilePurgeQueue::new(value.space_id, value.table_id.into(), tx)
+    }
 }
 
 struct FilePurgeQueueInner {
diff --git a/src/analytic_engine/src/sst/writer.rs 
b/src/analytic_engine/src/sst/writer.rs
index e424e8af..577f4993 100644
--- a/src/analytic_engine/src/sst/writer.rs
+++ b/src/analytic_engine/src/sst/writer.rs
@@ -26,7 +26,8 @@ use common_types::{
     SequenceNumber,
 };
 use futures::Stream;
-use generic_error::GenericError;
+use generic_error::{BoxError, GenericError};
+use snafu::{OptionExt, ResultExt};
 
 use crate::table_options::StorageFormat;
 
@@ -96,6 +97,21 @@ pub mod error {
 
         #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, 
backtrace))]
         OtherNoCause { msg: String, backtrace: Backtrace },
+
+        #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))]
+        EmptyTimeRange { backtrace: Backtrace },
+
+        #[snafu(display("Empty schema.\nBacktrace:\n{}", backtrace))]
+        EmptySchema { backtrace: Backtrace },
+
+        #[snafu(display("Failed to convert time range, err:{}", source))]
+        ConvertTimeRange { source: GenericError },
+
+        #[snafu(display("Failed to convert sst info, err:{}", source))]
+        ConvertSstInfo { source: GenericError },
+
+        #[snafu(display("Failed to convert schema, err:{}", source))]
+        ConvertSchema { source: GenericError },
     }
 
     define_result!(Error);
@@ -117,6 +133,44 @@ pub struct SstInfo {
     pub time_range: TimeRange,
 }
 
+impl TryFrom<horaedbproto::compaction_service::SstInfo> for SstInfo {
+    type Error = Error;
+
+    fn try_from(value: horaedbproto::compaction_service::SstInfo) -> 
Result<Self> {
+        let storage_format = value
+            .storage_format
+            .try_into()
+            .box_err()
+            .context(ConvertSstInfo)?;
+        let time_range = value
+            .time_range
+            .context(EmptyTimeRange)?
+            .try_into()
+            .box_err()
+            .context(ConvertTimeRange)?;
+
+        Ok(Self {
+            file_size: value.file_size as usize,
+            row_num: value.row_num as usize,
+            storage_format,
+            meta_path: value.meta_path,
+            time_range,
+        })
+    }
+}
+
+impl From<SstInfo> for horaedbproto::compaction_service::SstInfo {
+    fn from(value: SstInfo) -> Self {
+        Self {
+            file_size: value.file_size as u64,
+            row_num: value.row_num as u64,
+            storage_format: value.storage_format.into(),
+            meta_path: value.meta_path,
+            time_range: Some(value.time_range.into()),
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct MetaData {
     /// Min key of the sst.
@@ -131,6 +185,45 @@ pub struct MetaData {
     pub schema: Schema,
 }
 
+impl TryFrom<horaedbproto::compaction_service::MetaData> for MetaData {
+    type Error = Error;
+
+    fn try_from(meta: horaedbproto::compaction_service::MetaData) -> 
Result<Self> {
+        let time_range = meta
+            .time_range
+            .context(EmptyTimeRange)?
+            .try_into()
+            .box_err()
+            .context(ConvertTimeRange)?;
+        let schema = meta
+            .schema
+            .context(EmptySchema)?
+            .try_into()
+            .box_err()
+            .context(ConvertSchema)?;
+
+        Ok(Self {
+            min_key: Bytes::from(meta.min_key),
+            max_key: Bytes::from(meta.max_key),
+            time_range,
+            max_sequence: meta.max_sequence,
+            schema,
+        })
+    }
+}
+
+impl From<MetaData> for horaedbproto::compaction_service::MetaData {
+    fn from(meta: MetaData) -> Self {
+        Self {
+            min_key: meta.min_key.to_vec(),
+            max_key: meta.max_key.to_vec(),
+            max_sequence: meta.max_sequence,
+            time_range: Some(meta.time_range.into()),
+            schema: Some((&meta.schema).into()),
+        }
+    }
+}
+
 /// The writer for sst.
 ///
 /// The caller provides a stream of [RecordBatch] and the writer takes
diff --git a/src/analytic_engine/src/table_options.rs 
b/src/analytic_engine/src/table_options.rs
index 4c1823ee..0ecabb95 100644
--- a/src/analytic_engine/src/table_options.rs
+++ b/src/analytic_engine/src/table_options.rs
@@ -130,6 +130,13 @@ pub enum Error {
     ))]
     UnknownStorageFormatHint { value: String, backtrace: Backtrace },
 
+    #[snafu(display(
+        "Unknown compression type. value:{:?}.\nBacktrace:\n{}",
+        value,
+        backtrace
+    ))]
+    UnknownCompressionType { value: i32, backtrace: Backtrace },
+
     #[snafu(display("Storage format hint is missing.\nBacktrace:\n{}", 
backtrace))]
     MissingStorageFormatHint { backtrace: Backtrace },
 
@@ -234,6 +241,33 @@ impl From<manifest_pb::Compression> for Compression {
     }
 }
 
+impl TryFrom<i32> for Compression {
+    type Error = Error;
+
+    fn try_from(compression: i32) -> Result<Self> {
+        let compression = match compression {
+            0 => Compression::Uncompressed,
+            1 => Compression::Lz4,
+            2 => Compression::Snappy,
+            3 => Compression::Zstd,
+            _ => return UnknownCompressionType { value: compression }.fail(),
+        };
+
+        Ok(compression)
+    }
+}
+
+impl From<Compression> for i32 {
+    fn from(value: Compression) -> Self {
+        match value {
+            Compression::Uncompressed => 0,
+            Compression::Lz4 => 1,
+            Compression::Snappy => 2,
+            Compression::Zstd => 3,
+        }
+    }
+}
+
 impl From<Compression> for ParquetCompression {
     fn from(compression: Compression) -> Self {
         match compression {
@@ -340,6 +374,14 @@ impl From<StorageFormat> for manifest_pb::StorageFormat {
     }
 }
 
+impl From<StorageFormat> for i32 {
+    fn from(value: StorageFormat) -> Self {
+        match value {
+            StorageFormat::Columnar => 0,
+        }
+    }
+}
+
 impl TryFrom<manifest_pb::StorageFormat> for StorageFormat {
     type Error = Error;
 
@@ -363,6 +405,18 @@ impl TryFrom<&str> for StorageFormat {
     }
 }
 
+impl TryFrom<i32> for StorageFormat {
+    type Error = Error;
+
+    fn try_from(value: i32) -> Result<Self> {
+        let format = match value {
+            0 => Self::Columnar,
+            _ => return UnknownStorageFormatType { value }.fail(),
+        };
+        Ok(format)
+    }
+}
+
 impl ToString for StorageFormat {
     fn to_string(&self) -> String {
         match self {
diff --git a/src/analytic_engine/src/tests/util.rs 
b/src/analytic_engine/src/tests/util.rs
index 8fe07106..04bc09f7 100644
--- a/src/analytic_engine/src/tests/util.rs
+++ b/src/analytic_engine/src/tests/util.rs
@@ -141,6 +141,7 @@ impl<T: WalsOpener> TestContext<T> {
             config: &self.config,
             engine_runtimes: self.runtimes.clone(),
             opened_wals: opened_wals.clone(),
+            meta_client: None,
         };
         self.opened_wals = Some(opened_wals);
 
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index a7f86f08..97c8457b 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -522,6 +522,7 @@ impl<T: WalsOpener> TestContext<T> {
             config: &self.config,
             engine_runtimes: self.runtimes.clone(),
             opened_wals: opened_wals.clone(),
+            meta_client: None,
         };
         self.opened_wals = Some(opened_wals);
 
diff --git a/src/cluster/src/cluster_impl.rs b/src/cluster/src/cluster_impl.rs
index aee54e42..d79eda04 100644
--- a/src/cluster/src/cluster_impl.rs
+++ b/src/cluster/src/cluster_impl.rs
@@ -46,8 +46,8 @@ use crate::{
     shard_set::{Shard, ShardRef, ShardSet},
     topology::ClusterTopology,
     Cluster, ClusterNodesNotFound, ClusterNodesResp, 
EtcdClientFailureWithCause,
-    InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, 
OpenShardWithCause,
-    Result, ShardNotFound, TableStatus,
+    InitEtcdClientConfig, InvalidArguments, MetaClientFailure, NodeType, 
OpenShard,
+    OpenShardWithCause, Result, ShardNotFound, TableStatus,
 };
 
 /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
@@ -376,6 +376,10 @@ impl Cluster for ClusterImpl {
         Ok(())
     }
 
+    fn node_type(&self) -> NodeType {
+        self.config.node_type.clone()
+    }
+
     async fn open_shard(&self, shard_info: &ShardInfo) -> Result<ShardRef> {
         self.inner.open_shard(shard_info).await
     }
diff --git a/src/cluster/src/config.rs b/src/cluster/src/config.rs
index 29e0da97..d0b1c694 100644
--- a/src/cluster/src/config.rs
+++ b/src/cluster/src/config.rs
@@ -23,6 +23,8 @@ use serde::{Deserialize, Serialize};
 use table_engine::ANALYTIC_ENGINE_TYPE;
 use time_ext::ReadableDuration;
 
+use crate::NodeType;
+
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(default)]
 // TODO: move this to table_engine crates
@@ -133,6 +135,7 @@ impl Default for TlsConfig {
 #[serde(default)]
 pub struct ClusterConfig {
     pub cmd_channel_buffer_size: usize,
+    pub node_type: NodeType,
     pub meta_client: MetaClientConfig,
     pub etcd_client: EtcdClientConfig,
 }
diff --git a/src/cluster/src/lib.rs b/src/cluster/src/lib.rs
index a97c945a..ddda6c46 100644
--- a/src/cluster/src/lib.rs
+++ b/src/cluster/src/lib.rs
@@ -28,7 +28,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use common_types::schema::SchemaName;
+use common_types::{cluster::NodeType, schema::SchemaName};
 use generic_error::GenericError;
 use macros::define_result;
 use meta_client::types::{
@@ -190,12 +190,14 @@ pub struct ClusterNodesResp {
     pub cluster_nodes: ClusterNodesRef,
 }
 
-/// Cluster manages tables and shard infos in cluster mode.
 #[async_trait]
 pub trait Cluster {
     async fn start(&self) -> Result<()>;
     async fn stop(&self) -> Result<()>;
 
+    /// Get cluster type.
+    fn node_type(&self) -> NodeType;
+
     /// Fetch related information and open shard.
     async fn open_shard(&self, shard_info: &ShardInfo) -> Result<ShardRef>;
 
diff --git a/src/common_types/src/cluster.rs b/src/common_types/src/cluster.rs
new file mode 100644
index 00000000..ad302023
--- /dev/null
+++ b/src/common_types/src/cluster.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+/// Type to distinguish different node type in cluster mode.
+#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub enum NodeType {
+    #[default]
+    HoraeDB,
+    CompactionServer,
+}
diff --git a/src/common_types/src/lib.rs b/src/common_types/src/lib.rs
index 0b6cda17..334bd42f 100644
--- a/src/common_types/src/lib.rs
+++ b/src/common_types/src/lib.rs
@@ -18,6 +18,7 @@
 //! Contains common types
 
 pub mod bitset;
+pub mod cluster;
 pub mod column;
 pub mod column_block;
 pub mod column_schema;
diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml
index ce505105..5a6144d3 100644
--- a/src/horaedb/Cargo.toml
+++ b/src/horaedb/Cargo.toml
@@ -38,32 +38,33 @@ wal-rocksdb = ["wal/wal-rocksdb", 
"analytic_engine/wal-rocksdb"]
 wal-local-storage = ["wal/wal-local-storage", 
"analytic_engine/wal-local-storage"]
 
 [dependencies]
-analytic_engine = { workspace = true }
-catalog         = { workspace = true }
-catalog_impls   = { workspace = true }
-clap            = { workspace = true }
-cluster         = { workspace = true }
-datafusion      = { workspace = true }
-df_operator     = { workspace = true }
-etcd-client     = { workspace = true }
-interpreters    = { workspace = true }
-logger          = { workspace = true }
-meta_client     = { workspace = true }
-moka            = { version = "0.10", features = ["future"] }
-panic_ext       = { workspace = true }
-proxy           = { workspace = true }
-query_engine    = { workspace = true }
-router          = { workspace = true }
-runtime         = { workspace = true }
-serde           = { workspace = true }
-server          = { workspace = true }
-signal-hook     = "0.3"
-size_ext        = { workspace = true }
-table_engine    = { workspace = true }
-toml            = { workspace = true }
-toml_ext        = { workspace = true }
-tracing_util    = { workspace = true }
-wal             = { workspace = true }
+analytic_engine   = { workspace = true }
+catalog           = { workspace = true }
+catalog_impls     = { workspace = true }
+clap              = { workspace = true }
+cluster           = { workspace = true }
+common_types      = { workspace = true }
+datafusion        = { workspace = true }
+df_operator       = { workspace = true }
+etcd-client       = { workspace = true }
+interpreters      = { workspace = true }
+logger            = { workspace = true }
+meta_client       = { workspace = true }
+moka              = { version = "0.10", features = ["future"] }
+panic_ext         = { workspace = true }
+proxy             = { workspace = true }
+query_engine      = { workspace = true }
+router            = { workspace = true }
+runtime           = { workspace = true }
+serde             = { workspace = true }
+server            = { workspace = true }
+signal-hook       = "0.3"
+size_ext          = { workspace = true }
+table_engine      = { workspace = true }
+toml              = { workspace = true }
+toml_ext          = { workspace = true }
+tracing_util      = { workspace = true }
+wal               = { workspace = true }
 
 [build-dependencies]
 vergen = { version = "8", default-features = false, features = [
diff --git a/src/horaedb/src/config.rs b/src/horaedb/src/config.rs
index b9f8932f..e7f19233 100644
--- a/src/horaedb/src/config.rs
+++ b/src/horaedb/src/config.rs
@@ -26,8 +26,8 @@ use size_ext::ReadableSize;
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(default)]
 pub struct NodeInfo {
-    /// The address of the horaedb node. It can be a domain name or an IP
-    /// address without port followed.
+    /// The address of the horaedb (or compaction server) node. It can be a
+    /// domain name or an IP address without port followed.
     pub addr: String,
     pub zone: String,
     pub idc: String,
diff --git a/src/horaedb/src/setup.rs b/src/horaedb/src/setup.rs
index 9bdb46da..33632b55 100644
--- a/src/horaedb/src/setup.rs
+++ b/src/horaedb/src/setup.rs
@@ -313,6 +313,7 @@ async fn build_with_meta<T: WalsOpener>(
         zone: config.node.zone.clone(),
         idc: config.node.idc.clone(),
         binary_version: config.node.binary_version.clone(),
+        node_type: cluster_config.node_type.clone(),
     };
 
     info!("Build horaedb with node meta info:{node_meta_info:?}");
@@ -349,8 +350,12 @@ async fn build_with_meta<T: WalsOpener>(
         config: &config.analytic,
         engine_runtimes: runtimes.clone(),
         opened_wals: opened_wals.clone(),
+        meta_client: Some(meta_client.clone()),
     };
-    let TableEngineContext { table_engine, .. } = engine_builder
+    let TableEngineContext {
+        table_engine,
+        local_compaction_runner,
+    } = engine_builder
         .build()
         .await
         .expect("Failed to setup analytic engine");
@@ -368,14 +373,18 @@ async fn build_with_meta<T: WalsOpener>(
     let table_manipulator = 
Arc::new(meta_based::TableManipulatorImpl::new(meta_client));
 
     let schema_config_provider = 
Arc::new(ClusterBasedProvider::new(cluster.clone()));
-    builder
+
+    let mut builder = builder
         .table_engine(engine_proxy)
         .catalog_manager(catalog_manager)
         .table_manipulator(table_manipulator)
         .cluster(cluster)
         .opened_wals(opened_wals)
         .router(router)
-        .schema_config_provider(schema_config_provider)
+        .schema_config_provider(schema_config_provider);
+    builder = builder.compaction_runner(local_compaction_runner.expect("Empty 
compaction runner."));
+
+    builder
 }
 
 async fn build_without_meta<T: WalsOpener>(
@@ -394,6 +403,7 @@ async fn build_without_meta<T: WalsOpener>(
         config: &config.analytic,
         engine_runtimes: runtimes.clone(),
         opened_wals: opened_wals.clone(),
+        meta_client: None,
     };
     let TableEngineContext { table_engine, .. } = engine_builder
         .build()
diff --git a/src/meta_client/src/lib.rs b/src/meta_client/src/lib.rs
index a6cb8df6..ba933135 100644
--- a/src/meta_client/src/lib.rs
+++ b/src/meta_client/src/lib.rs
@@ -23,9 +23,9 @@ use macros::define_result;
 use snafu::{Backtrace, Snafu};
 use types::{
     AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, 
CreateTableResponse,
-    DropTableRequest, DropTableResponse, GetNodesRequest, GetNodesResponse,
-    GetTablesOfShardsRequest, GetTablesOfShardsResponse, RouteTablesRequest, 
RouteTablesResponse,
-    ShardInfo,
+    DropTableRequest, DropTableResponse, FetchCompactionNodeRequest, 
FetchCompactionNodeResponse,
+    GetNodesRequest, GetNodesResponse, GetTablesOfShardsRequest, 
GetTablesOfShardsResponse,
+    RouteTablesRequest, RouteTablesResponse, ShardInfo,
 };
 
 pub mod meta_impl;
@@ -76,6 +76,9 @@ pub enum Error {
     #[snafu(display("Failed to get tables, err:{}", source))]
     FailGetTables { source: GenericError },
 
+    #[snafu(display("Failed to fetch compaction node, err:{}", source))]
+    FailFetchCompactionNode { source: GenericError },
+
     #[snafu(display("Failed to route tables, err:{}", source))]
     FailRouteTables { source: GenericError },
 
@@ -113,6 +116,11 @@ pub trait MetaClient: Send + Sync {
 
     async fn get_nodes(&self, req: GetNodesRequest) -> 
Result<GetNodesResponse>;
 
+    async fn fetch_compaction_node(
+        &self,
+        req: FetchCompactionNodeRequest,
+    ) -> Result<FetchCompactionNodeResponse>;
+
     async fn send_heartbeat(&self, req: Vec<ShardInfo>) -> Result<()>;
 }
 
diff --git a/src/meta_client/src/meta_impl.rs b/src/meta_client/src/meta_impl.rs
index 5ba98de5..ffe32fae 100644
--- a/src/meta_client/src/meta_impl.rs
+++ b/src/meta_client/src/meta_impl.rs
@@ -31,9 +31,10 @@ use time_ext::ReadableDuration;
 use crate::{
     types::{
         AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, 
CreateTableResponse,
-        DropTableRequest, DropTableResponse, GetNodesRequest, GetNodesResponse,
-        GetTablesOfShardsRequest, GetTablesOfShardsResponse, NodeInfo, 
NodeMetaInfo, RequestHeader,
-        RouteTablesRequest, RouteTablesResponse, ShardInfo,
+        DropTableRequest, DropTableResponse, FetchCompactionNodeRequest,
+        FetchCompactionNodeResponse, GetNodesRequest, GetNodesResponse, 
GetTablesOfShardsRequest,
+        GetTablesOfShardsResponse, NodeInfo, NodeMetaInfo, RequestHeader, 
RouteTablesRequest,
+        RouteTablesResponse, ShardInfo,
     },
     BadResponse, FailAllocSchemaId, FailConnect, FailCreateTable, 
FailDropTable, FailGetTables,
     FailRouteTables, FailSendHeartbeat, MetaClient, MetaClientRef, 
MissingHeader, Result,
@@ -236,6 +237,13 @@ impl MetaClient for MetaClientImpl {
         GetNodesResponse::try_from(pb_resp)
     }
 
+    async fn fetch_compaction_node(
+        &self,
+        _req: FetchCompactionNodeRequest,
+    ) -> Result<FetchCompactionNodeResponse> {
+        todo!()
+    }
+
     async fn send_heartbeat(&self, shard_infos: Vec<ShardInfo>) -> Result<()> {
         let node_info = NodeInfo {
             node_meta_info: self.node_meta_info.clone(),
diff --git a/src/meta_client/src/types.rs b/src/meta_client/src/types.rs
index 6a6aba69..52484362 100644
--- a/src/meta_client/src/types.rs
+++ b/src/meta_client/src/types.rs
@@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, sync::Arc};
 
 pub use common_types::table::{ShardId, ShardVersion};
 use common_types::{
+    cluster::NodeType,
     schema::{SchemaId, SchemaName},
     table::{TableId, TableName},
 };
@@ -163,6 +164,7 @@ pub struct NodeMetaInfo {
     pub zone: String,
     pub idc: String,
     pub binary_version: String,
+    pub node_type: NodeType,
 }
 
 impl NodeMetaInfo {
@@ -589,3 +591,10 @@ impl TryFrom<meta_service_pb::GetNodesResponse> for 
GetNodesResponse {
         })
     }
 }
+
+#[derive(Debug, Clone, Default)]
+pub struct FetchCompactionNodeRequest {}
+
+pub struct FetchCompactionNodeResponse {
+    pub endpoint: String,
+}
diff --git a/src/router/src/cluster_based.rs b/src/router/src/cluster_based.rs
index d9291044..bd5efd8b 100644
--- a/src/router/src/cluster_based.rs
+++ b/src/router/src/cluster_based.rs
@@ -205,7 +205,7 @@ mod tests {
         shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, 
ClusterNodesResp,
         TableStatus,
     };
-    use common_types::table::ShardId;
+    use common_types::{cluster::NodeType, table::ShardId};
     use horaedbproto::storage::{RequestContext, RouteRequest as 
RouteRequestPb};
     use meta_client::types::{
         NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, 
ShardRole::Leader, TableInfo,
@@ -226,6 +226,10 @@ mod tests {
             unimplemented!();
         }
 
+        fn node_type(&self) -> NodeType {
+            unimplemented!()
+        }
+
         async fn open_shard(&self, _: &ShardInfo) -> cluster::Result<ShardRef> 
{
             unimplemented!();
         }
diff --git a/src/server/src/grpc/compaction_service/error.rs 
b/src/server/src/grpc/compaction_service/error.rs
new file mode 100644
index 00000000..eadb3f24
--- /dev/null
+++ b/src/server/src/grpc/compaction_service/error.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Error definitions for compaction service.
+
+use generic_error::GenericError;
+use horaedbproto::common::ResponseHeader;
+use macros::define_result;
+use snafu::Snafu;
+
+use crate::error_util;
+
+define_result!(Error);
+
+#[derive(Snafu, Debug)]
+#[snafu(visibility(pub))]
+pub enum Error {
+    #[snafu(display("Server error, code:{:?}, message:{}", code, msg))]
+    ErrNoCause { code: StatusCode, msg: String },
+
+    #[snafu(display("Server error, code:{:?}, message:{}, cause:{}", code, 
msg, source))]
+    ErrWithCause {
+        code: StatusCode,
+        msg: String,
+        source: GenericError,
+    },
+}
+
+impl Error {
+    pub fn code(&self) -> StatusCode {
+        match *self {
+            Error::ErrNoCause { code, .. } => code,
+            Error::ErrWithCause { code, .. } => code,
+        }
+    }
+
+    /// Get the error message returned to the user.
+    pub fn error_message(&self) -> String {
+        match self {
+            Error::ErrNoCause { msg, .. } => msg.clone(),
+
+            Error::ErrWithCause { msg, source, .. } => {
+                let err_string = source.to_string();
+                let first_line = 
error_util::remove_backtrace_from_err(&err_string);
+                format!("{msg}. Caused by: {first_line}")
+            }
+        }
+    }
+}
+
+/// A set of codes for compaction service.
+///
+/// Note that such a set of codes is different with the codes (alias to http
+/// status code) used by storage service.
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum StatusCode {
+    #[default]
+    Ok = 0,
+    BadRequest = 401,
+    Internal = 500,
+}
+
+impl StatusCode {
+    #[inline]
+    pub fn as_u32(self) -> u32 {
+        self as u32
+    }
+}
+
+pub fn build_err_header(err: Error) -> ResponseHeader {
+    ResponseHeader {
+        code: err.code().as_u32(),
+        error: err.error_message(),
+    }
+}
+
+pub fn build_ok_header() -> ResponseHeader {
+    ResponseHeader {
+        code: StatusCode::Ok.as_u32(),
+        ..Default::default()
+    }
+}
diff --git a/src/server/src/grpc/compaction_service/mod.rs 
b/src/server/src/grpc/compaction_service/mod.rs
new file mode 100644
index 00000000..3954b78a
--- /dev/null
+++ b/src/server/src/grpc/compaction_service/mod.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Compaction rpc service implementation.
+
+use std::sync::Arc;
+
+use analytic_engine::compaction::runner::{CompactionRunnerRef, 
CompactionRunnerTask};
+use async_trait::async_trait;
+use error::{build_err_header, build_ok_header, ErrWithCause, StatusCode};
+use generic_error::BoxError;
+use horaedbproto::compaction_service::{
+    compaction_service_server::CompactionService, ExecResult, 
ExecuteCompactionTaskRequest,
+    ExecuteCompactionTaskResponse,
+};
+use runtime::Runtime;
+use snafu::ResultExt;
+use tonic::{Request, Response, Status};
+
+mod error;
+
+/// Builder for [CompactionServiceImpl]
+pub struct Builder {
+    pub runtime: Arc<Runtime>,
+    pub compaction_runner: CompactionRunnerRef,
+}
+
+impl Builder {
+    pub fn build(self) -> CompactionServiceImpl {
+        let Self {
+            runtime,
+            compaction_runner,
+        } = self;
+
+        CompactionServiceImpl {
+            runtime,
+            compaction_runner,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct CompactionServiceImpl {
+    pub runtime: Arc<Runtime>,
+    pub compaction_runner: CompactionRunnerRef,
+}
+
+#[async_trait]
+impl CompactionService for CompactionServiceImpl {
+    async fn execute_compaction_task(
+        &self,
+        request: Request<ExecuteCompactionTaskRequest>,
+    ) -> Result<Response<ExecuteCompactionTaskResponse>, Status> {
+        let request: Result<CompactionRunnerTask, error::Error> = request
+            .into_inner()
+            .try_into()
+            .box_err()
+            .context(ErrWithCause {
+                code: StatusCode::BadRequest,
+                msg: "fail to convert the execute compaction task request",
+            });
+
+        let mut resp: ExecuteCompactionTaskResponse = 
ExecuteCompactionTaskResponse::default();
+        match request {
+            Ok(task) => {
+                let request_id = task.request_id.clone();
+                let res = self
+                    .compaction_runner
+                    .run(task)
+                    .await
+                    .box_err()
+                    .with_context(|| ErrWithCause {
+                        code: StatusCode::Internal,
+                        msg: format!("fail to compact task, 
request:{request_id}"),
+                    });
+
+                match res {
+                    Ok(res) => {
+                        resp.header = Some(build_ok_header());
+                        resp.result = Some(ExecResult {
+                            output_file_path: res.output_file_path.into(),
+                            sst_info: Some(res.sst_info.into()),
+                            sst_meta: Some(res.sst_meta.into()),
+                        });
+                        // TODO(leslie): Add status.
+                    }
+                    Err(e) => {
+                        resp.header = Some(build_err_header(e));
+                    }
+                }
+            }
+            Err(e) => {
+                resp.header = Some(build_err_header(e));
+            }
+        }
+
+        Ok(Response::new(resp))
+    }
+}
diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs
index 7b02a3a2..24a18168 100644
--- a/src/server/src/grpc/mod.rs
+++ b/src/server/src/grpc/mod.rs
@@ -24,11 +24,14 @@ use std::{
     time::Duration,
 };
 
+use analytic_engine::compaction::runner::CompactionRunnerRef;
 use cluster::ClusterRef;
 use common_types::column_schema;
+use compaction_service::CompactionServiceImpl;
 use futures::FutureExt;
 use generic_error::GenericError;
 use horaedbproto::{
+    compaction_service::compaction_service_server::CompactionServiceServer,
     meta_event::meta_event_service_server::MetaEventServiceServer,
     remote_engine::remote_engine_service_server::RemoteEngineServiceServer,
     storage::storage_service_server::StorageServiceServer,
@@ -60,6 +63,7 @@ use crate::{
     },
 };
 
+mod compaction_service;
 mod meta_event_service;
 mod metrics;
 mod remote_engine_service;
@@ -105,6 +109,9 @@ pub enum Error {
     #[snafu(display("Missing wals.\nBacktrace:\n{}", backtrace))]
     MissingWals { backtrace: Backtrace },
 
+    #[snafu(display("Missing compaction runner.\nBacktrace:\n{}", backtrace))]
+    MissingCompactionRunner { backtrace: Backtrace },
+
     #[snafu(display("Missing timeout.\nBacktrace:\n{}", backtrace))]
     MissingTimeout { backtrace: Backtrace },
 
@@ -163,6 +170,7 @@ define_result!(Error);
 pub struct RpcServices {
     serve_addr: SocketAddr,
     rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, 
AuthWithFile>,
+    compaction_rpc_server: 
Option<CompactionServiceServer<CompactionServiceImpl>>,
     meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>,
     remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>,
     runtime: Arc<Runtime>,
@@ -173,6 +181,7 @@ pub struct RpcServices {
 impl RpcServices {
     pub async fn start(&mut self) -> Result<()> {
         let rpc_server = self.rpc_server.clone();
+        let compaction_rpc_server = self.compaction_rpc_server.clone();
         let meta_rpc_server = self.meta_rpc_server.clone();
         let remote_engine_server = self.remote_engine_server.clone();
         let serve_addr = self.serve_addr;
@@ -182,6 +191,11 @@ impl RpcServices {
 
             let mut router = Server::builder().add_service(rpc_server);
 
+            if let Some(s) = compaction_rpc_server {
+                info!("Grpc server serves compaction service");
+                router = router.add_service(s);
+            };
+
             if let Some(s) = meta_rpc_server {
                 info!("Grpc server serves meta rpc service");
                 router = router.add_service(s);
@@ -226,6 +240,7 @@ pub struct Builder {
     proxy: Option<Arc<Proxy>>,
     query_dedup_config: Option<QueryDedupConfig>,
     hotspot_recorder: Option<Arc<HotspotRecorder>>,
+    compaction_runner: Option<CompactionRunnerRef>,
 }
 
 impl Builder {
@@ -241,6 +256,7 @@ impl Builder {
             proxy: None,
             query_dedup_config: None,
             hotspot_recorder: None,
+            compaction_runner: None,
         }
     }
 
@@ -294,6 +310,12 @@ impl Builder {
         self.query_dedup_config = Some(config);
         self
     }
+
+    // Compaction runner is an optional field for building [RpcServices].
+    pub fn compaction_runner(mut self, runner: Option<CompactionRunnerRef>) -> 
Self {
+        self.compaction_runner = runner;
+        self
+    }
 }
 
 impl Builder {
@@ -301,19 +323,39 @@ impl Builder {
         let auth = self.auth.context(MissingAuth)?;
         let runtimes = self.runtimes.context(MissingRuntimes)?;
         let instance = self.instance.context(MissingInstance)?;
-        let opened_wals = self.opened_wals.context(MissingWals)?;
         let proxy = self.proxy.context(MissingProxy)?;
         let hotspot_recorder = 
self.hotspot_recorder.context(MissingHotspotRecorder)?;
-
-        let meta_rpc_server = self.cluster.map(|v| {
-            let builder = meta_event_service::Builder {
-                cluster: v,
-                instance: instance.clone(),
-                runtime: runtimes.meta_runtime.clone(),
-                opened_wals,
-            };
-            MetaEventServiceServer::new(builder.build())
-        });
+        let mut meta_rpc_server: 
Option<MetaEventServiceServer<MetaServiceImpl>> = None;
+        let mut compaction_rpc_server: 
Option<CompactionServiceServer<CompactionServiceImpl>> =
+            None;
+
+        self.cluster
+            .map(|v| {
+                let result: Result<()> = (|| {
+                    // Support meta rpc service.
+                    let opened_wals = self.opened_wals.context(MissingWals)?;
+                    let builder = meta_event_service::Builder {
+                        cluster: v.clone(),
+                        instance: instance.clone(),
+                        runtime: runtimes.meta_runtime.clone(),
+                        opened_wals,
+                    };
+                    meta_rpc_server = 
Some(MetaEventServiceServer::new(builder.build()));
+
+                    // Support remote compaction rpc service.
+                    let compaction_runner =
+                        
self.compaction_runner.context(MissingCompactionRunner)?;
+                    let builder = compaction_service::Builder {
+                        runtime: runtimes.compact_runtime.clone(),
+                        compaction_runner,
+                    };
+                    compaction_rpc_server = 
Some(CompactionServiceServer::new(builder.build()));
+
+                    Ok(())
+                })();
+                result
+            })
+            .transpose()?;
 
         let remote_engine_server = {
             let query_dedup = self
@@ -349,6 +391,7 @@ impl Builder {
         Ok(RpcServices {
             serve_addr,
             rpc_server,
+            compaction_rpc_server,
             meta_rpc_server,
             remote_engine_server,
             runtime,
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index f7cd72ec..bca6c8d1 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -19,6 +19,7 @@
 
 use std::sync::Arc;
 
+use analytic_engine::compaction::runner::CompactionRunnerRef;
 use catalog::manager::ManagerRef;
 use cluster::ClusterRef;
 use datafusion::execution::{runtime_env::RuntimeConfig, FunctionRegistry};
@@ -251,6 +252,7 @@ pub struct Builder {
     opened_wals: Option<OpenedWals>,
     remote_engine: Option<RemoteEngineRef>,
     datatfusion_context: Option<DatafusionContext>,
+    compaction_runner: Option<CompactionRunnerRef>,
 }
 
 impl Builder {
@@ -274,6 +276,7 @@ impl Builder {
             opened_wals: None,
             remote_engine: None,
             datatfusion_context: None,
+            compaction_runner: None,
         }
     }
 
@@ -368,6 +371,11 @@ impl Builder {
         self
     }
 
+    pub fn compaction_runner(mut self, runner: CompactionRunnerRef) -> Self {
+        self.compaction_runner = Some(runner);
+        self
+    }
+
     /// Build and run the server
     pub fn build(self) -> Result<Server> {
         // Build instance
@@ -527,6 +535,7 @@ impl Builder {
             .proxy(proxy)
             .hotspot_recorder(hotspot_recorder)
             .query_dedup(self.server_config.query_dedup)
+            .compaction_runner(self.compaction_runner.clone())
             .build()
             .context(BuildGrpcService)?;
 
diff --git a/src/table_engine/src/predicate.rs 
b/src/table_engine/src/predicate.rs
index b316b99e..3a3294fc 100644
--- a/src/table_engine/src/predicate.rs
+++ b/src/table_engine/src/predicate.rs
@@ -112,7 +112,7 @@ impl Predicate {
 impl TryFrom<&Predicate> for horaedbproto::remote_engine::Predicate {
     type Error = Error;
 
-    fn try_from(predicate: &Predicate) -> std::result::Result<Self, 
Self::Error> {
+    fn try_from(predicate: &Predicate) -> Result<Self> {
         let time_range = predicate.time_range;
         let mut exprs = Vec::with_capacity(predicate.exprs.len());
         for expr in &predicate.exprs {
@@ -135,9 +135,7 @@ impl TryFrom<&Predicate> for 
horaedbproto::remote_engine::Predicate {
 impl TryFrom<horaedbproto::remote_engine::Predicate> for Predicate {
     type Error = Error;
 
-    fn try_from(
-        pb: horaedbproto::remote_engine::Predicate,
-    ) -> std::result::Result<Self, Self::Error> {
+    fn try_from(pb: horaedbproto::remote_engine::Predicate) -> Result<Self> {
         let time_range = pb.time_range.context(EmptyTimeRange)?;
         let mut exprs = Vec::with_capacity(pb.exprs.len());
         for pb_expr in pb.exprs {
diff --git a/src/table_engine/src/table.rs b/src/table_engine/src/table.rs
index 3c611b43..6526579a 100644
--- a/src/table_engine/src/table.rs
+++ b/src/table_engine/src/table.rs
@@ -307,6 +307,12 @@ impl From<u64> for TableId {
     }
 }
 
+impl From<TableId> for u64 {
+    fn from(id: TableId) -> Self {
+        id.0
+    }
+}
+
 impl fmt::Display for TableId {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{}", self.0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to