This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 61ae6f6bd1 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101) (#7762)
61ae6f6bd1 is described below

commit 61ae6f6bd143fca86e72f62e1588872c868ba941
Author: Kyligence Git <[email protected]>
AuthorDate: Fri Nov 1 07:19:42 2024 -0500

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101) (#7762)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241101)
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/71105
    
    * Fix assert issue in debug build
    
    * Use SPARK_DIR_NAME instead of sparkVersion
    
    * Add UT for https://github.com/ClickHouse/ClickHouse/pull/71105
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang Chen <[email protected]>
---
 .../GlutenClickHouseFileFormatSuite.scala          | 22 ++++++++++++++++++++++
 ...lutenClickHouseWholeStageTransformerSuite.scala | 14 +++++++-------
 .../GlutenClickHouseMergeTreeCacheDataSuite.scala  | 10 +++++-----
 cpp-ch/clickhouse.version                          |  4 ++--
 .../local-engine/Parser/SerializedPlanParser.cpp   |  5 ++---
 .../Storages/Output/WriteBufferBuilder.cpp         | 10 ++++++----
 6 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
index e8ddbd12f1..88a34a786a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala
@@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite
     spark.createDataFrame(data, schema).toDF().write.parquet(fileName)
     fileName
   }
+
+  /** TODO: fix the issue and test in Spark 3.5 */
+  testSparkVersionLE33("write into hdfs") {
+
+    /**
+     * There is a bug in the pipeline write to HDFS: when a pipeline returns a column batch, it
+     * does not close the HDFS file, so the file is never flushed. The HDFS file is closed only when
+     * the LocalExecutor is destroyed, but by then the Spark committer has already moved the file.
+     */
+    val tableName = "write_into_hdfs"
+    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
+    val format = "parquet"
+    val sql =
+      s"""
+         | select *
+         | from $format.`$tablePath`
+         | where long_field > 30
+         |""".stripMargin
+    withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
+      testFileFormatBase(tablePath, format, sql, df => {})
+    }
+  }
 }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index b3e1bd21e9..0bd19dd971 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -35,23 +35,23 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
   val DBL_RELAX_EPSILON: Double = Math.pow(10, -11)
   val FLT_EPSILON = 1.19209290e-07f
 
-  protected val sparkVersion: String = {
+  private val sparkVersion: String = {
     val version = SPARK_VERSION_SHORT.split("\\.")
     version(0) + "." + version(1)
   }
+  val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
 
-  val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
-  val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+  val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/"
+  val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
   val S3_ENDPOINT = "s3://127.0.0.1:9000/"
   val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
-  val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
   val BUCKET_NAME: String = SPARK_DIR_NAME
   val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
 
-  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
-  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/"
+  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
   val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
-  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
 
   val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
   val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index bf3be1e529..a85a9094d3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -134,7 +134,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -238,7 +238,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     eventually(timeout(60.seconds), interval(2.seconds)) {
       assertResult(22)(metaPath.list().length)
@@ -346,7 +346,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -439,7 +439,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
     val dataPath = new File(HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)
 
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
    val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
     assertResult(true)(res1(0).getBoolean(0))
     assertResult(1)(metaPath.list().length)
@@ -539,7 +539,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
               |                aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 6bdb05c332..445cd99068 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241030
-CH_COMMIT=847cfa6237c
\ No newline at end of file
+CH_BRANCH=rebase_ch/20241101
+CH_COMMIT=7cd7bb8ece2
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 32ead10708..9799933b33 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -229,11 +229,10 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const substr
 
QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
 {
-    DB::QueryPlanPtr query_plan;
    auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(parser_context);
 
     auto all_input_rels = rel_parser->getInputs(rel);
-    assert(all_input_rels.size() == 1 || all_input_rels.size() == 2);
+    assert(all_input_rels.size() == 0 || all_input_rels.size() == 1 || all_input_rels.size() == 2);
     std::vector<DB::QueryPlanPtr> input_query_plans;
     rel_stack.push_back(&rel);
     for (const auto * input_rel : all_input_rels)
@@ -276,7 +275,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
         }
     }
 
-    query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
+    DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
     for (auto & extra_plan : rel_parser->extraPlans())
     {
         extra_plan_holder.push_back(std::move(extra_plan));
diff --git a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
index ea93480b76..6926b86a34 100644
--- a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp
@@ -29,7 +29,7 @@ namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int BAD_ARGUMENTS;
+extern const int BAD_ARGUMENTS;
 }
 }
 
@@ -78,14 +78,16 @@ public:
 
        auto builder = DB::createHDFSBuilder(new_file_uri, context->getConfigRef());
         auto fs = DB::createHDFSFS(builder.get());
-        auto first = new_file_uri.find('/', new_file_uri.find("//") + 2);
+        auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2);
         auto last = new_file_uri.find_last_of('/');
-        auto dir = new_file_uri.substr(first, last - first);
+        auto dir = new_file_uri.substr(begin_of_path, last - begin_of_path);
         if (hdfsCreateDirectory(fs.get(), dir.c_str()))
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create dir for {} because {}", dir, std::string(hdfsGetLastError()));
 
+        const std::string hdfs_file_path = new_file_uri.substr(begin_of_path);
+        const std::string hdfs_uri_without_path = new_file_uri.substr(0, begin_of_path);
         DB::WriteSettings write_settings;
-        return std::make_unique<DB::WriteBufferFromHDFS>(new_file_uri, context->getConfigRef(), 0, write_settings);
+        return std::make_unique<DB::WriteBufferFromHDFS>(hdfs_uri_without_path, hdfs_file_path, context->getConfigRef(), 0, write_settings);
     }
 };
 #endif
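
Note on the WriteBufferBuilder.cpp change: the fix locates begin_of_path (the first '/' after the scheme's "//") and splits the full URI into the filesystem endpoint and the in-filesystem path, passing the two pieces separately to WriteBufferFromHDFS to match the constructor change from the ClickHouse PR linked above. Below is a minimal standalone sketch of that string arithmetic, not Gluten code; the helper name splitHdfsUri and the sample URI are illustrative assumptions.

    // Sketch only: mirrors the begin_of_path computation from the diff above.
    #include <cassert>
    #include <string>
    #include <utility>

    // Split "hdfs://host:port/dir/file" into the endpoint ("hdfs://host:port")
    // and the path ("/dir/file"). splitHdfsUri is a hypothetical helper name.
    std::pair<std::string, std::string> splitHdfsUri(const std::string & uri)
    {
        auto begin_of_path = uri.find('/', uri.find("//") + 2);
        return {uri.substr(0, begin_of_path), uri.substr(begin_of_path)};
    }

    int main()
    {
        auto [endpoint, path] = splitHdfsUri("hdfs://127.0.0.1:8020/spark-3-3/write_into_hdfs/part-0.parquet");
        assert(endpoint == "hdfs://127.0.0.1:8020");
        assert(path == "/spark-3-3/write_into_hdfs/part-0.parquet");
        return 0;
    }

The same begin_of_path offset is also what the patch uses to derive the parent directory passed to hdfsCreateDirectory, so the directory creation and the write buffer see a consistent view of the path.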


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
