This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6f00a4549 [CH] Support CACHE DATA command for MergeTree table (#6621)
6f00a4549 is described below
commit 6f00a4549654fa85a3d50d3df92ae66dd8407ebf
Author: LiuNeng <[email protected]>
AuthorDate: Sun Aug 4 13:50:46 2024 +0800
[CH] Support CACHE DATA command for MergeTree table (#6621)
[CH] Support CACHE DATA command for MergeTree table
---------
Co-authored-by: liuneng1994 <[email protected]>
Co-authored-by: Zhichao Zhang <[email protected]>
---
backends-clickhouse/pom.xml | 16 +
.../gluten/sql/parser/GlutenClickhouseSqlBase.g4 | 232 ++++++++++++
.../gluten/parser/GlutenClickhouseSqlParser.scala | 61 ++++
.../org/apache/spark/sql/delta/DeltaAdapter.scala | 10 +
.../gluten/parser/GlutenClickhouseSqlParser.scala | 65 ++++
.../org/apache/spark/sql/delta/DeltaAdapter.scala | 10 +
.../gluten/parser/GlutenClickhouseSqlParser.scala | 65 ++++
.../org/apache/spark/sql/delta/DeltaAdapter.scala | 11 +
.../gluten/execution/CHNativeCacheManager.java} | 11 +-
.../clickhouse/CHSparkPlanExecApi.scala | 7 +
.../parser/GlutenClickhouseSqlParserBase.scala | 276 ++++++++++++++
.../apache/spark/rpc/GlutenDriverEndpoint.scala | 4 +-
.../apache/spark/rpc/GlutenExecutorEndpoint.scala | 17 +-
.../org/apache/spark/rpc/GlutenRpcMessages.scala | 4 +
.../apache/spark/sql/delta/DeltaAdapterTrait.scala | 9 +
.../commands/GlutenCHCacheDataCommand.scala | 287 +++++++++++++++
.../v2/clickhouse/metadata/AddFileTags.scala | 11 +-
.../GlutenClickHouseMergeTreeCacheDataSSuite.scala | 401 +++++++++++++++++++++
.../GlutenClickHouseNativeWriteTableSuite.scala | 2 +-
cpp-ch/local-engine/CMakeLists.txt | 1 +
cpp-ch/local-engine/Common/CHUtil.cpp | 3 +
cpp-ch/local-engine/Common/MergeTreeTool.cpp | 32 ++
cpp-ch/local-engine/Common/MergeTreeTool.h | 1 +
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 14 +-
cpp-ch/local-engine/Parser/MergeTreeRelParser.h | 2 -
.../local-engine/Storages/Cache/CacheManager.cpp | 143 ++++++++
cpp-ch/local-engine/Storages/Cache/CacheManager.h | 44 +++
.../local-engine/Storages/CustomMergeTreeSink.cpp | 30 --
cpp-ch/local-engine/Storages/CustomMergeTreeSink.h | 43 ---
.../Storages/StorageMergeTreeFactory.cpp | 27 +-
.../Storages/StorageMergeTreeFactory.h | 12 +-
cpp-ch/local-engine/local_engine_jni.cpp | 16 +
.../gluten/backendsapi/SparkPlanExecApi.scala | 4 +
.../extension/OthersExtensionOverrides.scala | 3 +
.../extension/GlutenSessionExtensionSuite.scala | 8 +-
.../extension/GlutenSessionExtensionSuite.scala | 8 +-
.../extension/GlutenSessionExtensionSuite.scala | 8 +-
.../extension/GlutenSessionExtensionSuite.scala | 8 +-
pom.xml | 10 +
39 files changed, 1805 insertions(+), 111 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index 5672056b4..f2ec45a51 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -365,6 +365,22 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <visitor>true</visitor>
+
<sourceDirectory>../backends-clickhouse/src/main/antlr4</sourceDirectory>
+ <treatWarningsAsErrors>true</treatWarningsAsErrors>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
new file mode 100644
index 000000000..ac4f66a4f
--- /dev/null
+++
b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar GlutenClickhouseSqlBase;
+
+@members {
+ /**
+ * Verify whether current token is a valid decimal token (which contains dot).
+ * Returns true if the character that follows the token is not a digit or letter or underscore.
+ *
+ * For example:
+ * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+ * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+ * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+ * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is followed
+ * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+ * which is not a digit or letter or underscore.
+ */
+ public boolean isValidDecimal() {
+ int nextChar = _input.LA(1);
+ if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <=
'9' ||
+ nextChar == '_') {
+ return false;
+ } else {
+ return true;
+ }
+ }
+}
+
+tokens {
+ DELIMITER
+}
+
+singleStatement
+ : statement ';'* EOF
+ ;
+
+statement
+ : CACHE META? DATA ASYNC? SELECT selectedColumns=selectedColumnNames
+ FROM (path=STRING | table=qualifiedName) (AFTER filter=filterClause)?
+ (CACHEPROPERTIES cacheProps=propertyList)?
#cacheData
+ | .*?
#passThrough
+ ;
+
+qualifiedName
+ : identifier (DOT identifier)*
+ ;
+
+selectedColumnNames
+ : ASTERISK
+ | identifier (COMMA identifier)*
+ ;
+
+filterClause
+ : TIMESTAMP AS OF timestamp=STRING
+ | datepartition=identifier AS OF datetime=STRING
+ ;
+
+propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | stringLit
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN
+ | value=stringLit
+ ;
+
+stringLit
+ : STRING
+ | DOUBLEQUOTED_STRING
+ ;
+
+booleanValue
+ : TRUE | FALSE
+ ;
+
+identifier
+ : IDENTIFIER #unquotedIdentifier
+ | quotedIdentifier #quotedIdentifierAlternative
+ | nonReserved #unquotedIdentifier
+ ;
+
+quotedIdentifier
+ : BACKQUOTED_IDENTIFIER
+ ;
+
+// Add keywords here so that people's queries don't break if they have a column name as one of
+// these tokens
+nonReserved
+ : CACHE | META | ASYNC | DATA
+ | SELECT | FOR | AFTER | CACHEPROPERTIES
+ | TIMESTAMP | AS | OF | DATE_PARTITION
+ ;
+
+// Define how the keywords above should appear in a user's SQL statement.
+CACHE: 'CACHE';
+META: 'META';
+ASYNC: 'ASYNC';
+DATA: 'DATA';
+SELECT: 'SELECT';
+COMMA: ',';
+FOR: 'FOR';
+FROM: 'FROM';
+AFTER: 'AFTER';
+CACHEPROPERTIES: 'CACHEPROPERTIES';
+DOT: '.';
+ASTERISK: '*';
+TIMESTAMP: 'TIMESTAMP';
+AS: 'AS';
+OF: 'OF';
+DATE_PARTITION: 'DATE_PARTITION';
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+
+EQ : '=' | '==';
+NSEQ: '<=>';
+NEQ : '<>';
+NEQJ: '!=';
+LTE : '<=' | '!>';
+GTE : '>=' | '!<';
+CONCAT_PIPE: '||';
+
+STRING
+ : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+ | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ ;
+
+DOUBLEQUOTED_STRING
+ :'"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ ;
+
+BIGINT_LITERAL
+ : DIGIT+ 'L'
+ ;
+
+SMALLINT_LITERAL
+ : DIGIT+ 'S'
+ ;
+
+TINYINT_LITERAL
+ : DIGIT+ 'Y'
+ ;
+
+INTEGER_VALUE
+ : DIGIT+
+ ;
+
+DECIMAL_VALUE
+ : DIGIT+ EXPONENT
+ | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}?
+ ;
+
+DOUBLE_LITERAL
+ : DIGIT+ EXPONENT? 'D'
+ | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+ ;
+
+BIGDECIMAL_LITERAL
+ : DIGIT+ EXPONENT? 'BD'
+ | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+ ;
+
+IDENTIFIER
+ : (LETTER | DIGIT | '_')+
+ ;
+
+BACKQUOTED_IDENTIFIER
+ : '`' ( ~'`' | '``' )* '`'
+ ;
+
+fragment DECIMAL_DIGITS
+ : DIGIT+ '.' DIGIT*
+ | '.' DIGIT+
+ ;
+
+fragment EXPONENT
+ : 'E' [+-]? DIGIT+
+ ;
+
+fragment DIGIT
+ : [0-9]
+ ;
+
+fragment LETTER
+ : [A-Z]
+ ;
+
+SIMPLE_COMMENT
+ : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
+ ;
+
+BRACKETED_COMMENT
+ : '/*' .*? '*/' -> channel(HIDDEN)
+ ;
+
+WS : [ \r\n\t]+ -> channel(HIDDEN)
+ ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+ : .
+ ;
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
new file mode 100644
index 000000000..cc4f0bd9f
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenClickhouseSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
index b6d4c0484..4ffa2e841 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/DeltaAdapter.scala
@@ -15,7 +15,17 @@
* limitations under the License.
*/
package org.apache.spark.sql.delta
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
object DeltaAdapter extends DeltaAdapterTrait {
override def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.snapshot
+
+ override def snapshotFilesForScan(
+ snapshot: Snapshot,
+ projection: Seq[Attribute],
+ filters: Seq[Expression],
+ keepNumRecords: Boolean): DeltaScan = {
+ snapshot.filesForScan(projection, filters, keepNumRecords)
+ }
}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
new file mode 100644
index 000000000..1f2dfe007
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenClickhouseSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
index 8a9c5585e..58d59aa9d 100644
---
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
+++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/DeltaAdapter.scala
@@ -15,7 +15,17 @@
* limitations under the License.
*/
package org.apache.spark.sql.delta
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
object DeltaAdapter extends DeltaAdapterTrait {
override def snapshot(deltaLog: DeltaLog): Snapshot =
deltaLog.unsafeVolatileSnapshot
+
+ override def snapshotFilesForScan(
+ snapshot: Snapshot,
+ projection: Seq[Attribute],
+ filters: Seq[Expression],
+ keepNumRecords: Boolean): DeltaScan = {
+ snapshot.filesForScan(filters, keepNumRecords)
+ }
}
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
new file mode 100644
index 000000000..1f2dfe007
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenClickhouseSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
index 8a9c5585e..f414ab8f2 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaAdapter.scala
@@ -16,6 +16,17 @@
*/
package org.apache.spark.sql.delta
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
+
object DeltaAdapter extends DeltaAdapterTrait {
override def snapshot(deltaLog: DeltaLog): Snapshot =
deltaLog.unsafeVolatileSnapshot
+
+ override def snapshotFilesForScan(
+ snapshot: Snapshot,
+ projection: Seq[Attribute],
+ filters: Seq[Expression],
+ keepNumRecords: Boolean): DeltaScan = {
+ snapshot.filesForScan(filters, keepNumRecords)
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
similarity index 70%
copy from
backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
copy to
backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 3ea4af4ae..f5f75dc1d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.delta
+package org.apache.gluten.execution;
-trait DeltaAdapterTrait {
+import java.util.Set;
- def snapshot(deltaLog: DeltaLog): Snapshot
+public class CHNativeCacheManager {
+ public static void cacheParts(String table, Set<String> columns, boolean
async) {
+ nativeCacheParts(table, String.join(",", columns), async);
+ }
+
+ private static native void nativeCacheParts(String table, String columns,
boolean async);
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 3069c4a3f..bba5525ed 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -25,6 +25,7 @@ import
org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcas
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -40,6 +41,7 @@ import
org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRew
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode,
HashPartitioning, Partitioning, RangePartitioning}
@@ -612,6 +614,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
override def genExtendedStrategies(): List[SparkSession => Strategy] =
List()
+ override def genInjectExtendedParser()
+ : List[(SparkSession, ParserInterface) => ParserInterface] = {
+ List((spark, parserInterface) => new GlutenClickhouseSqlParser(spark,
parserInterface))
+ }
+
/** Define backend specfic expression mappings. */
override def extraExpressionMappings: Seq[Sig] = {
List(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
new file mode 100644
index 000000000..18fc102be
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.gluten.sql.parser.{GlutenClickhouseSqlBaseBaseListener,
GlutenClickhouseSqlBaseBaseVisitor, GlutenClickhouseSqlBaseLexer,
GlutenClickhouseSqlBaseParser}
+import org.apache.gluten.sql.parser.GlutenClickhouseSqlBaseParser._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.{ParseErrorListener,
ParseException, ParserInterface}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand
+import org.apache.spark.sql.internal.VariableSubstitution
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.tree.TerminalNodeImpl
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+trait GlutenClickhouseSqlParserBase extends ParserInterface {
+
+ protected val astBuilder = new GlutenClickhouseSqlAstBuilder
+ protected val substitution = new VariableSubstitution
+
+ protected def parse[T](command: String)(toResult:
GlutenClickhouseSqlBaseParser => T): T = {
+ val lexer = new GlutenClickhouseSqlBaseLexer(
+ new
UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command))))
+ lexer.removeErrorListeners()
+ lexer.addErrorListener(ParseErrorListener)
+
+ val tokenStream = new CommonTokenStream(lexer)
+ val parser = new GlutenClickhouseSqlBaseParser(tokenStream)
+ parser.addParseListener(PostProcessor)
+ parser.removeErrorListeners()
+ parser.addErrorListener(ParseErrorListener)
+
+ try {
+ try {
+ // first, try parsing with potentially faster SLL mode
+ parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+ toResult(parser)
+ } catch {
+ case e: ParseCancellationException =>
+ // if we fail, parse with LL mode
+ tokenStream.seek(0) // rewind input stream
+ parser.reset()
+
+ // Try Again.
+ parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+ toResult(parser)
+ }
+ } catch {
+ case e: ParseException if e.command.isDefined =>
+ throw e
+ case e: ParseException =>
+ throw e.withCommand(command)
+ case e: AnalysisException =>
+ val position = Origin(e.line, e.startPosition)
+ throw new ParseException(
+ command = Option(command),
+ message = e.message,
+ start = position,
+ stop = position,
+ errorClass = Some("GLUTEN_CH_PARSING_ANALYSIS_ERROR"))
+ }
+ }
+}
+
+class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[AnyRef] {
+
+ import org.apache.spark.sql.catalyst.parser.ParserUtils._
+
+ /** Convert a property list into a key-value map. */
+ override def visitPropertyList(ctx: PropertyListContext): Map[String,
String] = withOrigin(ctx) {
+ val properties = ctx.property.asScala.map {
+ property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
+ }
+
+ /**
+ * A property key can either be a String or a collection of dot-separated elements. This function
+ * extracts the property key based on whether it's a string literal or a property identifier.
+ */
+ override def visitPropertyKey(key: PropertyKeyContext): String = {
+ if (key.stringLit() != null) {
+ string(visitStringLit(key.stringLit()))
+ } else {
+ key.getText
+ }
+ }
+
+ /**
+ * A property value can be a String, Integer, Boolean or Decimal. This function extracts the
+ * property value based on whether it's a string, integer, boolean or decimal literal.
+ */
+ override def visitPropertyValue(value: PropertyValueContext): String = {
+ if (value == null) {
+ null
+ } else if (value.identifier != null) {
+ value.identifier.getText
+ } else if (value.value != null) {
+ string(visitStringLit(value.value))
+ } else if (value.booleanValue != null) {
+ value.getText.toLowerCase(Locale.ROOT)
+ } else {
+ value.getText
+ }
+ }
+
+ def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.collect { case (key, null) => key }
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${badKeys.mkString("[", ",",
"]")}",
+ ctx)
+ }
+ props
+ }
+
+ override def visitStringLit(ctx: StringLitContext): Token = {
+ if (ctx != null) {
+ if (ctx.STRING != null) {
+ ctx.STRING.getSymbol
+ } else {
+ ctx.DOUBLEQUOTED_STRING.getSymbol
+ }
+ } else {
+ null
+ }
+ }
+
+ override def visitSingleStatement(
+ ctx: GlutenClickhouseSqlBaseParser.SingleStatementContext): AnyRef =
withOrigin(ctx) {
+ visit(ctx.statement).asInstanceOf[LogicalPlan]
+ }
+
+ override def visitCacheData(ctx:
GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef =
+ withOrigin(ctx) {
+ val onlyMetaCache = ctx.META != null
+ val asynExecute = ctx.ASYNC != null
+ val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null)
{
+ if (ctx.filter.TIMESTAMP != null) {
+ (Some(string(ctx.filter.timestamp)), None, None)
+ } else if (ctx.filter.datepartition != null && ctx.filter.datetime !=
null) {
+ (None, Some(ctx.filter.datepartition.getText),
Some(string(ctx.filter.datetime)))
+ } else {
+ throw new ParseException(s"Illegal filter value ${ctx.getText}", ctx)
+ }
+ } else {
+ (None, None, None)
+ }
+ val selectedColuman = visitSelectedColumnNames(ctx.selectedColumns)
+ val tablePropertyOverrides = Option(ctx.cacheProps)
+ .map(visitPropertyKeyValues)
+ .getOrElse(Map.empty[String, String])
+
+ GlutenCHCacheDataCommand(
+ onlyMetaCache,
+ asynExecute,
+ selectedColuman,
+ Option(ctx.path).map(string),
+ Option(ctx.table).map(visitTableIdentifier),
+ tsfilter,
+ partitionColumn,
+ partitionValue,
+ tablePropertyOverrides
+ )
+ }
+
+ override def visitPassThrough(ctx:
GlutenClickhouseSqlBaseParser.PassThroughContext): AnyRef =
+ null
+
+ protected def visitTableIdentifier(ctx: QualifiedNameContext):
TableIdentifier = withOrigin(ctx) {
+ ctx.identifier.asScala.toSeq match {
+ case Seq(tbl) => TableIdentifier(tbl.getText)
+ case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
+ // TODO: Spark 3.5 supports catalog parameter
+ // case Seq(catalog, db, tbl) =>
+ // TableIdentifier(tbl.getText, Some(db.getText), Some(catalog.getText))
+ case _ => throw new ParseException(s"Illegal table name ${ctx.getText}",
ctx)
+ }
+ }
+
+ override def visitSelectedColumnNames(ctx: SelectedColumnNamesContext):
Option[Seq[String]] =
+ withOrigin(ctx) {
+ if (ctx != null) {
+ if (ctx.ASTERISK != null) {
+ // It means select all columns
+ None
+ } else if (ctx.identifier != null && !(ctx.identifier).isEmpty) {
+ Some(ctx.identifier.asScala.map(_.getText).toSeq)
+ } else {
+ throw new ParseException(s"Illegal selected column.", ctx)
+ }
+ } else {
+ throw new ParseException(s"Illegal selected column.", ctx)
+ }
+ }
+}
+
+case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener {
+
+ /** Remove the back ticks from an Identifier. */
+ override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {
+ replaceTokenByIdentifier(ctx, 1) {
+ token =>
+ // Remove the double back ticks in the string.
+ token.setText(token.getText.replace("``", "`"))
+ token
+ }
+ }
+
+ /** Treat non-reserved keywords as Identifiers. */
+ override def exitNonReserved(ctx: NonReservedContext): Unit = {
+ replaceTokenByIdentifier(ctx, 0)(identity)
+ }
+
+ private def replaceTokenByIdentifier(ctx: ParserRuleContext, stripMargins:
Int)(
+ f: CommonToken => CommonToken = identity): Unit = {
+ val parent = ctx.getParent
+ parent.removeLastChild()
+ val token = ctx.getChild(0).getPayload.asInstanceOf[Token]
+ val newToken = new CommonToken(
+ new org.antlr.v4.runtime.misc.Pair(token.getTokenSource,
token.getInputStream),
+ GlutenClickhouseSqlBaseParser.IDENTIFIER,
+ token.getChannel,
+ token.getStartIndex + stripMargins,
+ token.getStopIndex - stripMargins
+ )
+ parent.addChild(new TerminalNodeImpl(f(newToken)))
+ }
+}
+
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+ override def consume(): Unit = wrapped.consume
+ override def getSourceName(): String = wrapped.getSourceName
+ override def index(): Int = wrapped.index
+ override def mark(): Int = wrapped.mark
+ override def release(marker: Int): Unit = wrapped.release(marker)
+ override def seek(where: Int): Unit = wrapped.seek(where)
+ override def size(): Int = wrapped.size
+
+ override def getText(interval: Interval): String = wrapped.getText(interval)
+
+ override def LA(i: Int): Int = {
+ val la = wrapped.LA(i)
+ if (la == 0 || la == IntStream.EOF) la
+ else Character.toUpperCase(la)
+ }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
index 319381f89..a061a620d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
@@ -103,7 +103,7 @@ object GlutenDriverEndpoint extends Logging with
RemovalListener[String, util.Se
var glutenDriverEndpointRef: RpcEndpointRef = _
// keep executorRef on memory
- private val executorDataMap = new ConcurrentHashMap[String, ExecutorData]
+ val executorDataMap = new ConcurrentHashMap[String, ExecutorData]
// If spark.scheduler.listenerbus.eventqueue.capacity is set too small,
// the listener may lose messages.
@@ -131,4 +131,4 @@ object GlutenDriverEndpoint extends Logging with
RemovalListener[String, util.Se
}
}
-private class ExecutorData(val executorEndpointRef: RpcEndpointRef) {}
+class ExecutorData(val executorEndpointRef: RpcEndpointRef) {}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index f05933ef7..4d90ab653 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.rpc
-import org.apache.gluten.execution.CHBroadcastBuildSideCache
+import org.apache.gluten.execution.{CHBroadcastBuildSideCache,
CHNativeCacheManager}
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.{config, Logging}
@@ -64,10 +64,25 @@ class GlutenExecutorEndpoint(val executorId: String, val
conf: SparkConf)
hashIds.forEach(
resource_id =>
CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
}
+ case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+ CHNativeCacheManager.cacheParts(mergeTreeTable, columns, true)
case e =>
logError(s"Received unexpected message. $e")
}
+
+  /**
+   * Handles request/response messages.
+   *
+   * For [[GlutenMergeTreeCacheLoad]] the parts are cached synchronously (`false` flag, versus
+   * `true` in the fire-and-forget `receive` path) and a [[CacheLoadResult]] is sent back. On
+   * failure the error is logged and its message is included in the reply's `reason`. Unexpected
+   * messages are answered with a failure, so an `ask` caller does not wait forever.
+   */
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+      try {
+        CHNativeCacheManager.cacheParts(mergeTreeTable, columns, false)
+        context.reply(CacheLoadResult(true))
+      } catch {
+        case e: Exception =>
+          val reason = s"executor: $executorId cache data failed."
+          logError(reason, e)
+          context.reply(CacheLoadResult(false, s"$reason ${e.getMessage}"))
+      }
+    case e =>
+      logError(s"Received unexpected message. $e")
+      context.sendFailure(new IllegalArgumentException(s"Received unexpected message. $e"))
+  }
}
object GlutenExecutorEndpoint {
var executorEndpoint: GlutenExecutorEndpoint = _
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index 43a0b7bd4..d675d705f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -35,4 +35,8 @@ object GlutenRpcMessages {
case class GlutenCleanExecutionResource(executionId: String,
broadcastHashIds: util.Set[String])
extends GlutenRpcMessage
+  /**
+   * Asks an executor to load MergeTree parts into its native cache.
+   *
+   * @param mergeTreeTable serialized extension-table string describing the parts to load
+   * @param columns names of the columns to cache
+   */
+  case class GlutenMergeTreeCacheLoad(
+      mergeTreeTable: String,
+      columns: util.Set[String])
+    extends GlutenRpcMessage
+
+  /** Reply to a [[GlutenMergeTreeCacheLoad]] ask; `reason` is left empty when `success` is true. */
+  case class CacheLoadResult(
+      success: Boolean,
+      reason: String = "")
+    extends GlutenRpcMessage
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
index 3ea4af4ae..6f3bb3705 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/DeltaAdapterTrait.scala
@@ -16,7 +16,16 @@
*/
package org.apache.spark.sql.delta
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.stats.DeltaScan
+
trait DeltaAdapterTrait {
def snapshot(deltaLog: DeltaLog): Snapshot
+
+ /**
+ * Returns the [[DeltaScan]] of `snapshot` for the given projection and filters.
+ * Implementations are Delta-version specific (presumably wrapping `Snapshot.filesForScan`;
+ * confirm per implementation). CACHE DATA calls it with empty projection/filters to list all
+ * files, or with a partition predicate to prune them.
+ */
+ def snapshotFilesForScan(
+ snapshot: Snapshot,
+ projection: Seq[Attribute],
+ filters: Seq[Expression],
+ keepNumRecords: Boolean): DeltaScan
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
new file mode 100644
index 000000000..1e6b02406
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.substrait.rel.ExtensionTableBuilder
+
+import org.apache.spark.affinity.CHAffinity
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheLoadResult,
GlutenMergeTreeCacheLoad}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import
org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.toExecutorId
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+import org.apache.spark.sql.types.{BooleanType, StringType}
+import org.apache.spark.util.ThreadUtils
+
+import org.apache.hadoop.fs.Path
+
+import java.net.URI
+import java.util.{ArrayList => JList}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+/**
+ * `CACHE DATA` command for ClickHouse MergeTree (Delta) tables: sends the selected parts and
+ * columns of the table to the executors as [[GlutenMergeTreeCacheLoad]] RPCs.
+ *
+ * @param onlyMetaCache not read by [[run]]
+ * @param asynExecute when true, send the load requests and return without waiting for replies
+ * @param selectedColuman columns to cache; names not in the table schema are dropped, `None`
+ *                        means all data columns
+ * @param path table root path; takes precedence over `table`
+ * @param table table identifier, used when `path` is empty
+ * @param tsfilter if set, only parts whose modification time (epoch millis) is >= this value
+ *                 are cached; takes precedence over the partition filter
+ * @param partitionColumn with `partitionValue`, caches only parts whose partition value is
+ *                        >= `partitionValue`
+ * @param partitionValue see `partitionColumn`
+ * @param tablePropertyOverrides not read by [[run]]
+ */
+case class GlutenCHCacheDataCommand(
+    onlyMetaCache: Boolean,
+    asynExecute: Boolean,
+    selectedColuman: Option[Seq[String]],
+    path: Option[String],
+    table: Option[TableIdentifier],
+    tsfilter: Option[String],
+    partitionColumn: Option[String],
+    partitionValue: Option[String],
+    tablePropertyOverrides: Map[String, String]
+) extends LeafRunnableCommand {
+
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("result", BooleanType, nullable = false)(),
+    AttributeReference("reason", StringType, nullable = false)())
+
+  /**
+   * Caches the selected parts.
+   *
+   * Parts that have soft-affinity locations are sent only to those executors. Parts without one
+   * are sent to every registered executor.
+   *
+   * @return a single `(result, reason)` row; on failure `reason` joins the executors' reasons
+   */
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val pathToCache = resolvePathToCache(sparkSession)
+    val snapshot = loadSnapshot(sparkSession, pathToCache)
+    val selectedColumns = resolveSelectedColumns(snapshot)
+    val executorIdsToParts = groupPartsByExecutor(selectAddFiles(snapshot)).map {
+      case (executorId, parts) => executorId -> buildExtensionTable(snapshot, pathToCache, parts)
+    }
+    dispatch(executorIdsToParts, selectedColumns.toSet.asJava)
+  }
+
+  /** Resolves the table root from the explicit path, or else from the table identifier. */
+  private def resolvePathToCache(sparkSession: SparkSession): Path = {
+    if (path.nonEmpty) {
+      new Path(path.get)
+    } else if (table.nonEmpty) {
+      DeltaTableIdentifier(sparkSession, table.get) match {
+        case Some(id) if id.path.nonEmpty =>
+          new Path(id.path.get)
+        case _ =>
+          new Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location)
+      }
+    } else {
+      throw DeltaErrors.missingTableIdentifierException("CACHE DATA")
+    }
+  }
+
+  /** Checks that `pathToCache` is the root of an existing Delta table and returns its snapshot. */
+  private def loadSnapshot(sparkSession: SparkSession, pathToCache: Path): Snapshot = {
+    val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToCache)
+    if (baseDeltaPath.isDefined && baseDeltaPath.get != pathToCache) {
+      // NOTE(review): this reuses Delta's VACUUM error, presumably worded for VACUUM;
+      // consider a CACHE DATA specific error.
+      throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get)
+    }
+
+    val deltaLog = DeltaLog.forTable(sparkSession, pathToCache)
+    if (!deltaLog.tableExists) {
+      throw DeltaErrors.notADeltaTableException(
+        "CACHE DATA",
+        DeltaTableIdentifier(path = Some(pathToCache.toString)))
+    }
+
+    val snapshot = deltaLog.update()
+    require(
+      snapshot.version >= 0,
+      "No state defined for this table. Is this really " +
+        "a Delta table? Refusing to cache data.")
+    snapshot
+  }
+
+  /** Normalized names of the columns to cache; unknown requested names are dropped. */
+  private def resolveSelectedColumns(snapshot: Snapshot): Seq[String] = {
+    val allColumns = snapshot.dataSchema.fieldNames.toSeq
+    selectedColuman match {
+      case Some(requested) =>
+        requested.filter(allColumns.contains(_)).map(ConverterUtils.normalizeColName)
+      case None =>
+        allColumns.map(ConverterUtils.normalizeColName)
+    }
+  }
+
+  /** Lists the MergeTree parts to cache, applying the timestamp or partition filter if given. */
+  private def selectAddFiles(snapshot: Snapshot): Seq[AddMergeTreeParts] = {
+    val addFiles = if (tsfilter.isDefined) {
+      val minModificationTime = tsfilter.get.toLong
+      DeltaAdapter
+        .snapshotFilesForScan(snapshot, Seq.empty, Seq.empty, false)
+        .files
+        .filter(_.modificationTime >= minModificationTime)
+        .toSeq
+    } else if (partitionColumn.isDefined && partitionValue.isDefined) {
+      val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
+      require(
+        partitionColumns.contains(partitionColumn.get),
+        s"the partition column ${partitionColumn.get} is invalid.")
+      val partitionColumnField = snapshot.metadata.partitionSchema(partitionColumn.get)
+      val partitionColumnAttr = AttributeReference(
+        ConverterUtils.normalizeColName(partitionColumn.get),
+        partitionColumnField.dataType,
+        partitionColumnField.nullable)()
+      val isNotNullExpr = IsNotNull(partitionColumnAttr)
+      val greaterThanOrEqual =
+        GreaterThanOrEqual(partitionColumnAttr, Literal(partitionValue.get))
+      DeltaAdapter
+        .snapshotFilesForScan(
+          snapshot,
+          Seq(partitionColumnAttr),
+          Seq(isNotNullExpr, greaterThanOrEqual),
+          false)
+        .files
+    } else {
+      DeltaAdapter.snapshotFilesForScan(snapshot, Seq.empty, Seq.empty, false).files
+    }
+    addFiles.map(_.asInstanceOf[AddMergeTreeParts])
+  }
+
+  /**
+   * Groups parts by the soft-affinity executor that owns them. Parts without a location are
+   * keyed by `ALL_EXECUTORS`. Only non-empty groups are returned.
+   */
+  private def groupPartsByExecutor(
+      parts: Seq[AddMergeTreeParts]): Map[String, Seq[AddMergeTreeParts]] = {
+    val grouped = scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]()
+    parts.foreach { part =>
+      val relativeTablePath = URI.create(part.tablePath).getPath.stripPrefix("/")
+      val locations = CHAffinity.getNativeMergeTreePartLocations(part.name, relativeTablePath)
+      val targets =
+        if (locations.isEmpty) Seq(GlutenCHCacheDataCommand.ALL_EXECUTORS) else locations.toSeq
+      targets.foreach { target =>
+        grouped.getOrElseUpdate(target, ArrayBuffer.empty[AddMergeTreeParts]) += part
+      }
+    }
+    grouped.toMap[String, Seq[AddMergeTreeParts]]
+  }
+
+  /** Serializes one group of parts into the extension-table string sent to executors. */
+  private def buildExtensionTable(
+      snapshot: Snapshot,
+      pathToCache: Path,
+      parts: Seq[AddMergeTreeParts]): String = {
+    val onePart = parts.head
+    val partNameList = parts.map(_.name)
+    // starts and lengths are useless for caching whole parts, so zeros are passed
+    val partRanges = Seq.fill(partNameList.length)(java.lang.Long.valueOf(0L)).asJava
+    val configuration = snapshot.metadata.configuration
+
+    ExtensionTableBuilder
+      .makeExtensionTable(
+        -1,
+        -1,
+        onePart.database,
+        onePart.table,
+        ClickhouseSnapshot.genSnapshotId(snapshot),
+        onePart.tablePath,
+        pathToCache.toString,
+        configuration.getOrElse("orderByKey", ""),
+        configuration.getOrElse("lowCardKey", ""),
+        configuration.getOrElse("minmaxIndexKey", ""),
+        configuration.getOrElse("bloomfilterIndexKey", ""),
+        configuration.getOrElse("setIndexKey", ""),
+        configuration.getOrElse("primaryKey", ""),
+        partNameList.asJava,
+        partRanges,
+        partRanges,
+        ConverterUtils.convertNamedStructJson(snapshot.metadata.schema),
+        configuration.asJava,
+        new JList[String]()
+      )
+      .getExtensionTableStr
+  }
+
+  /**
+   * Sends one cache-load message per target executor.
+   *
+   * Unless `asynExecute` is set, waits for and aggregates the replies. Every target endpoint is
+   * resolved before anything is sent, so an unknown executor fails the command without leaving
+   * a partially dispatched load behind. Location-less parts and located parts are both sent.
+   */
+  private def dispatch(
+      executorIdsToParts: Map[String, String],
+      columns: java.util.Set[String]): Seq[Row] = {
+    val executorDataMap = GlutenDriverEndpoint.executorDataMap
+
+    // Parts without a soft-affinity location go to every registered executor.
+    val broadcastTargets = executorIdsToParts
+      .get(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+      .toList
+      .flatMap {
+        tableMessage =>
+          executorDataMap.values().asScala.toList.map {
+            executor =>
+              (executor.executorEndpointRef, GlutenMergeTreeCacheLoad(tableMessage, columns))
+          }
+      }
+
+    // Parts with a soft-affinity location go only to the executor that owns them.
+    val affinityTargets =
+      (executorIdsToParts - GlutenCHCacheDataCommand.ALL_EXECUTORS).toList.map {
+        case (executorId, tableMessage) =>
+          val executorData = executorDataMap.get(toExecutorId(executorId))
+          if (executorData == null) {
+            throw new GlutenException(
+              s"executor $executorId not found," +
+                s" all executors are ${executorDataMap.keySet()}")
+          }
+          (executorData.executorEndpointRef, GlutenMergeTreeCacheLoad(tableMessage, columns))
+      }
+
+    val targets = broadcastTargets ++ affinityTargets
+    if (asynExecute) {
+      targets.foreach { case (endpointRef, message) => endpointRef.send(message) }
+      Seq(Row(true, ""))
+    } else {
+      // Issue every ask first so the executors work in parallel, then wait for each reply.
+      val futures = targets.map {
+        case (endpointRef, message) => endpointRef.ask[CacheLoadResult](message)
+      }
+      val failures = futures.map(ThreadUtils.awaitResult(_, Duration.Inf)).filterNot(_.success)
+      if (failures.isEmpty) {
+        Seq(Row(true, ""))
+      } else {
+        Seq(Row(false, failures.map(_.reason).mkString(";")))
+      }
+    }
+  }
+}
+
+object GlutenCHCacheDataCommand {
+
+  /** Grouping key for parts that have no soft-affinity location and go to every executor. */
+  val ALL_EXECUTORS = "allExecutors"
+
+  /** Returns the text after the last `_` of a soft-affinity location string. */
+  private def toExecutorId(executorId: String): String = {
+    val segments = executorId.split("_")
+    segments.last
+  }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
index 8acc23aec..71d5c5431 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala
@@ -33,7 +33,7 @@ class AddMergeTreeParts(
val database: String,
val table: String,
val engine: String, // default is "MergeTree"
- override val path: String, // table path
+ val tablePath: String, // table path
val targetNode: String, // the node which the current part is generated
val name: String, // part name
val uuid: String,
@@ -98,7 +98,7 @@ object AddFileTags {
database: String,
table: String,
engine: String,
- path: String,
+ tablePath: String,
targetNode: String,
name: String,
uuid: String,
@@ -125,7 +125,7 @@ object AddFileTags {
"database" -> database,
"table" -> table,
"engine" -> engine,
- "path" -> path,
+ "path" -> tablePath,
"targetNode" -> targetNode,
"partition" -> partition,
"uuid" -> uuid,
@@ -161,7 +161,7 @@ object AddFileTags {
addFile.tags.get("database").get,
addFile.tags.get("table").get,
addFile.tags.get("engine").get,
- addFile.path,
+ addFile.tags.get("path").get,
addFile.tags.get("targetNode").get,
addFile.path,
addFile.tags.get("uuid").get,
@@ -199,6 +199,7 @@ object AddFileTags {
mapper.readValue(returnedMetrics, new
TypeReference[JList[WriteReturnedMetric]]() {})
var addFiles = new ArrayBuffer[AddFile]()
val path = new Path(originPathStr)
+ val modificationTime = System.currentTimeMillis()
addFiles.appendAll(values.asScala.map {
value =>
AddFileTags.partsInfoToAddFile(
@@ -213,7 +214,7 @@ object AddFileTags {
value.getDiskSize,
-1L,
-1L,
- -1L,
+ modificationTime,
"",
-1L,
-1L,
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
new file mode 100644
index 000000000..960c92178
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import java.io.File
+
+import scala.concurrent.duration.DurationInt
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeCacheDataSSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
+      .set("spark.gluten.soft-affinity.enabled", "true")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new Configuration
+    conf.set("fs.defaultFS", HDFS_URL)
+    val fs = FileSystem.get(conf)
+    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+    FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+  }
+
+  /** Recursively counts the regular files under `directory` (0 if it is not a directory). */
+  def countFiles(directory: File): Int = {
+    if (directory.exists && directory.isDirectory) {
+      val files = directory.listFiles
+      val count = files
+        .count(_.isFile) + files.filter(_.isDirectory).map(countFiles).sum
+      count
+    } else {
+      0
+    }
+  }
+
+  /**
+   * Recreates `lineitem_mergetree_hdfs` with the January 1995 lineitem rows.
+   *
+   * Before running `body`, the local HDFS metadata directory is reset, so the directory counts
+   * asserted afterwards only reflect what CACHE DATA writes. The table is always dropped
+   * afterwards, even if `body` fails.
+   */
+  private def withLineitemMergeTreeTable(body: => Unit): Unit = {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | l_orderkey bigint,
+                 | l_partkey bigint,
+                 | l_suppkey bigint,
+                 | l_linenumber bigint,
+                 | l_quantity double,
+                 | l_extendedprice double,
+                 | l_discount double,
+                 | l_tax double,
+                 | l_returnflag string,
+                 | l_linestatus string,
+                 | l_shipdate date,
+                 | l_commitdate date,
+                 | l_receiptdate date,
+                 | l_shipinstruct string,
+                 | l_shipmode string,
+                 | l_comment string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_shipdate)
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 | orderByKey='l_linenumber,l_orderkey')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+
+    try {
+      body
+    } finally {
+      spark.sql("drop table lineitem_mergetree_hdfs purge")
+    }
+  }
+
+  /** Local metadata directory of `lineitem_mergetree_hdfs`. */
+  private def lineitemMetaPath: File =
+    new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+
+  /** Runs a CACHE DATA statement and asserts that its first result column is `true`. */
+  private def runCacheData(sqlText: String): Unit = {
+    val res = spark.sql(sqlText).collect()
+    assertResult(true)(res(0).getBoolean(0))
+  }
+
+  /**
+   * Runs a Q1-style aggregation on the cached table. Checks that it plans a single MergeTree
+   * scan whose matching parts sum to 7898 rows.
+   */
+  private def checkQueryOnCachedTable(): Unit = {
+    val sqlStr =
+      s"""
+         |SELECT
+         | l_returnflag,
+         | l_linestatus,
+         | sum(l_quantity) AS sum_qty,
+         | sum(l_extendedprice) AS sum_base_price,
+         | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         | avg(l_quantity) AS avg_qty,
+         | avg(l_extendedprice) AS avg_price,
+         | avg(l_discount) AS avg_disc,
+         | count(*) AS count_order
+         |FROM
+         | lineitem_mergetree_hdfs
+         |WHERE
+         | l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         | l_returnflag,
+         | l_linestatus
+         |ORDER BY
+         | l_returnflag,
+         | l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+  }
+
+  test("test cache mergetree data sync") {
+    withLineitemMergeTreeTable {
+      val dataPath = new File(HDFS_CACHE_PATH)
+      val initialCacheFiles = countFiles(dataPath)
+
+      runCacheData(s"""
+                      |cache data
+                      |  select l_orderkey, l_partkey from lineitem_mergetree_hdfs
+                      |  after l_shipdate AS OF '1995-01-10'
+                      |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+                      |                aaa='ccc')""".stripMargin)
+      val metaPath = lineitemMetaPath
+      assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+      assertResult(22)(metaPath.list().length)
+      assert(countFiles(dataPath) > initialCacheFiles)
+
+      val firstCacheFiles = countFiles(dataPath)
+      runCacheData(s"cache data select * from lineitem_mergetree_hdfs")
+      assertResult(31)(metaPath.list().length)
+      assert(countFiles(dataPath) > firstCacheFiles)
+
+      checkQueryOnCachedTable()
+    }
+  }
+
+  test("test cache mergetree data async") {
+    withLineitemMergeTreeTable {
+      val dataPath = new File(HDFS_CACHE_PATH)
+      val initialCacheFiles = countFiles(dataPath)
+
+      runCacheData(s"""
+                      |cache data async
+                      |  select * from lineitem_mergetree_hdfs
+                      |  after l_shipdate AS OF '1995-01-10'
+                      |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+                      |                aaa='ccc')""".stripMargin)
+      val metaPath = lineitemMetaPath
+      assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+      eventually(timeout(60.seconds), interval(2.seconds)) {
+        assertResult(22)(metaPath.list().length)
+        assert(countFiles(dataPath) > initialCacheFiles)
+      }
+
+      val firstCacheFiles = countFiles(dataPath)
+      runCacheData(s"cache data async select * from lineitem_mergetree_hdfs")
+      eventually(timeout(60.seconds), interval(2.seconds)) {
+        assertResult(31)(metaPath.list().length)
+        assert(countFiles(dataPath) > firstCacheFiles)
+      }
+
+      checkQueryOnCachedTable()
+    }
+  }
+
+  test("test cache mergetree data with the path") {
+    withLineitemMergeTreeTable {
+      val dataPath = new File(HDFS_CACHE_PATH)
+      val initialCacheFiles = countFiles(dataPath)
+
+      runCacheData(s"""
+                      |cache data
+                      |  select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                      |  after l_shipdate AS OF '1995-01-10'
+                      |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+                      |                aaa='ccc')""".stripMargin)
+      val metaPath = lineitemMetaPath
+      assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+      assertResult(22)(metaPath.list().length)
+      assert(countFiles(dataPath) > initialCacheFiles)
+
+      val firstCacheFiles = countFiles(dataPath)
+      runCacheData(s"cache data select * from lineitem_mergetree_hdfs")
+      assertResult(31)(metaPath.list().length)
+      assert(countFiles(dataPath) > firstCacheFiles)
+
+      checkQueryOnCachedTable()
+    }
+  }
+}
+// scalastyle:on line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
index 0f642dfa8..2fec68a49 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
@@ -65,7 +65,7 @@ class GlutenClickHouseNativeWriteTableSuite
// TODO: support default ANSI policy
.set("spark.sql.storeAssignmentPolicy", "legacy")
.set("spark.sql.warehouse.dir", getWarehouseDir)
- .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"debug")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
.setMaster("local[1]")
}
diff --git a/cpp-ch/local-engine/CMakeLists.txt
b/cpp-ch/local-engine/CMakeLists.txt
index 93ee4b821..2bf99a494 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -53,6 +53,7 @@ add_headers_and_sources(storages Storages/Output)
add_headers_and_sources(storages Storages/Serializations)
add_headers_and_sources(storages Storages/IO)
add_headers_and_sources(storages Storages/Mergetree)
+add_headers_and_sources(storages Storages/Cache)
add_headers_and_sources(common Common)
add_headers_and_sources(external External)
add_headers_and_sources(shuffle Shuffle)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 88867e290..12bf7ed59 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -71,6 +71,7 @@
#include <Common/LoggerExtend.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
+#include <Storages/Cache/CacheManager.h>
namespace DB
{
@@ -975,6 +976,8 @@ void BackendInitializerUtil::init(const std::string_view
plan)
// Init the table metadata cache map
StorageMergeTreeFactory::init_cache_map();
+ CacheManager::initialize(SerializedPlanParser::global_context);
+
std::call_once(
init_flag,
[&]
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index d3b7d7b22..31994170f 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -231,4 +231,36 @@ RangesInDataParts
MergeTreeTable::extractRange(DataPartsVector parts_vector) con
});
return ranges_in_data_parts;
}
+
+/// Returns true when both schemas declare the same set of (column name, type kind) pairs, ignoring column order.
+/// Only the top-level type kind is compared (e.g. two decimals with different precision count as equal).
+/// NOTE(review): assumes `names` and `struct_().types` line up one-to-one (flat schema); substrait also lists
+/// nested struct field names in `names`, which would break that pairing — confirm schemas here are flat.
+bool sameColumns(const substrait::NamedStruct & left, const substrait::NamedStruct & right)
+{
+    if (left.names_size() != right.names_size())
+        return false;
+
+    // protobuf sizes are `int`; index with `int` to avoid signed/unsigned comparisons.
+    auto collect = [](const substrait::NamedStruct & named_struct)
+    {
+        std::unordered_map<String, substrait::Type::KindCase> columns;
+        for (int i = 0; i < named_struct.names_size(); ++i)
+            columns.emplace(named_struct.names(i), named_struct.struct_().types(i).kind_case());
+        return columns;
+    };
+    // Comparing both maps (instead of only probing one with the other's names) also rejects a schema that
+    // repeats a name and would otherwise pass as equal to a schema with a different column set.
+    return collect(left) == collect(right);
+}
+
+/// Returns true when `other` describes the same table definition as this one. Data parts are not compared.
+bool MergeTreeTable::sameStructWith(const MergeTreeTable & other)
+{
+    // Table identity and snapshot.
+    if (!(database == other.database && table == other.table && snapshot_id == other.snapshot_id))
+        return false;
+    // Column names and type kinds (order-insensitive).
+    if (!sameColumns(schema, other.schema))
+        return false;
+    // Sorting, primary and secondary-index key definitions.
+    if (!(order_by_key == other.order_by_key && low_card_key == other.low_card_key
+          && minmax_index_key == other.minmax_index_key && bf_index_key == other.bf_index_key
+          && set_index_key == other.set_index_key && primary_key == other.primary_key))
+        return false;
+    // Location and storage policy.
+    return relative_path == other.relative_path && absolute_path == other.absolute_path
+        && table_configs.storage_policy == other.table_configs.storage_policy;
+}
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h
b/cpp-ch/local-engine/Common/MergeTreeTool.h
index 0f0a1c1c7..fc312eba9 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -67,6 +67,7 @@ struct MergeTreeTable
std::vector<MergeTreePart> parts;
std::unordered_set<String> getPartNames() const;
RangesInDataParts extractRange(DataPartsVector parts_vector) const;
+ bool sameStructWith(const MergeTreeTable& other);
};
std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const
DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &);
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 967f8ba70..b1b024ce5 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -78,6 +78,7 @@ MergeTreeRelParser::parseStorage(const MergeTreeTable &
merge_tree_table, Contex
auto global_storage = StorageMergeTreeFactory::getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.snapshot_id,
+ merge_tree_table,
[&]() -> CustomStorageMergeTreePtr
{
auto custom_storage_merge_tree =
std::make_shared<CustomStorageMergeTree>(
@@ -98,13 +99,6 @@ MergeTreeRelParser::parseStorage(const MergeTreeTable &
merge_tree_table, Contex
return global_storage;
}
-CustomStorageMergeTreePtr
-MergeTreeRelParser::parseStorage(const substrait::ReadRel::ExtensionTable &
extension_table, ContextMutablePtr context)
-{
- auto merge_tree_table = parseMergeTreeTable(extension_table);
- return parseStorage(merge_tree_table, context, true);
-}
-
CustomStorageMergeTreePtr
MergeTreeRelParser::copyToDefaultPolicyStorage(MergeTreeTable
merge_tree_table, ContextMutablePtr context)
{
@@ -131,7 +125,9 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const
substrait::ReadRel::ExtensionTable & extension_table)
{
auto merge_tree_table = parseMergeTreeTable(extension_table);
- auto storage = parseStorage(extension_table, global_context);
+ // ignore snapshot id for query
+ merge_tree_table.snapshot_id = "";
+ auto storage = parseStorage(merge_tree_table, global_context, true);
DB::Block input;
if (rel.has_base_schema() && rel.base_schema().names_size())
@@ -392,6 +388,8 @@ String MergeTreeRelParser::filterRangesOnDriver(const
substrait::ReadRel & read_
google::protobuf::StringValue table;
table.ParseFromString(read_rel.advanced_extension().enhancement().value());
auto merge_tree_table = parseMergeTreeTableString(table.value());
+ // ignore snapshot id for query
+ merge_tree_table.snapshot_id = "";
auto custom_storage_mergetree = parseStorage(merge_tree_table,
global_context, true);
auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
index 1c9ea736c..b26239dc4 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
@@ -40,8 +40,6 @@ using namespace DB;
class MergeTreeRelParser : public RelParser
{
public:
- static CustomStorageMergeTreePtr
- parseStorage(const substrait::ReadRel::ExtensionTable & extension_table,
ContextMutablePtr context);
static CustomStorageMergeTreePtr parseStorage(
const MergeTreeTable & merge_tree_table, ContextMutablePtr context,
bool restore = false);
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
new file mode 100644
index 000000000..d2c7b0681
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CacheManager.h"
+
+#include <Core/Settings.h>
+#include <Disks/IStoragePolicy.h>
+#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <Interpreters/Context.h>
+#include <Interpreters/Cache/FileCacheFactory.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
+#include <Common/ThreadPool.h>
+#include <Parser/MergeTreeRelParser.h>
+#include <Processors/Executors/PipelineExecutor.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/Sinks/NullSink.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Common/Logger.h>
+#include <Common/logger_useful.h>
+#include <ranges>
+
+// Forward declarations of ClickHouse error codes (defined elsewhere in the ClickHouse build) that this
+// translation unit may throw.
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int INVALID_STATE;
+}
+}
+
+// Metrics handed to the ThreadPool constructed in CacheManager::initialize.
+namespace CurrentMetrics
+{
+extern const Metric LocalThread;
+extern const Metric LocalThreadActive;
+extern const Metric LocalThreadScheduled;
+}
+
+namespace local_engine
+{
+CacheManager & CacheManager::instance()
+{
+    static CacheManager cache_manager;
+    return cache_manager;
+}
+
+/// Stores `context_` and creates the worker pool used for cache-loading jobs.
+/// The pool size comes from the `cache_sync_max_threads` config entry (default 10).
+void CacheManager::initialize(DB::ContextMutablePtr context_)
+{
+    // Report a catchable error instead of dereferencing a null context below.
+    if (!context_)
+        throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "CacheManager can not be initialized with an empty context");
+    auto & manager = instance();
+    manager.context = context_;
+    // NOTE(review): every call replaces thread_pool (destroying the previous pool); confirm this runs only
+    // once per process, or only while no cache jobs are in flight.
+    manager.thread_pool = std::make_unique<ThreadPool>(
+        CurrentMetrics::LocalThread,
+        CurrentMetrics::LocalThreadActive,
+        CurrentMetrics::LocalThreadScheduled,
+        manager.context->getConfigRef().getInt("cache_sync_max_threads", 10),
+        0,
+        0);
+}
+
+struct CacheJobContext
+{
+    MergeTreeTable table;
+};
+
+/// Schedules a background job that reads `columns` of a single `part` of `table` and discards the data.
+/// When `latch` is set, the job counts it down once it ends, whether or not the read succeeded.
+/// Read failures are logged, not propagated.
+void CacheManager::cachePart(const MergeTreeTable & table, const MergeTreePart & part, const std::unordered_set<String> & columns, std::shared_ptr<std::latch> latch)
+{
+    // Without this check an uninitialized manager would dereference a null thread_pool / context.
+    if (!thread_pool || !context)
+        throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "CacheManager is not initialized");
+
+    // Work on a copy restricted to this one part, with the snapshot id cleared.
+    CacheJobContext job_context{table};
+    job_context.table.parts.clear();
+    job_context.table.parts.push_back(part);
+    job_context.table.snapshot_id = "";
+    auto job = [job_detail = job_context, context = this->context, read_columns = columns, latch = latch]()
+    {
+        try
+        {
+            // Registered first so the latch is released on every exit path, including exceptions.
+            SCOPE_EXIT({ if (latch) latch->count_down(); });
+            auto storage = MergeTreeRelParser::parseStorage(job_detail.table, context, true);
+            auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
+            NamesAndTypesList names_and_types_list;
+            auto meta_columns = storage->getInMemoryMetadata().getColumns();
+            for (const auto & column : meta_columns)
+            {
+                if (read_columns.contains(column.name))
+                    names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
+            }
+            auto query_info = buildQueryInfo(names_and_types_list);
+            std::vector<DataPartPtr> selected_parts
+                = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});
+            auto read_step = storage->reader.readFromParts(
+                selected_parts,
+                /* alter_conversions = */
+                {},
+                names_and_types_list.getNames(),
+                storage_snapshot,
+                *query_info,
+                context,
+                context->getSettingsRef().max_block_size,
+                1);
+            QueryPlan plan;
+            plan.addStep(std::move(read_step));
+            auto pipeline_builder = plan.buildQueryPipeline({}, {});
+            auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder.get()));
+            PullingPipelineExecutor executor(pipeline);
+            // Drain the pipeline, discarding every chunk.
+            while (true)
+            {
+                Chunk chunk;
+                if (!executor.pull(chunk))
+                    break;
+            }
+            LOG_INFO(getLogger("CacheManager"), "Load cache of table {}.{} part {} success.", job_detail.table.database, job_detail.table.table, job_detail.table.parts.front().name);
+        }
+        catch (const std::exception & e)
+        {
+            LOG_ERROR(getLogger("CacheManager"), "Load cache of table {}.{} part {} failed.\n {}", job_detail.table.database, job_detail.table.table, job_detail.table.parts.front().name, e.what());
+        }
+    };
+    LOG_INFO(getLogger("CacheManager"), "Loading cache of table {}.{} part {}", job_context.table.database, job_context.table.table, job_context.table.parts.front().name);
+    thread_pool->scheduleOrThrowOnError(std::move(job));
+}
+
+/// Parses the serialized table definition and schedules one cache job per part.
+/// When `async` is false, blocks until every scheduled job has finished (successfully or not).
+void CacheManager::cacheParts(const String & table_def, const std::unordered_set<String> & columns, bool async)
+{
+    auto table = parseMergeTreeTableString(table_def);
+    std::shared_ptr<std::latch> latch = nullptr;
+    if (!async)
+        latch = std::make_shared<std::latch>(table.parts.size());
+    for (const auto & part : table.parts)
+        cachePart(table, part, columns, latch);
+    if (latch)
+        latch->wait();
+}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
new file mode 100644
index 000000000..a303b7b7f
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Disks/IDisk.h>
+#include <latch>
+
+
+namespace local_engine
+{
+struct MergeTreePart;
+struct MergeTreeTable;
+
+/***
+ * Manage the cache of the MergeTree, mainly including meta.bin, data.bin, metadata.gluten
+ *
+ * Parts are "cached" by reading the requested columns on a background thread pool and discarding the
+ * result (presumably to warm the cache layer under the table's storage policy — confirm).
+ */
+class CacheManager
+{
+public:
+    /// Process-wide instance.
+    static CacheManager & instance();
+
+    /// Stores `context` and (re)creates the worker pool, sized by the `cache_sync_max_threads` config (default 10).
+    static void initialize(DB::ContextMutablePtr context);
+
+    /// Schedules a job that reads `columns` of `part` and discards the data. If `latch` is given it is
+    /// counted down when the job ends, whether or not the read succeeded; failures are only logged.
+    void cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set<String>& columns, std::shared_ptr<std::latch> latch = nullptr);
+
+    /// Parses `table_def` and calls cachePart for each of its parts. With `async == false`, blocks until
+    /// all scheduled jobs have finished.
+    void cacheParts(const String& table_def, const std::unordered_set<String>& columns, bool async = true);
+
+private:
+    CacheManager() = default;
+
+    std::unique_ptr<ThreadPool> thread_pool;
+    DB::ContextMutablePtr context;
+};
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.cpp
b/cpp-ch/local-engine/Storages/CustomMergeTreeSink.cpp
deleted file mode 100644
index 4ec946c94..000000000
--- a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "CustomMergeTreeSink.h"
-
-void local_engine::CustomMergeTreeSink::consume(Chunk chunk)
-{
- auto block =
metadata_snapshot->getSampleBlock().cloneWithColumns(chunk.detachColumns());
- DB::BlockWithPartition block_with_partition(Block(block), DB::Row{});
- auto part = storage.writer.writeTempPart(block_with_partition,
metadata_snapshot, context);
- MergeTreeData::Transaction transaction(storage, NO_TRANSACTION_RAW);
- {
- auto lock = storage.lockParts();
- storage.renameTempPartAndAdd(part.part, transaction, lock);
- transaction.commit(&lock);
- }
-}
diff --git a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.h
b/cpp-ch/local-engine/Storages/CustomMergeTreeSink.h
deleted file mode 100644
index acb9702c3..000000000
--- a/cpp-ch/local-engine/Storages/CustomMergeTreeSink.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <Processors/ISink.h>
-#include <Storages/MergeTree/MergeTreeDataWriter.h>
-#include <Storages/StorageInMemoryMetadata.h>
-#include "CustomStorageMergeTree.h"
-
-namespace local_engine
-{
-class CustomMergeTreeSink : public ISink
-{
-public:
- CustomMergeTreeSink(CustomStorageMergeTree & storage_, const
StorageMetadataPtr metadata_snapshot_, ContextPtr context_)
- : ISink(metadata_snapshot_->getSampleBlock()), storage(storage_),
metadata_snapshot(metadata_snapshot_), context(context_)
- {
- }
-
- String getName() const override { return "CustomMergeTreeSink"; }
- void consume(Chunk chunk) override;
-
-private:
- CustomStorageMergeTree & storage;
- StorageMetadataPtr metadata_snapshot;
- ContextPtr context;
-};
-
-}
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
index c59d6ddb4..3f7aac872 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
@@ -18,6 +18,9 @@
#include <Common/GlutenConfig.h>
+#include <Common/MergeTreeTool.h>
+#include <Storages/CustomStorageMergeTree.h>
+
namespace local_engine
{
@@ -55,14 +58,24 @@ void StorageMergeTreeFactory::freeStorage(const StorageID &
id, const String & s
}
}
+
CustomStorageMergeTreePtr
-StorageMergeTreeFactory::getStorage(StorageID id, const String & snapshot_id, std::function<CustomStorageMergeTreePtr()> creator)
+StorageMergeTreeFactory::getStorage(const StorageID& id, const String & snapshot_id, MergeTreeTable merge_tree_table, std::function<CustomStorageMergeTreePtr()> creator)
{
    auto table_name = getTableName(id, snapshot_id);
    std::lock_guard lock(storage_map_mutex);
+
+    // sameStructWith ignores parts; clear them so the cached MergeTreeTable does not retain part lists.
+    merge_tree_table.parts.clear();
+    // A cached storage whose definition differs is stale: evict it and its cached data parts, then rebuild.
+    if (storage_map->has(table_name) && !storage_map->get(table_name)->second.sameStructWith(merge_tree_table))
+    {
+        // Must free the entry for this exact (table, snapshot) key; freeStorage(id) would use snapshot "".
+        freeStorage(id, snapshot_id);
+        std::lock_guard lock_datapart(datapart_mutex);
+        if (datapart_map->has(table_name))
+            datapart_map->remove(table_name);
+    }
    if (!storage_map->has(table_name))
-        storage_map->add(table_name, creator());
-    return *(storage_map->get(table_name));
+        storage_map->add(table_name, {creator(), merge_tree_table});
+    return storage_map->get(table_name)->first;
}
DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID &
id, const String & snapshot_id, std::unordered_set<String> part_name)
@@ -96,7 +109,7 @@ DataPartsVector
StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
CustomStorageMergeTreePtr storage_merge_tree;
{
std::lock_guard storage_lock(storage_map_mutex);
- storage_merge_tree = *(storage_map->get(table_name));
+ storage_merge_tree = storage_map->get(table_name)->first;
}
auto missing_parts =
storage_merge_tree->loadDataPartsWithNames(missing_names);
for (const auto & part : missing_parts)
@@ -108,9 +121,9 @@ DataPartsVector
StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
return res;
}
// will be inited in native init phase
-std::unique_ptr<Poco::LRUCache<std::string, CustomStorageMergeTreePtr>>
StorageMergeTreeFactory::storage_map = nullptr;
+// Each cached value pairs the storage with the MergeTreeTable it was created from, so getStorage can
+// compare structures (sameStructWith) and rebuild stale entries.
+std::unique_ptr<Poco::LRUCache<std::string,
std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>>
StorageMergeTreeFactory::storage_map = nullptr;
std::unique_ptr<Poco::LRUCache<std::string,
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>>
StorageMergeTreeFactory::datapart_map = nullptr;
-std::mutex StorageMergeTreeFactory::storage_map_mutex;
-std::mutex StorageMergeTreeFactory::datapart_mutex;
+// Recursive so one thread may re-lock: getStorage calls freeStorage while holding storage_map_mutex
+// (presumably freeStorage acquires these mutexes itself — verify).
+std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex;
+std::recursive_mutex StorageMergeTreeFactory::datapart_mutex;
}
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
index f372175bb..3fa8c6285 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
@@ -16,11 +16,13 @@
*/
#pragma once
#include <Common/GlutenConfig.h>
+#include <Common/MergeTreeTool.h>
#include <Poco/LRUCache.h>
#include <Parser/SerializedPlanParser.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Interpreters/MergeTreeTransaction.h>
+
namespace local_engine
{
using CustomStorageMergeTreePtr = std::shared_ptr<CustomStorageMergeTree>;
@@ -31,7 +33,7 @@ public:
static StorageMergeTreeFactory & instance();
static void freeStorage(const StorageID & id, const String & snapshot_id =
"");
static CustomStorageMergeTreePtr
- getStorage(StorageID id, const String & snapshot_id,
std::function<CustomStorageMergeTreePtr()> creator);
+ getStorage(const StorageID& id, const String & snapshot_id, MergeTreeTable
merge_tree_table, std::function<CustomStorageMergeTreePtr()> creator);
static DataPartsVector getDataPartsByNames(const StorageID & id, const
String & snapshot_id, std::unordered_set<String> part_name);
static void init_cache_map()
{
@@ -39,7 +41,7 @@ public:
auto & storage_map_v = storage_map;
if (!storage_map_v)
{
- storage_map_v = std::make_unique<Poco::LRUCache<std::string,
CustomStorageMergeTreePtr>>(config.table_metadata_cache_max_count);
+ storage_map_v = std::make_unique<Poco::LRUCache<std::string,
std::pair<CustomStorageMergeTreePtr,
MergeTreeTable>>>(config.table_metadata_cache_max_count);
}
else
{
@@ -65,10 +67,10 @@ public:
static String getTableName(const StorageID & id, const String &
snapshot_id);
private:
- static std::unique_ptr<Poco::LRUCache<std::string,
CustomStorageMergeTreePtr>> storage_map;
+ static std::unique_ptr<Poco::LRUCache<std::string,
std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> storage_map;
static std::unique_ptr<Poco::LRUCache<std::string,
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> datapart_map;
- static std::mutex storage_map_mutex;
- static std::mutex datapart_mutex;
+ static std::recursive_mutex storage_map_mutex;
+ static std::recursive_mutex datapart_mutex;
};
struct TempStorageFreer
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 3a18771cd..8807a0f63 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -55,6 +55,7 @@
#include <Common/ExceptionUtils.h>
#include <Common/JNIUtils.h>
#include <Common/QueryContext.h>
+#include <Storages/Cache/CacheManager.h>
#ifdef __cplusplus
@@ -1252,6 +1253,21 @@ JNIEXPORT void
Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc
+// JNI entry for CHNativeCacheManager.nativeCacheParts: splits the comma-separated column list and asks
+// CacheManager to cache every part of the serialized table definition.
+JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean async_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    const auto table_def = jstring2string(env, table_);
+    const auto columns_csv = jstring2string(env, columns_);
+    const Poco::StringTokenizer column_tokens(columns_csv, ",");
+    const std::unordered_set<String> column_set(column_tokens.begin(), column_tokens.end());
+    local_engine::CacheManager::instance().cacheParts(table_def, column_set, async_);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
#ifdef __cplusplus
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 273443f64..8f24afae1 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -35,6 +35,7 @@ import
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode,
Partitioning}
@@ -468,6 +469,9 @@ trait SparkPlanExecApi {
def genInjectPostHocResolutionRules(): List[SparkSession =>
Rule[LogicalPlan]]
+  /**
+   * Parser builders registered through `SparkSessionExtensions.injectParser`. Backends may override this
+   * to wrap Spark's parser; none by default.
+   */
+  def genInjectExtendedParser(): List[(SparkSession, ParserInterface) => ParserInterface] = Nil
+
def genGetStructFieldTransformer(
substraitExprName: String,
childTransformer: ExpressionTransformer,
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
index 0897f411f..f2ccf6e81 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala
@@ -23,6 +23,9 @@ import org.apache.spark.sql.SparkSessionExtensions
object OthersExtensionOverrides extends GlutenSparkExtensionsInjector {
override def inject(extensions: SparkSessionExtensions): Unit = {
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .genInjectExtendedParser()
+ .foreach(extensions.injectParser)
BackendsApiManager.getSparkPlanExecApiInstance
.genExtendedAnalyzers()
.foreach(extensions.injectResolutionRule)
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.extension
import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends
GlutenSQLTestsTrait {
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
- assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ if (BackendTestUtils.isCHBackendLoaded()) {
+ assert(
+
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+ } else {
+ assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ }
assert(
spark.sessionState.functionRegistry
.lookupFunction(MyExtensions.myFunction._1)
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.extension
import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends
GlutenSQLTestsTrait {
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
- assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ if (BackendTestUtils.isCHBackendLoaded()) {
+ assert(
+
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+ } else {
+ assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ }
assert(
spark.sessionState.functionRegistry
.lookupFunction(MyExtensions.myFunction._1)
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.extension
import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends
GlutenSQLTestsTrait {
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
- assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ if (BackendTestUtils.isCHBackendLoaded()) {
+ assert(
+
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+ } else {
+ assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ }
assert(
spark.sessionState.functionRegistry
.lookupFunction(MyExtensions.myFunction._1)
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 4df9c63b3..681653409 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.extension
import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@@ -37,7 +38,12 @@ class GlutenSessionExtensionSuite extends
GlutenSQLTestsTrait {
assert(spark.sessionState.analyzer.postHocResolutionRules.contains(MyRule(spark)))
assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
- assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ if (BackendTestUtils.isCHBackendLoaded()) {
+ assert(
+
spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+ } else {
+ assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
+ }
assert(
spark.sessionState.functionRegistry
.lookupFunction(MyExtensions.myFunction._1)
diff --git a/pom.xml b/pom.xml
index cbec5befb..6f6b2cd57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
<hadoop.version>2.7.4</hadoop.version>
<slf4j.version>2.0.7</slf4j.version>
<log4j.version>2.20.0</log4j.version>
+ <antlr4.version>4.9.3</antlr4.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.prefix>spark-sql-columnar</project.prefix>
@@ -261,6 +262,7 @@
<delta.package.name>delta-core</delta.package.name>
<delta.version>2.0.1</delta.version>
<delta.binary.version>20</delta.binary.version>
+ <antlr4.version>4.8</antlr4.version>
</properties>
</profile>
<profile>
@@ -275,6 +277,7 @@
<delta.package.name>delta-core</delta.package.name>
<delta.version>2.3.0</delta.version>
<delta.binary.version>23</delta.binary.version>
+ <antlr4.version>4.8</antlr4.version>
</properties>
</profile>
<profile>
@@ -288,6 +291,7 @@
<delta.package.name>delta-core</delta.package.name>
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
+ <antlr4.version>4.9.3</antlr4.version>
</properties>
</profile>
<profile>
@@ -303,6 +307,7 @@
<delta.binary.version>32</delta.binary.version>
<fasterxml.version>2.15.1</fasterxml.version>
<hadoop.version>3.3.4</hadoop.version>
+ <antlr4.version>4.9.3</antlr4.version>
</properties>
<dependencies>
<dependency>
@@ -989,6 +994,11 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
</plugin>
+ <plugin>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <version>${antlr4.version}</version>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]