This is an automated email from the ASF dual-hosted git repository.

taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b24a0674cc Fix issue caused by an incomplete line when there is only one line in the last bzip2 block (#7715)
b24a0674cc is described below

commit b24a0674cc1ca841595df9be78bfd44d3ea56707
Author: 李扬 <[email protected]>
AuthorDate: Thu Oct 31 11:43:46 2024 +0800

    Fix issue caused by an incomplete line when there is only one line in the last bzip2 block (#7715)
---
 .../local-engine/IO/SplittableBzip2ReadBuffer.cpp  | 77 +++++++++++++++++-----
 cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h |  8 +++
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp |  6 +-
 cpp-ch/local-engine/examples/CMakeLists.txt        |  5 ++
 .../examples/splittable_bzip2_read_buffer.cpp      | 57 ++++++++++++++++
 5 files changed, 133 insertions(+), 20 deletions(-)

diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp 
b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
index 701918e38d..779e79416f 100644
--- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
+++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
@@ -21,6 +21,8 @@
 #include <IO/SeekableReadBuffer.h>
 #include <IO/VarInt.h>
 #include <base/find_symbols.h>
+#include <Common/logger_useful.h>
+#include <iostream>
 
 
 namespace DB
@@ -188,6 +190,12 @@ SplittableBzip2ReadBuffer::SplittableBzip2ReadBuffer(
         adjusted_start = seekable->getPosition();
     }
     changeStateToProcessABlock();
+    LOG_DEBUG(
+        getLogger("SplittableBzip2ReadBuffer"),
+        "adjusted_start: {} first_block_need_special_process: {} 
last_block_need_special_process: {}",
+        *adjusted_start,
+        first_block_need_special_process,
+        last_block_need_special_process);
 }
 
 Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t dest_size, size_t 
offs, size_t len)
@@ -208,6 +216,9 @@ Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t 
dest_size, size_t offs
     {
         result = b;
         skipResult = 
skipToNextMarker(SplittableBzip2ReadBuffer::BLOCK_DELIMITER, 
DELIMITER_BIT_LENGTH);
+
+        // auto * seekable = dynamic_cast<SeekableReadBuffer*>(in.get());
+        // std::cout << "skipResult:" << skipResult << " position:" << 
seekable->getPosition() << " b:" << b << std::endl;
         changeStateToProcessABlock();
     }
     return result;
@@ -215,16 +226,25 @@ Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t 
dest_size, size_t offs
 
 bool SplittableBzip2ReadBuffer::nextImpl()
 {
-    Position dest = internal_buffer.begin();
-    size_t dest_size = internal_buffer.size();
+    const Position dest = internal_buffer.begin();
+    const size_t dest_size = internal_buffer.size();
     size_t offset = 0;
+
+    if (last_block_need_special_process && !last_incomplete_line.empty())
+    {
+        /// If we have last incomplete line, append it to the beginning of 
internal buffer
+        memcpy(dest, last_incomplete_line.data(), last_incomplete_line.size());
+        offset += last_incomplete_line.size();
+        last_incomplete_line.clear();
+    }
+
     Int32 result;
     do
     {
         result = read(dest, dest_size, offset, dest_size - offset);
         if (result > 0)
             offset += result;
-        else if (result == BZip2Constants::END_OF_BLOCK && is_first_block && 
first_block_need_special_process)
+        else if (first_block_need_special_process && result == 
BZip2Constants::END_OF_BLOCK && is_first_block)
         {
             /// Special processing for the first block
             /// Notice that row delim could be \n (Unix) or \r\n (DOS/Windows) 
or \n\r (Mac OS Classic)
@@ -250,27 +270,50 @@ bool SplittableBzip2ReadBuffer::nextImpl()
                     offset = last_line_size;
                 }
             }
+            LOG_DEBUG(
+                getLogger("SplittableBzip2ReadBuffer"),
+                "Header of first block after special processed:{}",
+                std::string(dest, std::min(offset, 100UL)));
         }
-        else if (result == BZip2Constants::END_OF_STREAM && 
last_block_need_special_process)
+    } while (result != BZip2Constants::END_OF_STREAM && offset < dest_size);
+
+    if (last_block_need_special_process && offset)
+    {
+        /// Trim the last incomplete line from [dest, dest+offset), and record 
it in last_incomplete_line
+        bool reach_eof = (result == BZip2Constants::END_OF_STREAM);
+        if (reach_eof)
         {
-            /// Special processing for the last block
-            Position end = dest + offset;
-            auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
+            LOG_DEBUG(
+                getLogger("SplittableBzip2ReadBuffer"),
+                "Header of last block before special processed:{}",
+                std::string(dest, std::min(offset, 100UL)));
+        }
 
-            if (!pos)
-            {
-                /// Only one incomplete row in the last block, discard it
+        /// Trim the last incomplete line from [dest, dest+offset), and record 
it in last_incomplete_line
+        Position end = dest + offset;
+        auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
+        if (!pos)
+        {
+            if (reach_eof)
                 offset = 0;
-            }
             else
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find row 
delimiter in working buffer with size:{}", offset);
+        }
+        else
+        {
+            /// Discard the last incomplete row(if has), and record it in 
last_incomplete_line
+            size_t old_offset = offset;
+            offset = pos - dest + 1;
+            if (pos + 1 < end && *(pos + 1) == '\r')
+                offset++;
+
+            if (!reach_eof)
             {
-                /// Discard the last incomplete row(if there is) in last block.
-                offset = pos - dest + 1;
-                if (pos + 1 < end && *(pos + 1) == '\r')
-                    offset++;
+                /// Only record last incomplete line when eof not reached
+                last_incomplete_line.assign(&dest[offset], old_offset - 
offset);
             }
         }
-    } while (result != BZip2Constants::END_OF_STREAM && offset < dest_size);
+    }
 
     if (offset)
     {
@@ -278,9 +321,7 @@ bool SplittableBzip2ReadBuffer::nextImpl()
         return true;
     }
     else
-    {
         return false;
-    }
 }
 
 Int32 SplittableBzip2ReadBuffer::read0()
diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h 
b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
index e5702c7a7a..93a7ca64df 100644
--- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
+++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
@@ -22,6 +22,7 @@
 #if USE_BZIP2
 #include <vector>
 #include <IO/CompressedReadBufferWrapper.h>
+#include <base/StringRef.h>
 
 namespace DB
 {
@@ -201,8 +202,15 @@ private:
     /// Case2:
     /// e.g. "line1 \n line2 \n line3 \n", all lines will be processed because 
we are pretty sure that line3 is a completed line.
     const bool last_block_need_special_process;
+
+    /// Whether the compressed block is the first one. It is used to apply 
special process for the first block.
     bool is_first_block;
 
+    /// Record the last incomplete line in the latest `nextImpl`
+    /// It is excluded from the output of latest `nextImpl` because we are not 
sure if it is completed in the lifetime of the current split until next 
`nextImpl`.
+    String last_incomplete_line;
+
+
     Int32 blockSize100k;
     STATE currentState;
     bool skipResult;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 1bd9a7a78c..2681ed7c1c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -201,7 +201,8 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> 
read_buffer, const s
         start_end.second);
 
     /// If read buffer doesn't support right bounded reads, wrap it with 
BoundedReadBuffer to enable right bounded reads.
-    if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || 
dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
+    if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) || 
dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
+        || dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
         read_buffer = 
std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
 
     read_buffer->seek(start_end.first, SEEK_SET);
@@ -724,7 +725,8 @@ 
ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const subst
         new_end);
 
     std::unique_ptr<SeekableReadBuffer> bounded_in;
-    if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) || 
dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
+    if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) || 
dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(seekable_in.get())
+        || dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
         bounded_in = 
std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
     else
         bounded_in = std::move(seekable_in);
diff --git a/cpp-ch/local-engine/examples/CMakeLists.txt 
b/cpp-ch/local-engine/examples/CMakeLists.txt
index 03cd3bfe3f..04f6f7088f 100644
--- a/cpp-ch/local-engine/examples/CMakeLists.txt
+++ b/cpp-ch/local-engine/examples/CMakeLists.txt
@@ -16,3 +16,8 @@
 clickhouse_add_executable(signal_demo signal_demo.cpp)
 target_link_libraries(signal_demo PRIVATE gluten_clickhouse_backend_libs
                                           loggers)
+
+clickhouse_add_executable(splittable_bzip2_read_buffer
+                          splittable_bzip2_read_buffer.cpp)
+target_link_libraries(splittable_bzip2_read_buffer
+                      PRIVATE gluten_clickhouse_backend_libs loggers)
diff --git a/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp 
b/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp
new file mode 100644
index 0000000000..952a32bd68
--- /dev/null
+++ b/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+
+#if USE_BZIP2
+#include <IO/BoundedReadBuffer.h>
+#include <IO/ReadSettings.h>
+#include <IO/SplittableBzip2ReadBuffer.h>
+#include <IO/WriteBufferFromFile.h>
+#include <IO/copyData.h>
+#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
+#include <Poco/Util/MapConfiguration.h>
+#include <Common/Config/ConfigProcessor.h>
+#include <Common/LoggerExtend.h>
+
+using namespace DB;
+
+int main() /// Manual example: decompress one byte range of a bzip2 file read from HDFS into a local text file.
+{
+    local_engine::LoggerExtend::initConsoleLogger("debug"); /// Console logger at "debug" level; presumably so LOG_DEBUG output of the read buffer is visible.
+
+    setenv("LIBHDFS3_CONF", "/path/to/hdfs/config", true); /// NOLINT
+    String hdfs_uri = "hdfs://cluster"; /// Placeholder value; replace with a real HDFS URI before running.
+    String hdfs_file_path = "/path/to/bzip2/file"; /// Placeholder value; replace with the path of a real bzip2 file.
+    ConfigurationPtr config = Poco::AutoPtr(new 
Poco::Util::MapConfiguration()); /// Empty in-memory configuration handed to the HDFS read buffer.
+    ReadSettings read_settings;
+    std::unique_ptr<SeekableReadBuffer> in = 
std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_file_path, *config, 
read_settings, 0, true); /// NOTE(review): meaning of the trailing (0, true) arguments is not visible here; confirm against ReadBufferFromHDFS.
+
+    std::unique_ptr<SeekableReadBuffer> bounded_in = 
std::make_unique<BoundedReadBuffer>(std::move(in)); /// Wrapped so that setReadUntilPosition() below can put a right bound on the read.
+    size_t start = 0; /// Offset passed to seek() below.
+    size_t end = 268660564; /// Hard-coded read-until position; presumably specific to one sample file, adjust as needed.
+    bounded_in->seek(start, SEEK_SET);
+    bounded_in->setReadUntilPosition(end);
+
+    std::unique_ptr<ReadBuffer> decompressed = 
std::make_unique<SplittableBzip2ReadBuffer>(std::move(bounded_in), false, true); /// NOTE(review): (false, true) presumably map to first_/last_block_need_special_process; confirm against the constructor.
+
+    String download_path = "./download_" + std::to_string(start) + "_" + 
std::to_string(end) + ".txt"; /// Output file name embeds the start and end offsets.
+    WriteBufferFromFile write_buffer(download_path);
+    copyData(*decompressed, write_buffer); /// Copy the decompressed data into the local output file.
+    return 0;
+}
+#endif


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to