This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f00a4549 [CH] Support CACHE DATA command for MergeTree table (#6621)
6f00a4549 is described below

commit 6f00a4549654fa85a3d50d3df92ae66dd8407ebf
Author: LiuNeng <[email protected]>
AuthorDate: Sun Aug 4 13:50:46 2024 +0800

    [CH] Support CACHE DATA command for MergeTree table (#6621)
    
    [CH] Support CACHE DATA command for MergeTree table
    
    ---------
    
    Co-authored-by: liuneng1994 <[email protected]>
    Co-authored-by: Zhichao Zhang <[email protected]>
---
 backends-clickhouse/pom.xml                        |  16 +
 .../gluten/sql/parser/GlutenClickhouseSqlBase.g4   | 232 ++++++++++++
 .../gluten/parser/GlutenClickhouseSqlParser.scala  |  61 ++++
 .../org/apache/spark/sql/delta/DeltaAdapter.scala  |  10 +
 .../gluten/parser/GlutenClickhouseSqlParser.scala  |  65 ++++
 .../org/apache/spark/sql/delta/DeltaAdapter.scala  |  10 +
 .../gluten/parser/GlutenClickhouseSqlParser.scala  |  65 ++++
 .../org/apache/spark/sql/delta/DeltaAdapter.scala  |  11 +
 .../gluten/execution/CHNativeCacheManager.java}    |  11 +-
 .../clickhouse/CHSparkPlanExecApi.scala            |   7 +
 .../parser/GlutenClickhouseSqlParserBase.scala     | 276 ++++++++++++++
 .../apache/spark/rpc/GlutenDriverEndpoint.scala    |   4 +-
 .../apache/spark/rpc/GlutenExecutorEndpoint.scala  |  17 +-
 .../org/apache/spark/rpc/GlutenRpcMessages.scala   |   4 +
 .../apache/spark/sql/delta/DeltaAdapterTrait.scala |   9 +
 .../commands/GlutenCHCacheDataCommand.scala        | 287 +++++++++++++++
 .../v2/clickhouse/metadata/AddFileTags.scala       |  11 +-
 .../GlutenClickHouseMergeTreeCacheDataSSuite.scala | 401 +++++++++++++++++++++
 .../GlutenClickHouseNativeWriteTableSuite.scala    |   2 +-
 cpp-ch/local-engine/CMakeLists.txt                 |   1 +
 cpp-ch/local-engine/Common/CHUtil.cpp              |   3 +
 cpp-ch/local-engine/Common/MergeTreeTool.cpp       |  32 ++
 cpp-ch/local-engine/Common/MergeTreeTool.h         |   1 +
 cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp  |  14 +-
 cpp-ch/local-engine/Parser/MergeTreeRelParser.h    |   2 -
 .../local-engine/Storages/Cache/CacheManager.cpp   | 143 ++++++++
 cpp-ch/local-engine/Storages/Cache/CacheManager.h  |  44 +++
 .../local-engine/Storages/CustomMergeTreeSink.cpp  |  30 --
 cpp-ch/local-engine/Storages/CustomMergeTreeSink.h |  43 ---
 .../Storages/StorageMergeTreeFactory.cpp           |  27 +-
 .../Storages/StorageMergeTreeFactory.h             |  12 +-
 cpp-ch/local-engine/local_engine_jni.cpp           |  16 +
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   4 +
 .../extension/OthersExtensionOverrides.scala       |   3 +
 .../extension/GlutenSessionExtensionSuite.scala    |   8 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   8 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   8 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   8 +-
 pom.xml                                            |  10 +
 39 files changed, 1805 insertions(+), 111 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index 5672056b4..f2ec45a51 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -365,6 +365,22 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr4-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>antlr4</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <visitor>true</visitor>
+          
<sourceDirectory>../backends-clickhouse/src/main/antlr4</sourceDirectory>
+          <treatWarningsAsErrors>true</treatWarningsAsErrors>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git 
a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
 
b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
new file mode 100644
index 000000000..ac4f66a4f
--- /dev/null
+++ 
b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar GlutenClickhouseSqlBase;
+
+@members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains 
dot).
+   * Returns true if the character that follows the token is not a digit or 
letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is 
followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is 
followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is 
followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 "  12.0D is a valid decimal token 
because it is folllowed
+   * by a space. 34.E2 is a valid decimal token because it is followed by 
symbol '+'
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= 
'9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+}
+
+tokens {
+    DELIMITER
+}
+
+singleStatement
+    : statement ';'* EOF
+    ;
+
+statement
+    : CACHE META? DATA ASYNC? SELECT selectedColumns=selectedColumnNames
+        FROM (path=STRING | table=qualifiedName) (AFTER filter=filterClause)?
+        (CACHEPROPERTIES cacheProps=propertyList)?                             
  #cacheData
+    | .*?                                                                      
  #passThrough
+    ;
+
+qualifiedName
+    : identifier (DOT identifier)*
+    ;
+
+selectedColumnNames
+    : ASTERISK
+    | identifier (COMMA identifier)*
+    ;
+
+filterClause
+    : TIMESTAMP AS OF timestamp=STRING
+    | datepartition=identifier AS OF datetime=STRING
+    ;
+
+propertyList
+    : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+    ;
+
+property
+    : key=propertyKey (EQ? value=propertyValue)?
+    ;
+
+propertyKey
+    : identifier (DOT identifier)*
+    | stringLit
+    ;
+
+propertyValue
+    : INTEGER_VALUE
+    | DECIMAL_VALUE
+    | booleanValue
+    | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN
+    | value=stringLit
+    ;
+
+stringLit
+    : STRING
+    | DOUBLEQUOTED_STRING
+    ;
+
+booleanValue
+    : TRUE | FALSE
+    ;
+
+identifier
+    : IDENTIFIER             #unquotedIdentifier
+    | quotedIdentifier       #quotedIdentifierAlternative
+    | nonReserved            #unquotedIdentifier
+    ;
+
+quotedIdentifier
+    : BACKQUOTED_IDENTIFIER
+    ;
+
+// Add keywords here so that people's queries don't break if they have a 
column name as one of
+// these tokens
+nonReserved
+    : CACHE | META | ASYNC | DATA
+    | SELECT | FOR | AFTER | CACHEPROPERTIES
+    | TIMESTAMP | AS | OF | DATE_PARTITION
+    ;
+
+// Define how the keywords above should appear in a user's SQL statement.
+CACHE: 'CACHE';
+META: 'META';
+ASYNC: 'ASYNC';
+DATA: 'DATA';
+SELECT: 'SELECT';
+COMMA: ',';
+FOR: 'FOR';
+FROM: 'FROM';
+AFTER: 'AFTER';
+CACHEPROPERTIES: 'CACHEPROPERTIES';
+DOT: '.';
+ASTERISK: '*';
+TIMESTAMP: 'TIMESTAMP';
+AS: 'AS';
+OF: 'OF';
+DATE_PARTITION: 'DATE_PARTITION';
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+
+EQ  : '=' | '==';
+NSEQ: '<=>';
+NEQ : '<>';
+NEQJ: '!=';
+LTE : '<=' | '!>';
+GTE : '>=' | '!<';
+CONCAT_PIPE: '||';
+
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+DOUBLEQUOTED_STRING
+    :'"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : DIGIT+ 'L'
+    ;
+
+SMALLINT_LITERAL
+    : DIGIT+ 'S'
+    ;
+
+TINYINT_LITERAL
+    : DIGIT+ 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+DECIMAL_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+SIMPLE_COMMENT
+    : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
+    ;
+
+BRACKETED_COMMENT
+    : '/*' .*? '*/' -> channel(HIDDEN)
+    ;
+
+WS  : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+    : .
+    ;
diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
new file mode 100644
index 000000000..cc4f0bd9f
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface)
+  extends GlutenClickhouseSqlParserBase {
+
+  override def parsePlan(sqlText: String): LogicalPlan =
+    parse(sqlText) {
+      parser =>
+        astBuilder.visit(parser.singleStatement()) match {
+          case plan: LogicalPlan => plan
+          case _ => delegate.parsePlan(sqlText)
+        }
+    }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+}
diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
index b6d4c0484..4ffa2e841 100644
--- 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
@@ -15,7 +15,17 @@
  * limitations under the License.
  */
 package org.apache.spark.sql.delta
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
 
 object DeltaAdapter extends DeltaAdapterTrait {
   override def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.snapshot
+
+  override def snapshotFilesForScan(
+      snapshot: Snapshot,
+      projection: Seq[Attribute],
+      filters: Seq[Expression],
+      keepNumRecords: Boolean): DeltaScan = {
+    snapshot.filesForScan(projection, filters, keepNumRecords)
+  }
 }
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
new file mode 100644
index 000000000..1f2dfe007
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface)
+  extends GlutenClickhouseSqlParserBase {
+
+  override def parsePlan(sqlText: String): LogicalPlan =
+    parse(sqlText) {
+      parser =>
+        astBuilder.visit(parser.singleStatement()) match {
+          case plan: LogicalPlan => plan
+          case _ => delegate.parsePlan(sqlText)
+        }
+    }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+
+  override def parseQuery(sqlText: String): LogicalPlan = {
+    delegate.parseQuery(sqlText)
+  }
+}
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
index 8a9c5585e..58d59aa9d 100644
--- 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
@@ -15,7 +15,17 @@
  * limitations under the License.
  */
 package org.apache.spark.sql.delta
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
 
 object DeltaAdapter extends DeltaAdapterTrait {
   override def snapshot(deltaLog: DeltaLog): Snapshot = 
deltaLog.unsafeVolatileSnapshot
+
+  override def snapshotFilesForScan(
+      snapshot: Snapshot,
+      projection: Seq[Attribute],
+      filters: Seq[Expression],
+      keepNumRecords: Boolean): DeltaScan = {
+    snapshot.filesForScan(filters, keepNumRecords)
+  }
 }
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
new file mode 100644
index 000000000..1f2dfe007
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface)
+  extends GlutenClickhouseSqlParserBase {
+
+  override def parsePlan(sqlText: String): LogicalPlan =
+    parse(sqlText) {
+      parser =>
+        astBuilder.visit(parser.singleStatement()) match {
+          case plan: LogicalPlan => plan
+          case _ => delegate.parsePlan(sqlText)
+        }
+    }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+
+  override def parseQuery(sqlText: String): LogicalPlan = {
+    delegate.parseQuery(sqlText)
+  }
+}
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
index 8a9c5585e..f414ab8f2 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
@@ -16,6 +16,17 @@
  */
 package org.apache.spark.sql.delta
 
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
+
 object DeltaAdapter extends DeltaAdapterTrait {
   override def snapshot(deltaLog: DeltaLog): Snapshot = 
deltaLog.unsafeVolatileSnapshot
+
+  override def snapshotFilesForScan(
+      snapshot: Snapshot,
+      projection: Seq[Attribute],
+      filters: Seq[Expression],
+      keepNumRecords: Boolean): DeltaScan = {
+    snapshot.filesForScan(filters, keepNumRecords)
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
 
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
similarity index 70%
copy from 
backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
copy to 
backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 3ea4af4ae..f5f75dc1d 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -14,9 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.delta
+package org.apache.gluten.execution;
 
-trait DeltaAdapterTrait {
+import java.util.Set;
 
-  def snapshot(deltaLog: DeltaLog): Snapshot
+public class CHNativeCacheManager {
+  public static void cacheParts(String table, Set<String> columns, boolean 
async) {
+    nativeCacheParts(table, String.join(",", columns), async);
+  }
+
+  private static native void nativeCacheParts(String table, String columns, 
boolean async);
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 3069c4a3f..bba5525ed 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -25,6 +25,7 @@ import 
org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcas
 import org.apache.gluten.extension.columnar.AddFallbackTagRule
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
 import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.parser.GlutenClickhouseSqlParser
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, WindowFunctionNode}
 import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -40,6 +41,7 @@ import 
org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRew
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
CollectList, CollectSet}
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, 
HashPartitioning, Partitioning, RangePartitioning}
@@ -612,6 +614,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
   override def genExtendedStrategies(): List[SparkSession => Strategy] =
     List()
 
+  override def genInjectExtendedParser()
+      : List[(SparkSession, ParserInterface) => ParserInterface] = {
+    List((spark, parserInterface) => new GlutenClickhouseSqlParser(spark, 
parserInterface))
+  }
+
   /** Define backend specfic expression mappings. */
   override def extraExpressionMappings: Seq[Sig] = {
     List(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
new file mode 100644
index 000000000..18fc102be
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.gluten.sql.parser.{GlutenClickhouseSqlBaseBaseListener, 
GlutenClickhouseSqlBaseBaseVisitor, GlutenClickhouseSqlBaseLexer, 
GlutenClickhouseSqlBaseParser}
+import org.apache.gluten.sql.parser.GlutenClickhouseSqlBaseParser._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, 
ParseException, ParserInterface}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand
+import org.apache.spark.sql.internal.VariableSubstitution
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.tree.TerminalNodeImpl
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+trait GlutenClickhouseSqlParserBase extends ParserInterface {
+
+  protected val astBuilder = new GlutenClickhouseSqlAstBuilder
+  protected val substitution = new VariableSubstitution
+
+  protected def parse[T](command: String)(toResult: 
GlutenClickhouseSqlBaseParser => T): T = {
+    val lexer = new GlutenClickhouseSqlBaseLexer(
+      new 
UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command))))
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(ParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new GlutenClickhouseSqlBaseParser(tokenStream)
+    parser.addParseListener(PostProcessor)
+    parser.removeErrorListeners()
+    parser.addErrorListener(ParseErrorListener)
+
+    try {
+      try {
+        // first, try parsing with potentially faster SLL mode
+        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+        toResult(parser)
+      } catch {
+        case e: ParseCancellationException =>
+          // if we fail, parse with LL mode
+          tokenStream.seek(0) // rewind input stream
+          parser.reset()
+
+          // Try Again.
+          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+          toResult(parser)
+      }
+    } catch {
+      case e: ParseException if e.command.isDefined =>
+        throw e
+      case e: ParseException =>
+        throw e.withCommand(command)
+      case e: AnalysisException =>
+        val position = Origin(e.line, e.startPosition)
+        throw new ParseException(
+          command = Option(command),
+          message = e.message,
+          start = position,
+          stop = position,
+          errorClass = Some("GLUTEN_CH_PARSING_ANALYSIS_ERROR"))
+    }
+  }
+}
+
+class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[AnyRef] {
+
+  import org.apache.spark.sql.catalyst.parser.ParserUtils._
+
+  /** Convert a property list into a key-value map. */
+  override def visitPropertyList(ctx: PropertyListContext): Map[String, 
String] = withOrigin(ctx) {
+    val properties = ctx.property.asScala.map {
+      property =>
+        val key = visitPropertyKey(property.key)
+        val value = visitPropertyValue(property.value)
+        key -> value
+    }
+    // Check for duplicate property names.
+    checkDuplicateKeys(properties.toSeq, ctx)
+    properties.toMap
+  }
+
+  /**
+   * A property key can either be String or a collection of dot separated 
elements. This function
+   * extracts the property key based on whether its a string literal or a 
property identifier.
+   */
+  override def visitPropertyKey(key: PropertyKeyContext): String = {
+    if (key.stringLit() != null) {
+      string(visitStringLit(key.stringLit()))
+    } else {
+      key.getText
+    }
+  }
+
+  /**
+   * A property value can be String, Integer, Boolean or Decimal. This 
function extracts the
+   * property value based on whether its a string, integer, boolean or decimal 
literal.
+   */
+  override def visitPropertyValue(value: PropertyValueContext): String = {
+    if (value == null) {
+      null
+    } else if (value.identifier != null) {
+      value.identifier.getText
+    } else if (value.value != null) {
+      string(visitStringLit(value.value))
+    } else if (value.booleanValue != null) {
+      value.getText.toLowerCase(Locale.ROOT)
+    } else {
+      value.getText
+    }
+  }
+
+  def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+    val props = visitPropertyList(ctx)
+    val badKeys = props.collect { case (key, null) => key }
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", 
"]")}",
+        ctx)
+    }
+    props
+  }
+
+  override def visitStringLit(ctx: StringLitContext): Token = {
+    if (ctx != null) {
+      if (ctx.STRING != null) {
+        ctx.STRING.getSymbol
+      } else {
+        ctx.DOUBLEQUOTED_STRING.getSymbol
+      }
+    } else {
+      null
+    }
+  }
+
+  override def visitSingleStatement(
+      ctx: GlutenClickhouseSqlBaseParser.SingleStatementContext): AnyRef = 
withOrigin(ctx) {
+    visit(ctx.statement).asInstanceOf[LogicalPlan]
+  }
+
+  override def visitCacheData(ctx: 
GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef =
+    withOrigin(ctx) {
+      val onlyMetaCache = ctx.META != null
+      val asynExecute = ctx.ASYNC != null
+      val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null) 
{
+        if (ctx.filter.TIMESTAMP != null) {
+          (Some(string(ctx.filter.timestamp)), None, None)
+        } else if (ctx.filter.datepartition != null && ctx.filter.datetime != 
null) {
+          (None, Some(ctx.filter.datepartition.getText), 
Some(string(ctx.filter.datetime)))
+        } else {
+          throw new ParseException(s"Illegal filter value ${ctx.getText}", ctx)
+        }
+      } else {
+        (None, None, None)
+      }
+      val selectedColuman = visitSelectedColumnNames(ctx.selectedColumns)
+      val tablePropertyOverrides = Option(ctx.cacheProps)
+        .map(visitPropertyKeyValues)
+        .getOrElse(Map.empty[String, String])
+
+      GlutenCHCacheDataCommand(
+        onlyMetaCache,
+        asynExecute,
+        selectedColuman,
+        Option(ctx.path).map(string),
+        Option(ctx.table).map(visitTableIdentifier),
+        tsfilter,
+        partitionColumn,
+        partitionValue,
+        tablePropertyOverrides
+      )
+    }
+
+  override def visitPassThrough(ctx: 
GlutenClickhouseSqlBaseParser.PassThroughContext): AnyRef =
+    null
+
+  protected def visitTableIdentifier(ctx: QualifiedNameContext): 
TableIdentifier = withOrigin(ctx) {
+    ctx.identifier.asScala.toSeq match {
+      case Seq(tbl) => TableIdentifier(tbl.getText)
+      case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
+      // TODO: Spark 3.5 supports catalog parameter
+      // case Seq(catalog, db, tbl) =>
+      //   TableIdentifier(tbl.getText, Some(db.getText), 
Some(catalog.getText))
+      case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", 
ctx)
+    }
+  }
+
+  override def visitSelectedColumnNames(ctx: SelectedColumnNamesContext): 
Option[Seq[String]] =
+    withOrigin(ctx) {
+      if (ctx != null) {
+        if (ctx.ASTERISK != null) {
+          // It means select all columns
+          None
+        } else if (ctx.identifier != null && !(ctx.identifier).isEmpty) {
+          Some(ctx.identifier.asScala.map(_.getText).toSeq)
+        } else {
+          throw new ParseException(s"Illegal selected column.", ctx)
+        }
+      } else {
+        throw new ParseException(s"Illegal selected column.", ctx)
+      }
+    }
+}
+
+case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener {
+
+  /** Remove the back ticks from an Identifier. */
+  override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {
+    replaceTokenByIdentifier(ctx, 1) {
+      token =>
+        // Remove the double back ticks in the string.
+        token.setText(token.getText.replace("``", "`"))
+        token
+    }
+  }
+
+  /** Treat non-reserved keywords as Identifiers. */
+  override def exitNonReserved(ctx: NonReservedContext): Unit = {
+    replaceTokenByIdentifier(ctx, 0)(identity)
+  }
+
+  private def replaceTokenByIdentifier(ctx: ParserRuleContext, stripMargins: 
Int)(
+      f: CommonToken => CommonToken = identity): Unit = {
+    val parent = ctx.getParent
+    parent.removeLastChild()
+    val token = ctx.getChild(0).getPayload.asInstanceOf[Token]
+    val newToken = new CommonToken(
+      new org.antlr.v4.runtime.misc.Pair(token.getTokenSource, 
token.getInputStream),
+      GlutenClickhouseSqlBaseParser.IDENTIFIER,
+      token.getChannel,
+      token.getStartIndex + stripMargins,
+      token.getStopIndex - stripMargins
+    )
+    parent.addChild(new TerminalNodeImpl(f(newToken)))
+  }
+}
+
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+  override def consume(): Unit = wrapped.consume
+  override def getSourceName(): String = wrapped.getSourceName
+  override def index(): Int = wrapped.index
+  override def mark(): Int = wrapped.mark
+  override def release(marker: Int): Unit = wrapped.release(marker)
+  override def seek(where: Int): Unit = wrapped.seek(where)
+  override def size(): Int = wrapped.size
+
+  override def getText(interval: Interval): String = wrapped.getText(interval)
+
+  override def LA(i: Int): Int = {
+    val la = wrapped.LA(i)
+    if (la == 0 || la == IntStream.EOF) la
+    else Character.toUpperCase(la)
+  }
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
index 319381f89..a061a620d 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
@@ -103,7 +103,7 @@ object GlutenDriverEndpoint extends Logging with 
RemovalListener[String, util.Se
   var glutenDriverEndpointRef: RpcEndpointRef = _
 
   // keep executorRef on memory
-  private val executorDataMap = new ConcurrentHashMap[String, ExecutorData]
+  val executorDataMap = new ConcurrentHashMap[String, ExecutorData]
 
   // If spark.scheduler.listenerbus.eventqueue.capacity is set too small,
   //   the listener may lose messages.
@@ -131,4 +131,4 @@ object GlutenDriverEndpoint extends Logging with 
RemovalListener[String, util.Se
   }
 }
 
-private class ExecutorData(val executorEndpointRef: RpcEndpointRef) {}
+class ExecutorData(val executorEndpointRef: RpcEndpointRef) {}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index f05933ef7..4d90ab653 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.rpc
 
-import org.apache.gluten.execution.CHBroadcastBuildSideCache
+import org.apache.gluten.execution.{CHBroadcastBuildSideCache, 
CHNativeCacheManager}
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.{config, Logging}
@@ -64,10 +64,25 @@ class GlutenExecutorEndpoint(val executorId: String, val 
conf: SparkConf)
         hashIds.forEach(
           resource_id => 
CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
       }
+    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+      CHNativeCacheManager.cacheParts(mergeTreeTable, columns, true)
 
     case e =>
       logError(s"Received unexpected message. $e")
   }
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
+    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+      try {
+        CHNativeCacheManager.cacheParts(mergeTreeTable, columns, false)
+        context.reply(CacheLoadResult(true))
+      } catch {
+        case _: Exception =>
+          context.reply(CacheLoadResult(false, s"executor: $executorId cache 
data failed."))
+      }
+    case e =>
+      logError(s"Received unexpected message. $e")
+  }
 }
 object GlutenExecutorEndpoint {
   var executorEndpoint: GlutenExecutorEndpoint = _
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index 43a0b7bd4..d675d705f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -35,4 +35,8 @@ object GlutenRpcMessages {
   case class GlutenCleanExecutionResource(executionId: String, 
broadcastHashIds: util.Set[String])
     extends GlutenRpcMessage
 
+  case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: 
util.Set[String])
+    extends GlutenRpcMessage
+
+  case class CacheLoadResult(success: Boolean, reason: String = "") extends 
GlutenRpcMessage
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
index 3ea4af4ae..6f3bb3705 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
@@ -16,7 +16,16 @@
  */
 package org.apache.spark.sql.delta
 
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
+
 trait DeltaAdapterTrait {
 
   def snapshot(deltaLog: DeltaLog): Snapshot
+
+  def snapshotFilesForScan(
+      snapshot: Snapshot,
+      projection: Seq[Attribute],
+      filters: Seq[Expression],
+      keepNumRecords: Boolean): DeltaScan
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
new file mode 100644
index 000000000..1e6b02406
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.substrait.rel.ExtensionTableBuilder
+
+import org.apache.spark.affinity.CHAffinity
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheLoadResult, 
GlutenMergeTreeCacheLoad}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import 
org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.toExecutorId
+import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+import org.apache.spark.sql.types.{BooleanType, StringType}
+import org.apache.spark.util.ThreadUtils
+
+import org.apache.hadoop.fs.Path
+
+import java.net.URI
+import java.util.{ArrayList => JList}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+case class GlutenCHCacheDataCommand(
+    onlyMetaCache: Boolean,
+    asynExecute: Boolean,
+    selectedColuman: Option[Seq[String]],
+    path: Option[String],
+    table: Option[TableIdentifier],
+    tsfilter: Option[String],
+    partitionColumn: Option[String],
+    partitionValue: Option[String],
+    tablePropertyOverrides: Map[String, String]
+) extends LeafRunnableCommand {
+
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("result", BooleanType, nullable = false)(),
+    AttributeReference("reason", StringType, nullable = false)())
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val pathToCache =
+      if (path.nonEmpty) {
+        new Path(path.get)
+      } else if (table.nonEmpty) {
+        DeltaTableIdentifier(sparkSession, table.get) match {
+          case Some(id) if id.path.nonEmpty =>
+            new Path(id.path.get)
+          case _ =>
+            new 
Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location)
+        }
+      } else {
+        throw DeltaErrors.missingTableIdentifierException("CACHE DATA")
+      }
+
+    val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, 
pathToCache)
+    if (baseDeltaPath.isDefined) {
+      if (baseDeltaPath.get != pathToCache) {
+        throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get)
+      }
+    }
+
+    val deltaLog = DeltaLog.forTable(sparkSession, pathToCache)
+    if (!deltaLog.tableExists) {
+      throw DeltaErrors.notADeltaTableException(
+        "CACHE DATA",
+        DeltaTableIdentifier(path = Some(pathToCache.toString)))
+    }
+
+    val snapshot = deltaLog.update()
+
+    require(
+      snapshot.version >= 0,
+      "No state defined for this table. Is this really " +
+        "a Delta table? Refusing to garbage collect.")
+
+    val allColumns = snapshot.dataSchema.fieldNames.toSeq
+    val selectedColumns = if (selectedColuman.nonEmpty) {
+      selectedColuman.get
+        .filter(allColumns.contains(_))
+        .map(ConverterUtils.normalizeColName)
+        .toSeq
+    } else {
+      allColumns.map(ConverterUtils.normalizeColName)
+    }
+
+    val selectedAddFiles = if (tsfilter.isDefined) {
+      val allParts = DeltaAdapter.snapshotFilesForScan(snapshot, Seq.empty, 
Seq.empty, false)
+      allParts.files.filter(_.modificationTime >= tsfilter.get.toLong).toSeq
+    } else if (partitionColumn.isDefined && partitionValue.isDefined) {
+      val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
+      require(
+        partitionColumns.contains(partitionColumn.get),
+        s"the partition column ${partitionColumn.get} is invalid.")
+      val partitionColumnField = 
snapshot.metadata.partitionSchema(partitionColumn.get)
+
+      val partitionColumnAttr = AttributeReference(
+        ConverterUtils.normalizeColName(partitionColumn.get),
+        partitionColumnField.dataType,
+        partitionColumnField.nullable)()
+      val isNotNullExpr = IsNotNull(partitionColumnAttr)
+      val greaterThanOrEqual = GreaterThanOrEqual(partitionColumnAttr, 
Literal(partitionValue.get))
+      DeltaAdapter
+        .snapshotFilesForScan(
+          snapshot,
+          Seq(partitionColumnAttr),
+          Seq(isNotNullExpr, greaterThanOrEqual),
+          false)
+        .files
+    } else {
+      DeltaAdapter.snapshotFilesForScan(snapshot, Seq.empty, Seq.empty, 
false).files
+    }
+
+    val executorIdsToAddFiles =
+      scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]()
+    val executorIdsToParts = scala.collection.mutable.Map[String, String]()
+    executorIdsToAddFiles.put(
+      GlutenCHCacheDataCommand.ALL_EXECUTORS,
+      new ArrayBuffer[AddMergeTreeParts]())
+    selectedAddFiles.foreach(
+      addFile => {
+        val mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts]
+        val partName = mergeTreePart.name
+        val tableUri = URI.create(mergeTreePart.tablePath)
+        val relativeTablePath = if (tableUri.getPath.startsWith("/")) {
+          tableUri.getPath.substring(1)
+        } else tableUri.getPath
+
+        val locations = CHAffinity.getNativeMergeTreePartLocations(partName, 
relativeTablePath)
+
+        if (locations.isEmpty) {
+          // non soft affinity
+          executorIdsToAddFiles
+            .get(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+            .get
+            .append(mergeTreePart)
+        } else {
+          locations.foreach(
+            executor => {
+              if (!executorIdsToAddFiles.contains(executor)) {
+                executorIdsToAddFiles.put(executor, new 
ArrayBuffer[AddMergeTreeParts]())
+              }
+              executorIdsToAddFiles.get(executor).get.append(mergeTreePart)
+            })
+        }
+      })
+
+    executorIdsToAddFiles.foreach(
+      value => {
+        val parts = value._2
+        val executorId = value._1
+        if (parts.nonEmpty) {
+          val onePart = parts(0)
+          val partNameList = parts.map(_.name).toSeq
+          // starts and lengths is useless for write
+          val partRanges = Seq.range(0L, partNameList.length).map(_ => 
long2Long(0L)).asJava
+
+          val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
+            -1,
+            -1,
+            onePart.database,
+            onePart.table,
+            ClickhouseSnapshot.genSnapshotId(snapshot),
+            onePart.tablePath,
+            pathToCache.toString,
+            snapshot.metadata.configuration.getOrElse("orderByKey", ""),
+            snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
+            snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
+            snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", 
""),
+            snapshot.metadata.configuration.getOrElse("setIndexKey", ""),
+            snapshot.metadata.configuration.getOrElse("primaryKey", ""),
+            partNameList.asJava,
+            partRanges,
+            partRanges,
+            ConverterUtils.convertNamedStructJson(snapshot.metadata.schema),
+            snapshot.metadata.configuration.asJava,
+            new JList[String]()
+          )
+
+          executorIdsToParts.put(executorId, 
extensionTableNode.getExtensionTableStr)
+        }
+      })
+
+    // send rpc call
+    if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) {
+      // send all parts to all executors
+      val tableMessage = 
executorIdsToParts.get(GlutenCHCacheDataCommand.ALL_EXECUTORS).get
+      if (asynExecute) {
+        GlutenDriverEndpoint.executorDataMap.forEach(
+          (executorId, executor) => {
+            executor.executorEndpointRef.send(
+              GlutenMergeTreeCacheLoad(tableMessage, 
selectedColumns.toSet.asJava))
+          })
+        Seq(Row(true, ""))
+      } else {
+        val futureList = ArrayBuffer[Future[CacheLoadResult]]()
+        val resultList = ArrayBuffer[CacheLoadResult]()
+        GlutenDriverEndpoint.executorDataMap.forEach(
+          (executorId, executor) => {
+            futureList.append(
+              executor.executorEndpointRef.ask[CacheLoadResult](
+                GlutenMergeTreeCacheLoad(tableMessage, 
selectedColumns.toSet.asJava)
+              ))
+          })
+        futureList.foreach(
+          f => {
+            resultList.append(ThreadUtils.awaitResult(f, Duration.Inf))
+          })
+        if (resultList.exists(!_.success)) {
+          Seq(Row(false, 
resultList.filter(!_.success).map(_.reason).mkString(";")))
+        } else {
+          Seq(Row(true, ""))
+        }
+      }
+    } else {
+      if (asynExecute) {
+        executorIdsToParts.foreach(
+          value => {
+            val executorData = 
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
+            if (executorData != null) {
+              executorData.executorEndpointRef.send(
+                GlutenMergeTreeCacheLoad(value._2, 
selectedColumns.toSet.asJava))
+            } else {
+              throw new GlutenException(
+                s"executor ${value._1} not found," +
+                  s" all executors are 
${GlutenDriverEndpoint.executorDataMap.toString}")
+            }
+          })
+        Seq(Row(true, ""))
+      } else {
+        val futureList = ArrayBuffer[Future[CacheLoadResult]]()
+        val resultList = ArrayBuffer[CacheLoadResult]()
+        executorIdsToParts.foreach(
+          value => {
+            val executorData = 
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
+            if (executorData != null) {
+              futureList.append(
+                executorData.executorEndpointRef.ask[CacheLoadResult](
+                  GlutenMergeTreeCacheLoad(value._2, 
selectedColumns.toSet.asJava)
+                ))
+            } else {
+              throw new GlutenException(
+                s"executor ${value._1} not found," +
+                  s" all executors are 
${GlutenDriverEndpoint.executorDataMap.toString}")
+            }
+          })
+        futureList.foreach(
+          f => {
+            resultList.append(ThreadUtils.awaitResult(f, Duration.Inf))
+          })
+        if (resultList.exists(!_.success)) {
+          Seq(Row(false, 
resultList.filter(!_.success).map(_.reason).mkString(";")))
+        } else {
+          Seq(Row(true, ""))
+        }
+      }
+    }
+  }
+}
+
+object GlutenCHCacheDataCommand {
+  val ALL_EXECUTORS = "allExecutors"
+
+  private def toExecutorId(executorId: String): String =
+    executorId.split("_").last
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
index 8acc23aec..71d5c5431 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
@@ -33,7 +33,7 @@ class AddMergeTreeParts(
     val database: String,
     val table: String,
     val engine: String, // default is "MergeTree"
-    override val path: String, // table path
+    val tablePath: String, // table path
     val targetNode: String, // the node which the current part is generated
     val name: String, // part name
     val uuid: String,
@@ -98,7 +98,7 @@ object AddFileTags {
       database: String,
       table: String,
       engine: String,
-      path: String,
+      tablePath: String,
       targetNode: String,
       name: String,
       uuid: String,
@@ -125,7 +125,7 @@ object AddFileTags {
       "database" -> database,
       "table" -> table,
       "engine" -> engine,
-      "path" -> path,
+      "path" -> tablePath,
       "targetNode" -> targetNode,
       "partition" -> partition,
       "uuid" -> uuid,
@@ -161,7 +161,7 @@ object AddFileTags {
       addFile.tags.get("database").get,
       addFile.tags.get("table").get,
       addFile.tags.get("engine").get,
-      addFile.path,
+      addFile.tags.get("path").get,
       addFile.tags.get("targetNode").get,
       addFile.path,
       addFile.tags.get("uuid").get,
@@ -199,6 +199,7 @@ object AddFileTags {
         mapper.readValue(returnedMetrics, new 
TypeReference[JList[WriteReturnedMetric]]() {})
       var addFiles = new ArrayBuffer[AddFile]()
       val path = new Path(originPathStr)
+      val modificationTime = System.currentTimeMillis()
       addFiles.appendAll(values.asScala.map {
         value =>
           AddFileTags.partsInfoToAddFile(
@@ -213,7 +214,7 @@ object AddFileTags {
             value.getDiskSize,
             -1L,
             -1L,
-            -1L,
+            modificationTime,
             "",
             -1L,
             -1L,
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
new file mode 100644
index 000000000..960c92178
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import java.io.File
+
+import scala.concurrent.duration.DurationInt
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeCacheDataSSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + 
"queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + 
"mergetree-queries-output"
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"error")
+      .set("spark.gluten.soft-affinity.enabled", "true")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new Configuration
+    conf.set("fs.defaultFS", HDFS_URL)
+    val fs = FileSystem.get(conf)
+    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+    FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+  }
+
+  def countFiles(directory: File): Int = {
+    if (directory.exists && directory.isDirectory) {
+      val files = directory.listFiles
+      val count = files
+        .count(_.isFile) + files.filter(_.isDirectory).map(countFiles).sum
+      count
+    } else {
+      0
+    }
+  }
+
+  test("test cache mergetree data sync") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_shipdate)
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='l_linenumber,l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val res = spark
+      .sql(s"""
+              |cache data
+              |  select l_orderkey, l_partkey from lineitem_mergetree_hdfs
+              |  after l_shipdate AS OF '1995-01-10'
+              |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+              |                aaa='ccc')""".stripMargin)
+      .collect()
+    assertResult(true)(res(0).getBoolean(0))
+    val metaPath = new File(HDFS_METADATA_PATH + 
s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+    assertResult(22)(metaPath.list().length)
+    assert(countFiles(dataPath) > initial_cache_files)
+
+    val first_cache_files = countFiles(dataPath)
+    val res1 = spark.sql(s"cache data select * from 
lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    assertResult(31)(metaPath.list().length)
+    assert(countFiles(dataPath) > first_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS 
sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
+
+  test("test cache mergetree data async") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_shipdate)
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='l_linenumber,l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val res = spark
+      .sql(s"""
+              |cache data async
+              |  select * from lineitem_mergetree_hdfs
+              |  after l_shipdate AS OF '1995-01-10'
+              |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+              |                aaa='ccc')""".stripMargin)
+      .collect()
+    assertResult(true)(res(0).getBoolean(0))
+    val metaPath = new File(HDFS_METADATA_PATH + 
s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+    eventually(timeout(60.seconds), interval(2.seconds)) {
+      assertResult(22)(metaPath.list().length)
+      assert(countFiles(dataPath) > initial_cache_files)
+    }
+
+    val first_cache_files = countFiles(dataPath)
+    val res1 = spark.sql(s"cache data async select * from 
lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    eventually(timeout(60.seconds), interval(2.seconds)) {
+      assertResult(31)(metaPath.list().length)
+      assert(countFiles(dataPath) > first_cache_files)
+    }
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS 
sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
+
+  test("test cache mergetree data with the path") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_shipdate)
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='l_linenumber,l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and 
date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val res = spark
+      .sql(s"""
+              |cache data
+              |  select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+              |  after l_shipdate AS OF '1995-01-10'
+              |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+              |                aaa='ccc')""".stripMargin)
+      .collect()
+    assertResult(true)(res(0).getBoolean(0))
+    val metaPath = new File(HDFS_METADATA_PATH + 
s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+    assertResult(22)(metaPath.list().length)
+    assert(countFiles(dataPath) > initial_cache_files)
+    val first_cache_files = countFiles(dataPath)
+    val res1 = spark.sql(s"cache data select * from 
lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    assertResult(31)(metaPath.list().length)
+    assert(countFiles(dataPath) > first_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS 
sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
+}
+// scalastyle:off line.size.limit
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
index 0f642dfa8..2fec68a49 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
@@ -65,7 +65,7 @@ class GlutenClickHouseNativeWriteTableSuite
       // TODO: support default ANSI policy
       .set("spark.sql.storeAssignmentPolicy", "legacy")
       .set("spark.sql.warehouse.dir", getWarehouseDir)
-      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"debug")
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"error")
       .setMaster("local[1]")
   }
 
diff --git a/cpp-ch/local-engine/CMakeLists.txt 
b/cpp-ch/local-engine/CMakeLists.txt
index 93ee4b821..2bf99a494 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -53,6 +53,7 @@ add_headers_and_sources(storages Storages/Output)
 add_headers_and_sources(storages Storages/Serializations)
 add_headers_and_sources(storages Storages/IO)
 add_headers_and_sources(storages Storages/Mergetree)
+add_headers_and_sources(storages Storages/Cache)
 add_headers_and_sources(common Common)
 add_headers_and_sources(external External)
 add_headers_and_sources(shuffle Shuffle)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 88867e290..12bf7ed59 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -71,6 +71,7 @@
 #include <Common/LoggerExtend.h>
 #include <Common/logger_useful.h>
 #include <Common/typeid_cast.h>
+#include <Storages/Cache/CacheManager.h>
 
 namespace DB
 {
@@ -975,6 +976,8 @@ void BackendInitializerUtil::init(const std::string_view 
plan)
     // Init the table metadata cache map
     StorageMergeTreeFactory::init_cache_map();
 
+    CacheManager::initialize(SerializedPlanParser::global_context);
+
     std::call_once(
         init_flag,
         [&]
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp 
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index d3b7d7b22..31994170f 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -231,4 +231,36 @@ RangesInDataParts 
MergeTreeTable::extractRange(DataPartsVector parts_vector) con
         });
     return ranges_in_data_parts;
 }
+
+bool sameColumns(const substrait::NamedStruct & left, const 
substrait::NamedStruct & right)
+{
+    if (left.names_size() != right.names_size())
+        return false;
+    std::unordered_map<String, substrait::Type::KindCase> map;
+    for (size_t i = 0; i < left.names_size(); i++)
+        map.emplace(left.names(i), left.struct_().types(i).kind_case());
+    for (size_t i = 0; i < right.names_size(); i++)
+    {
+        if (!map.contains(right.names(i)) || map[right.names(i)] != 
right.struct_().types(i).kind_case())
+            return false;
+    }
+    return true;
+}
+
+bool MergeTreeTable::sameStructWith(const MergeTreeTable & other)
+{
+    return database == other.database &&
+        table == other.table &&
+        snapshot_id == other.snapshot_id &&
+        sameColumns(schema, other.schema) &&
+        order_by_key == other.order_by_key &&
+        low_card_key == other.low_card_key &&
+        minmax_index_key == other.minmax_index_key &&
+        bf_index_key == other.bf_index_key &&
+        set_index_key == other.set_index_key &&
+        primary_key == other.primary_key &&
+        relative_path == other.relative_path &&
+        absolute_path == other.absolute_path &&
+        table_configs.storage_policy == other.table_configs.storage_policy;
+}
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h 
b/cpp-ch/local-engine/Common/MergeTreeTool.h
index 0f0a1c1c7..fc312eba9 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -67,6 +67,7 @@ struct MergeTreeTable
     std::vector<MergeTreePart> parts;
     std::unordered_set<String> getPartNames() const;
     RangesInDataParts extractRange(DataPartsVector parts_vector) const;
+    bool sameStructWith(const MergeTreeTable& other);
 };
 
 std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const 
DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &);
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp 
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 967f8ba70..b1b024ce5 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -78,6 +78,7 @@ MergeTreeRelParser::parseStorage(const MergeTreeTable & 
merge_tree_table, Contex
     auto global_storage = StorageMergeTreeFactory::getStorage(
         StorageID(merge_tree_table.database, merge_tree_table.table),
         merge_tree_table.snapshot_id,
+        merge_tree_table,
         [&]() -> CustomStorageMergeTreePtr
         {
             auto custom_storage_merge_tree = 
std::make_shared<CustomStorageMergeTree>(
@@ -98,13 +99,6 @@ MergeTreeRelParser::parseStorage(const MergeTreeTable & 
merge_tree_table, Contex
     return global_storage;
 }
 
-CustomStorageMergeTreePtr
-MergeTreeRelParser::parseStorage(const substrait::ReadRel::ExtensionTable & 
extension_table, ContextMutablePtr context)
-{
-    auto merge_tree_table = parseMergeTreeTable(extension_table);
-    return parseStorage(merge_tree_table, context, true);
-}
-
 CustomStorageMergeTreePtr
 MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable 
merge_tree_table, ContextMutablePtr context)
 {
@@ -131,7 +125,9 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
     DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const 
substrait::ReadRel::ExtensionTable & extension_table)
 {
     auto merge_tree_table = parseMergeTreeTable(extension_table);
-    auto storage = parseStorage(extension_table, global_context);
+    // ignore snapshot id for query
+    merge_tree_table.snapshot_id = "";
+    auto storage = parseStorage(merge_tree_table, global_context, true);
 
     DB::Block input;
     if (rel.has_base_schema() && rel.base_schema().names_size())
@@ -392,6 +388,8 @@ String MergeTreeRelParser::filterRangesOnDriver(const 
substrait::ReadRel & read_
     google::protobuf::StringValue table;
     table.ParseFromString(read_rel.advanced_extension().enhancement().value());
     auto merge_tree_table = parseMergeTreeTableString(table.value());
+    // ignore snapshot id for query
+    merge_tree_table.snapshot_id = "";
     auto custom_storage_mergetree = parseStorage(merge_tree_table, 
global_context, true);
 
     auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h 
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
index 1c9ea736c..b26239dc4 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
@@ -40,8 +40,6 @@ using namespace DB;
 class MergeTreeRelParser : public RelParser
 {
 public:
-    static CustomStorageMergeTreePtr
-    parseStorage(const substrait::ReadRel::ExtensionTable & extension_table, 
ContextMutablePtr context);
     static CustomStorageMergeTreePtr parseStorage(
         const MergeTreeTable & merge_tree_table, ContextMutablePtr context, 
bool restore = false);
 
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
new file mode 100644
index 000000000..d2c7b0681
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CacheManager.h"
+
+#include <Core/Settings.h>
+#include <Disks/IStoragePolicy.h>
+#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <Interpreters/Context.h>
+#include <Interpreters/Cache/FileCacheFactory.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
+#include <Common/ThreadPool.h>
+#include <Parser/MergeTreeRelParser.h>
+#include <Processors/Executors/PipelineExecutor.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/Sinks/NullSink.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Common/Logger.h>
+#include <Common/logger_useful.h>
+#include <ranges>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int INVALID_STATE;
+}
+}
+
+namespace CurrentMetrics
+{
+extern const Metric LocalThread;
+extern const Metric LocalThreadActive;
+extern const Metric LocalThreadScheduled;
+}
+
+namespace local_engine
+{
+CacheManager & CacheManager::instance()
+{
+    static CacheManager cache_manager;
+    return cache_manager;
+}
+
+void CacheManager::initialize(DB::ContextMutablePtr context_)
+{
+    auto & manager = instance();
+    manager.context = context_;
+    manager.thread_pool = std::make_unique<ThreadPool>(
+        CurrentMetrics::LocalThread,
+        CurrentMetrics::LocalThreadActive,
+        CurrentMetrics::LocalThreadScheduled,
+        manager.context->getConfigRef().getInt("cache_sync_max_threads", 10),
+        0,
+        0);
+}
+
+struct CacheJobContext
+{
+    MergeTreeTable table;
+};
+
+void CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& 
part, const std::unordered_set<String> & columns, std::shared_ptr<std::latch> 
latch)
+{
+    CacheJobContext job_context{table};
+    job_context.table.parts.clear();
+    job_context.table.parts.push_back(part);
+    job_context.table.snapshot_id = "";
+    auto job = [job_detail = job_context, context = this->context, 
read_columns = columns, latch = latch]()
+    {
+        try
+        {
+            SCOPE_EXIT({ if (latch) latch->count_down();});
+            auto storage = MergeTreeRelParser::parseStorage(job_detail.table, 
context, true);
+            auto storage_snapshot = 
std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
+            NamesAndTypesList names_and_types_list;
+            auto meta_columns = storage->getInMemoryMetadata().getColumns();
+            for (const auto & column : meta_columns)
+            {
+                if (read_columns.contains(column.name))
+                    
names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
+            }
+            auto query_info = buildQueryInfo(names_and_types_list);
+            std::vector<DataPartPtr> selected_parts
+                = 
StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", 
{job_detail.table.parts.front().name});
+            auto read_step = storage->reader.readFromParts(
+                selected_parts,
+                /* alter_conversions = */
+                {},
+                names_and_types_list.getNames(),
+                storage_snapshot,
+                *query_info,
+                context,
+                context->getSettingsRef().max_block_size,
+                1);
+            QueryPlan plan;
+            plan.addStep(std::move(read_step));
+            auto pipeline_builder = plan.buildQueryPipeline({}, {});
+            auto pipeline = 
QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder.get()));
+            PullingPipelineExecutor executor(pipeline);
+            while (true)
+            {
+                Chunk chunk;
+                if (!executor.pull(chunk))
+                    break;
+            }
+            LOG_INFO(getLogger("CacheManager"), "Load cache of table {}.{} 
part {} success.", job_detail.table.database, job_detail.table.table, 
job_detail.table.parts.front().name);
+        }
+        catch (std::exception& e)
+        {
+            LOG_ERROR(getLogger("CacheManager"), "Load cache of table {}.{} 
part {} failed.\n {}", job_detail.table.database, job_detail.table.table, 
job_detail.table.parts.front().name, e.what());
+        }
+    };
+    LOG_INFO(getLogger("CacheManager"), "Loading cache of table {}.{} part 
{}", job_context.table.database, job_context.table.table, 
job_context.table.parts.front().name);
+    thread_pool->scheduleOrThrowOnError(std::move(job));
+}
+
+void CacheManager::cacheParts(const String& table_def, const 
std::unordered_set<String>& columns, bool async)
+{
+    auto table = parseMergeTreeTableString(table_def);
+    std::shared_ptr<std::latch> latch = nullptr;
+    if (!async) latch = std::make_shared<std::latch>(table.parts.size());
+    for (const auto & part : table.parts)
+    {
+        cachePart(table, part, columns, latch);
+    }
+    if (latch)
+        latch->wait();
+}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
new file mode 100644
index 000000000..a303b7b7f
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Disks/IDisk.h>
+#include <latch>
+
+
+namespace local_engine
+{
+struct MergeTreePart;
+struct MergeTreeTable;
+/***
+ * Manage the cache of the MergeTree, mainly including meta.bin, data.bin, 
metadata.gluten
+ */
+class CacheManager {
+public:
+    static CacheManager & instance();
+    static void initialize(DB::ContextMutablePtr context);
+    void cachePart(const MergeTreeTable& table, const MergeTreePart& part, 
const std::unordered_set<String>& columns, std::shared_ptr<std::latch> latch = 
nullptr);
+    void cacheParts(const String& table_def, const std::unordered_set<String>& 
columns, bool async = true);
+private:
+    CacheManager() = default;
+
+    std::unique_ptr<ThreadPool> thread_pool;
+    DB::ContextMutablePtr context;
+    std::unordered_map<String, DB::DiskPtr> policy_to_disk;
+    std::unordered_map<DB::DiskPtr, DB::DiskPtr> disk_to_metadisk;
+    std::unordered_map<String, DB::FileCachePtr> policy_to_cache;
+};
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.cpp 
b/cpp-ch/local-engine/Storages/CustomMergeTreeSink.cpp
deleted file mode 100644
index 4ec946c94..000000000
--- a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "CustomMergeTreeSink.h"
-
-void local_engine::CustomMergeTreeSink::consume(Chunk chunk)
-{
-    auto block = 
metadata_snapshot->getSampleBlock().cloneWithColumns(chunk.detachColumns());
-    DB::BlockWithPartition block_with_partition(Block(block), DB::Row{});
-    auto part = storage.writer.writeTempPart(block_with_partition, 
metadata_snapshot, context);
-    MergeTreeData::Transaction transaction(storage, NO_TRANSACTION_RAW);
-    {
-        auto lock = storage.lockParts();
-        storage.renameTempPartAndAdd(part.part, transaction, lock);
-        transaction.commit(&lock);
-    }
-}
diff --git a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.h 
b/cpp-ch/local-engine/Storages/CustomMergeTreeSink.h
deleted file mode 100644
index acb9702c3..000000000
--- a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <Processors/ISink.h>
-#include <Storages/MergeTree/MergeTreeDataWriter.h>
-#include <Storages/StorageInMemoryMetadata.h>
-#include "CustomStorageMergeTree.h"
-
-namespace local_engine
-{
-class CustomMergeTreeSink : public ISink
-{
-public:
-    CustomMergeTreeSink(CustomStorageMergeTree & storage_, const 
StorageMetadataPtr metadata_snapshot_, ContextPtr context_)
-        : ISink(metadata_snapshot_->getSampleBlock()), storage(storage_), 
metadata_snapshot(metadata_snapshot_), context(context_)
-    {
-    }
-
-    String getName() const override { return "CustomMergeTreeSink"; }
-    void consume(Chunk chunk) override;
-
-private:
-    CustomStorageMergeTree & storage;
-    StorageMetadataPtr metadata_snapshot;
-    ContextPtr context;
-};
-
-}
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp 
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
index c59d6ddb4..3f7aac872 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
@@ -18,6 +18,9 @@
 
 #include <Common/GlutenConfig.h>
 
+#include <Common/MergeTreeTool.h>
+#include <Storages/CustomStorageMergeTree.h>
+
 namespace local_engine
 {
 
@@ -55,14 +58,24 @@ void StorageMergeTreeFactory::freeStorage(const StorageID & 
id, const String & s
     }
 }
 
+
 CustomStorageMergeTreePtr
-StorageMergeTreeFactory::getStorage(StorageID id, const String & snapshot_id, 
std::function<CustomStorageMergeTreePtr()> creator)
+StorageMergeTreeFactory::getStorage(const StorageID& id, const String & 
snapshot_id, MergeTreeTable merge_tree_table, 
std::function<CustomStorageMergeTreePtr()> creator)
 {
     auto table_name = getTableName(id, snapshot_id);
     std::lock_guard lock(storage_map_mutex);
+
+    merge_tree_table.parts.clear();
+    if (storage_map->has(table_name) && 
!storage_map->get(table_name)->second.sameStructWith(merge_tree_table))
+    {
+        freeStorage(id);
+        std::lock_guard lock_datapart(datapart_mutex);
+        if (datapart_map->has(table_name))
+            datapart_map->remove(table_name);
+    }
     if (!storage_map->has(table_name))
-        storage_map->add(table_name, creator());
-    return *(storage_map->get(table_name));
+        storage_map->add(table_name, {creator(), merge_tree_table});
+    return storage_map->get(table_name)->first;
 }
 
 DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & 
id, const String & snapshot_id, std::unordered_set<String> part_name)
@@ -96,7 +109,7 @@ DataPartsVector 
StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
         CustomStorageMergeTreePtr storage_merge_tree;
         {
             std::lock_guard storage_lock(storage_map_mutex);
-            storage_merge_tree = *(storage_map->get(table_name));
+            storage_merge_tree = storage_map->get(table_name)->first;
         }
         auto missing_parts = 
storage_merge_tree->loadDataPartsWithNames(missing_names);
         for (const auto & part : missing_parts)
@@ -108,9 +121,9 @@ DataPartsVector 
StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
     return res;
 }
 // will be inited in native init phase
-std::unique_ptr<Poco::LRUCache<std::string, CustomStorageMergeTreePtr>> 
StorageMergeTreeFactory::storage_map = nullptr;
+std::unique_ptr<Poco::LRUCache<std::string, 
std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> 
StorageMergeTreeFactory::storage_map = nullptr;
 std::unique_ptr<Poco::LRUCache<std::string, 
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> 
StorageMergeTreeFactory::datapart_map = nullptr;
-std::mutex StorageMergeTreeFactory::storage_map_mutex;
-std::mutex StorageMergeTreeFactory::datapart_mutex;
+std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex;
+std::recursive_mutex StorageMergeTreeFactory::datapart_mutex;
 
 }
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h 
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
index f372175bb..3fa8c6285 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
@@ -16,11 +16,13 @@
  */
 #pragma once
 #include <Common/GlutenConfig.h>
+#include <Common/MergeTreeTool.h>
 #include <Poco/LRUCache.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Storages/CustomStorageMergeTree.h>
 #include <Interpreters/MergeTreeTransaction.h>
 
+
 namespace local_engine
 {
 using CustomStorageMergeTreePtr = std::shared_ptr<CustomStorageMergeTree>;
@@ -31,7 +33,7 @@ public:
     static StorageMergeTreeFactory & instance();
     static void freeStorage(const StorageID & id, const String & snapshot_id = 
"");
     static CustomStorageMergeTreePtr
-    getStorage(StorageID id, const String & snapshot_id, 
std::function<CustomStorageMergeTreePtr()> creator);
+    getStorage(const StorageID& id, const String & snapshot_id, MergeTreeTable 
merge_tree_table, std::function<CustomStorageMergeTreePtr()> creator);
     static DataPartsVector getDataPartsByNames(const StorageID & id, const 
String & snapshot_id, std::unordered_set<String> part_name);
     static void init_cache_map()
     {
@@ -39,7 +41,7 @@ public:
         auto & storage_map_v = storage_map;
         if (!storage_map_v)
         {
-            storage_map_v = std::make_unique<Poco::LRUCache<std::string, 
CustomStorageMergeTreePtr>>(config.table_metadata_cache_max_count);
+            storage_map_v = std::make_unique<Poco::LRUCache<std::string, 
std::pair<CustomStorageMergeTreePtr, 
MergeTreeTable>>>(config.table_metadata_cache_max_count);
         }
         else
         {
@@ -65,10 +67,10 @@ public:
     static String getTableName(const StorageID & id, const String & 
snapshot_id);
 
 private:
-    static std::unique_ptr<Poco::LRUCache<std::string, 
CustomStorageMergeTreePtr>> storage_map;
+    static std::unique_ptr<Poco::LRUCache<std::string, 
std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> storage_map;
     static std::unique_ptr<Poco::LRUCache<std::string, 
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> datapart_map;
-    static std::mutex storage_map_mutex;
-    static std::mutex datapart_mutex;
+    static std::recursive_mutex storage_map_mutex;
+    static std::recursive_mutex datapart_mutex;
 };
 
 struct TempStorageFreer
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 3a18771cd..8807a0f63 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -55,6 +55,7 @@
 #include <Common/ExceptionUtils.h>
 #include <Common/JNIUtils.h>
 #include <Common/QueryContext.h>
+#include <Storages/Cache/CacheManager.h>
 
 
 #ifdef __cplusplus
@@ -1252,6 +1253,21 @@ JNIEXPORT void 
Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc
 
 
 
+JNIEXPORT void 
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * 
env, jobject, jstring table_, jstring columns_, jboolean async_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    auto table_def = jstring2string(env, table_);
+    auto columns = jstring2string(env, columns_);
+    Poco::StringTokenizer tokenizer(columns, ",");
+    std::unordered_set<String> column_set;
+    for (const auto & col : tokenizer)
+    {
+        column_set.insert(col);
+    }
+    local_engine::CacheManager::instance().cacheParts(table_def, column_set, 
async_);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
 #ifdef __cplusplus
 }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 273443f64..8f24afae1 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -35,6 +35,7 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, 
Partitioning}
@@ -468,6 +469,9 @@ trait SparkPlanExecApi {
 
   def genInjectPostHocResolutionRules(): List[SparkSession => 
Rule[LogicalPlan]]
 
+  def genInjectExtendedParser(): List[(SparkSession, ParserInterface) => 
ParserInterface] =
+    List.empty
+
   def genGetStructFieldTransformer(
       substraitExprName: String,
       childTransformer: ExpressionTransformer,
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
index 0897f411f..f2ccf6e81 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
@@ -23,6 +23,9 @@ import org.apache.spark.sql.SparkSessionExtensions
 
 object OthersExtensionOverrides extends GlutenSparkExtensionsInjector {
   override def inject(extensions: SparkSessionExtensions): Unit = {
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .genInjectExtendedParser()
+      .foreach(extensions.injectParser)
     BackendsApiManager.getSparkPlanExecApiInstance
       .genExtendedAnalyzers()
       .foreach(extensions.injectResolutionRule)
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends 
GlutenSQLTestsTrait {
     
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
     
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
-    assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      assert(
+        
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+    } else {
+      assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    }
     assert(
       spark.sessionState.functionRegistry
         .lookupFunction(MyExtensions.myFunction._1)
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends 
GlutenSQLTestsTrait {
     
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
     
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
-    assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      assert(
+        
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+    } else {
+      assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    }
     assert(
       spark.sessionState.functionRegistry
         .lookupFunction(MyExtensions.myFunction._1)
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends 
GlutenSQLTestsTrait {
     
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
     
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
-    assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      assert(
+        
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+    } else {
+      assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    }
     assert(
       spark.sessionState.functionRegistry
         .lookupFunction(MyExtensions.myFunction._1)
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends 
GlutenSQLTestsTrait {
     
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
     
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
-    assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      assert(
+        
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+    } else {
+      assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+    }
     assert(
       spark.sessionState.functionRegistry
         .lookupFunction(MyExtensions.myFunction._1)
diff --git a/pom.xml b/pom.xml
index cbec5befb..6f6b2cd57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
     <hadoop.version>2.7.4</hadoop.version>
     <slf4j.version>2.0.7</slf4j.version>
     <log4j.version>2.20.0</log4j.version>
+    <antlr4.version>4.9.3</antlr4.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <project.prefix>spark-sql-columnar</project.prefix>
@@ -261,6 +262,7 @@
         <delta.package.name>delta-core</delta.package.name>
         <delta.version>2.0.1</delta.version>
         <delta.binary.version>20</delta.binary.version>
+        <antlr4.version>4.8</antlr4.version>
       </properties>
     </profile>
     <profile>
@@ -275,6 +277,7 @@
         <delta.package.name>delta-core</delta.package.name>
         <delta.version>2.3.0</delta.version>
         <delta.binary.version>23</delta.binary.version>
+        <antlr4.version>4.8</antlr4.version>
       </properties>
     </profile>
     <profile>
@@ -288,6 +291,7 @@
         <delta.package.name>delta-core</delta.package.name>
         <delta.version>2.4.0</delta.version>
         <delta.binary.version>24</delta.binary.version>
+        <antlr4.version>4.9.3</antlr4.version>
       </properties>
     </profile>
     <profile>
@@ -303,6 +307,7 @@
         <delta.binary.version>32</delta.binary.version>
         <fasterxml.version>2.15.1</fasterxml.version>
         <hadoop.version>3.3.4</hadoop.version>
+        <antlr4.version>4.9.3</antlr4.version>
       </properties>
       <dependencies>
         <dependency>
@@ -989,6 +994,11 @@
           <artifactId>protobuf-maven-plugin</artifactId>
           <version>0.5.1</version>
         </plugin>
+        <plugin>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-maven-plugin</artifactId>
+          <version>${antlr4.version}</version>
+        </plugin>
       </plugins>
     </pluginManagement>
     <plugins>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to