This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 61ae6f6bd1 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101) (#7762)
61ae6f6bd1 is described below
commit 61ae6f6bd143fca86e72f62e1588872c868ba941
Author: Kyligence Git <[email protected]>
AuthorDate: Fri Nov 1 07:19:42 2024 -0500
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101) (#7762)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101)
* Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/71105
* Fix assert issue in debug build
* Use SPARK_DIR_NAME instead of sparkVersion
* Add UT for https://github.com/ClickHouse/ClickHouse/pull/71105
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
---
.../GlutenClickHouseFileFormatSuite.scala | 22 ++++++++++++++++++++++
...lutenClickHouseWholeStageTransformerSuite.scala | 14 +++++++-------
.../GlutenClickHouseMergeTreeCacheDataSuite.scala | 10 +++++-----
cpp-ch/clickhouse.version | 4 ++--
.../local-engine/Parser/SerializedPlanParser.cpp | 5 ++---
.../Storages/Output/WriteBufferBuilder.cpp | 10 ++++++----
6 files changed, 44 insertions(+), 21 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
index e8ddbd12f1..88a34a786a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
@@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite
spark.createDataFrame(data, schema).toDF().write.parquet(fileName)
fileName
}
+
+ /** TODO: fix the issue and test in spark 3.5 */
+ testSparkVersionLE33("write into hdfs") {
+
+ /**
+ * There is a bug in the pipeline write to HDFS: when a pipeline returns a column batch, it does
+ * not close the HDFS file, and hence the file is not flushed. The HDFS file is closed when
+ * LocalExecutor is destroyed, but before that, the file is already moved by the Spark committer.
+ */
+ val tableName = "write_into_hdfs"
+ val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
+ val format = "parquet"
+ val sql =
+ s"""
+ | select *
+ | from $format.`$tablePath`
+ | where long_field > 30
+ |""".stripMargin
+ withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
+ testFileFormatBase(tablePath, format, sql, df => {})
+ }
+ }
}
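The new test above exercises the native writer over HDFS and then reads the data back through the same path, so an unflushed file would show up as missing rows. A minimal Scala sketch of that round trip, assuming a running SparkSession named spark and the suite constants HDFS_URL_ENDPOINT and SPARK_DIR_NAME; the table name and row count here are illustrative only, not taken from the test:

    // Illustrative only: mirrors the shape of the "write into hdfs" test.
    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/"

    // Enable the Gluten native (pipeline) writer.
    spark.conf.set("spark.gluten.sql.native.writer.enabled", "true")

    // The HDFS file must be flushed and closed before the Spark committer
    // moves it into the final table location.
    spark.range(100).withColumnRenamed("id", "long_field").write.mode("overwrite").parquet(tablePath)

    // Read back through the same path; missing rows here would point at the
    // unflushed-file problem described in the test comment.
    spark.sql(s"select * from parquet.`$tablePath` where long_field > 30").show()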
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index b3e1bd21e9..0bd19dd971 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -35,23 +35,23 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
val DBL_RELAX_EPSILON: Double = Math.pow(10, -11)
val FLT_EPSILON = 1.19209290e-07f
- protected val sparkVersion: String = {
+ private val sparkVersion: String = {
val version = SPARK_VERSION_SHORT.split("\\.")
version(0) + "." + version(1)
}
+ val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
- val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
- val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+ val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/"
+ val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
- val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
val BUCKET_NAME: String = SPARK_DIR_NAME
val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
- val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
- val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+ val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/"
+ val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
- val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+ val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
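The change above makes every externally visible path use the dash-separated directory name instead of the dotted Spark version, while sparkVersion itself becomes private to the suite. A minimal sketch of the derivation, assuming SPARK_VERSION_SHORT is "3.3.1" (the version string is illustrative):

    val version = "3.3.1".split("\\.")
    val sparkVersion = version(0) + "." + version(1)      // "3.3" (now private)
    val SPARK_DIR_NAME = sparkVersion.replace(".", "-")   // "3-3"
    val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
    val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"  // "hdfs://127.0.0.1:8020/3-3"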
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index bf3be1e529..a85a9094d3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -134,7 +134,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -238,7 +238,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
eventually(timeout(60.seconds), interval(2.seconds)) {
assertResult(22)(metaPath.list().length)
@@ -346,7 +346,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
@@ -439,7 +439,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(1)(metaPath.list().length)
@@ -539,7 +539,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
- val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)
assert(countFiles(dataPath) > initial_cache_files)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 6bdb05c332..445cd99068 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241030
-CH_COMMIT=847cfa6237c
\ No newline at end of file
+CH_BRANCH=rebase_ch/20241101
+CH_COMMIT=7cd7bb8ece2
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 32ead10708..9799933b33 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -229,11 +229,10 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const substr
QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
{
- DB::QueryPlanPtr query_plan;
auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(parser_context);
auto all_input_rels = rel_parser->getInputs(rel);
- assert(all_input_rels.size() == 1 || all_input_rels.size() == 2);
+ assert(all_input_rels.size() == 0 || all_input_rels.size() == 1 || all_input_rels.size() == 2);
std::vector<DB::QueryPlanPtr> input_query_plans;
rel_stack.push_back(&rel);
for (const auto * input_rel : all_input_rels)
@@ -276,7 +275,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
}
}
- query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
+ DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
for (auto & extra_plan : rel_parser->extraPlans())
{
extra_plan_holder.push_back(std::move(extra_plan));
diff --git a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
index ea93480b76..6926b86a34 100644
--- a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
@@ -29,7 +29,7 @@ namespace DB
{
namespace ErrorCodes
{
- extern const int BAD_ARGUMENTS;
+extern const int BAD_ARGUMENTS;
}
}
@@ -78,14 +78,16 @@ public:
auto builder = DB::createHDFSBuilder(new_file_uri, context->getConfigRef());
auto fs = DB::createHDFSFS(builder.get());
- auto first = new_file_uri.find('/', new_file_uri.find("//") + 2);
+ auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
auto last = new_file_uri.find_last_of('/');
- auto dir = new_file_uri.substr(first, last - first);
+ auto dir = new_file_uri.substr(begin_of_path, last - begin_of_path);
if (hdfsCreateDirectory(fs.get(), dir.c_str()))
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create dir for {} because {}", dir, std::string(hdfsGetLastError()));
+ const std::string hdfs_file_path = new_file_uri.substr(begin_of_path);
+ const std::string hdfs_uri_without_path = new_file_uri.substr(0, begin_of_path);
DB::WriteSettings write_settings;
- return std::make_unique<DB::WriteBufferFromHDFS>(new_file_uri, context->getConfigRef(), 0, write_settings);
+ return std::make_unique<DB::WriteBufferFromHDFS>(hdfs_uri_without_path, hdfs_file_path, context->getConfigRef(), 0, write_settings);
}
};
#endif
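The change above splits the full file URI into the filesystem endpoint and the in-filesystem path before constructing DB::WriteBufferFromHDFS; this appears to be the build fix the commit message attributes to the ClickHouse change referenced earlier. The same splitting, transcribed to Scala purely for illustration; the example URI is an assumption:

    val newFileUri = "hdfs://127.0.0.1:8020/3-3/write_into_hdfs/part-00000.parquet"
    val beginOfPath = newFileUri.indexOf('/', newFileUri.indexOf("//") + 2)
    val dir = newFileUri.substring(beginOfPath, newFileUri.lastIndexOf('/'))
    // dir                == "/3-3/write_into_hdfs" (created via hdfsCreateDirectory)
    val hdfsFilePath = newFileUri.substring(beginOfPath)
    // hdfsFilePath       == "/3-3/write_into_hdfs/part-00000.parquet"
    val hdfsUriWithoutPath = newFileUri.substring(0, beginOfPath)
    // hdfsUriWithoutPath == "hdfs://127.0.0.1:8020"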
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]