This is an automated email from the ASF dual-hosted git repository.
taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b24a0674cc fix issue caused by incomplete line if there is only one
line in last bzip2 block (#7715)
b24a0674cc is described below
commit b24a0674cc1ca841595df9be78bfd44d3ea56707
Author: 李扬 <[email protected]>
AuthorDate: Thu Oct 31 11:43:46 2024 +0800
fix issue caused by incomplete line if there is only one line in last bzip2
block (#7715)
---
.../local-engine/IO/SplittableBzip2ReadBuffer.cpp | 77 +++++++++++++++++-----
cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h | 8 +++
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 6 +-
cpp-ch/local-engine/examples/CMakeLists.txt | 5 ++
.../examples/splittable_bzip2_read_buffer.cpp | 57 ++++++++++++++++
5 files changed, 133 insertions(+), 20 deletions(-)
diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
index 701918e38d..779e79416f 100644
--- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
+++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp
@@ -21,6 +21,8 @@
#include <IO/SeekableReadBuffer.h>
#include <IO/VarInt.h>
#include <base/find_symbols.h>
+#include <Common/logger_useful.h>
+#include <iostream>
namespace DB
@@ -188,6 +190,12 @@ SplittableBzip2ReadBuffer::SplittableBzip2ReadBuffer(
adjusted_start = seekable->getPosition();
}
changeStateToProcessABlock();
+ LOG_DEBUG(
+ getLogger("SplittableBzip2ReadBuffer"),
+ "adjusted_start: {} first_block_need_special_process: {}
last_block_need_special_process: {}",
+ *adjusted_start,
+ first_block_need_special_process,
+ last_block_need_special_process);
}
Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t dest_size, size_t
offs, size_t len)
@@ -208,6 +216,9 @@ Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t
dest_size, size_t offs
{
result = b;
skipResult =
skipToNextMarker(SplittableBzip2ReadBuffer::BLOCK_DELIMITER,
DELIMITER_BIT_LENGTH);
+
+ // auto * seekable = dynamic_cast<SeekableReadBuffer*>(in.get());
+ // std::cout << "skipResult:" << skipResult << " position:" <<
seekable->getPosition() << " b:" << b << std::endl;
changeStateToProcessABlock();
}
return result;
@@ -215,16 +226,25 @@ Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t
dest_size, size_t offs
bool SplittableBzip2ReadBuffer::nextImpl()
{
- Position dest = internal_buffer.begin();
- size_t dest_size = internal_buffer.size();
+ const Position dest = internal_buffer.begin();
+ const size_t dest_size = internal_buffer.size();
size_t offset = 0;
+
+ if (last_block_need_special_process && !last_incomplete_line.empty())
+ {
+ /// If we have last incomplete line, append it to the beginning of
internal buffer
+ memcpy(dest, last_incomplete_line.data(), last_incomplete_line.size());
+ offset += last_incomplete_line.size();
+ last_incomplete_line.clear();
+ }
+
Int32 result;
do
{
result = read(dest, dest_size, offset, dest_size - offset);
if (result > 0)
offset += result;
- else if (result == BZip2Constants::END_OF_BLOCK && is_first_block &&
first_block_need_special_process)
+ else if (first_block_need_special_process && result ==
BZip2Constants::END_OF_BLOCK && is_first_block)
{
/// Special processing for the first block
/// Notice that row delim could be \n (Unix) or \r\n (DOS/Windows)
or \n\r (Mac OS Classic)
@@ -250,27 +270,50 @@ bool SplittableBzip2ReadBuffer::nextImpl()
offset = last_line_size;
}
}
+ LOG_DEBUG(
+ getLogger("SplittableBzip2ReadBuffer"),
+ "Header of first block after special processed:{}",
+ std::string(dest, std::min(offset, 100UL)));
}
- else if (result == BZip2Constants::END_OF_STREAM &&
last_block_need_special_process)
+ } while (result != BZip2Constants::END_OF_STREAM && offset < dest_size);
+
+ if (last_block_need_special_process && offset)
+ {
+ /// Trim the last incomplete line from [dest, dest+offset), and record
it in last_incomplete_line
+ bool reach_eof = (result == BZip2Constants::END_OF_STREAM);
+ if (reach_eof)
{
- /// Special processing for the last block
- Position end = dest + offset;
- auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
+ LOG_DEBUG(
+ getLogger("SplittableBzip2ReadBuffer"),
+ "Header of last block before special processed:{}",
+ std::string(dest, std::min(offset, 100UL)));
+ }
- if (!pos)
- {
- /// Only one incomplete row in the last block, discard it
+ /// Trim the last incomplete line from [dest, dest+offset), and record
it in last_incomplete_line
+ Position end = dest + offset;
+ auto * pos = find_last_symbols_or_null<'\n'>(dest, end);
+ if (!pos)
+ {
+ if (reach_eof)
offset = 0;
- }
else
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find row
delimiter in working buffer with size:{}", offset);
+ }
+ else
+ {
+ /// Discard the last incomplete row(if has), and record it in
last_incomplete_line
+ size_t old_offset = offset;
+ offset = pos - dest + 1;
+ if (pos + 1 < end && *(pos + 1) == '\r')
+ offset++;
+
+ if (!reach_eof)
{
- /// Discard the last incomplete row(if there is) in last block.
- offset = pos - dest + 1;
- if (pos + 1 < end && *(pos + 1) == '\r')
- offset++;
+ /// Only record last incomplete line when eof not reached
+ last_incomplete_line.assign(&dest[offset], old_offset -
offset);
}
}
- } while (result != BZip2Constants::END_OF_STREAM && offset < dest_size);
+ }
if (offset)
{
@@ -278,9 +321,7 @@ bool SplittableBzip2ReadBuffer::nextImpl()
return true;
}
else
- {
return false;
- }
}
Int32 SplittableBzip2ReadBuffer::read0()
diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
index e5702c7a7a..93a7ca64df 100644
--- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
+++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h
@@ -22,6 +22,7 @@
#if USE_BZIP2
#include <vector>
#include <IO/CompressedReadBufferWrapper.h>
+#include <base/StringRef.h>
namespace DB
{
@@ -201,8 +202,15 @@ private:
/// Case2:
/// e.g. "line1 \n line2 \n line3 \n", all lines will be processed because
we are pretty sure that line3 is a completed line.
const bool last_block_need_special_process;
+
+ /// Whether the compressed block is the first one. It is used to apply
special processing to the first block.
bool is_first_block;
+ /// Records the last incomplete line seen in the latest `nextImpl` call.
+ /// It is excluded from the output of that `nextImpl` call because we cannot
be sure whether the line is completed within the lifetime of the current split
until the next `nextImpl` call.
+ String last_incomplete_line;
+
+
Int32 blockSize100k;
STATE currentState;
bool skipResult;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 1bd9a7a78c..2681ed7c1c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -201,7 +201,8 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer>
read_buffer, const s
start_end.second);
/// If read buffer doesn't support right bounded reads, wrap it with
BoundedReadBuffer to enable right bounded reads.
- if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) ||
dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
+ if (dynamic_cast<DB::ReadBufferFromHDFS *>(read_buffer.get()) ||
dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(read_buffer.get())
+ || dynamic_cast<DB::ReadBufferFromFile *>(read_buffer.get()))
read_buffer =
std::make_unique<DB::BoundedReadBuffer>(std::move(read_buffer));
read_buffer->seek(start_end.first, SEEK_SET);
@@ -724,7 +725,8 @@
ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const subst
new_end);
std::unique_ptr<SeekableReadBuffer> bounded_in;
- if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) ||
dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
+ if (dynamic_cast<DB::ReadBufferFromHDFS *>(seekable_in.get()) ||
dynamic_cast<DB::AsynchronousReadBufferFromHDFS *>(seekable_in.get())
+ || dynamic_cast<DB::ReadBufferFromFile *>(seekable_in.get()))
bounded_in =
std::make_unique<BoundedReadBuffer>(std::move(seekable_in));
else
bounded_in = std::move(seekable_in);
diff --git a/cpp-ch/local-engine/examples/CMakeLists.txt
b/cpp-ch/local-engine/examples/CMakeLists.txt
index 03cd3bfe3f..04f6f7088f 100644
--- a/cpp-ch/local-engine/examples/CMakeLists.txt
+++ b/cpp-ch/local-engine/examples/CMakeLists.txt
@@ -16,3 +16,8 @@
clickhouse_add_executable(signal_demo signal_demo.cpp)
target_link_libraries(signal_demo PRIVATE gluten_clickhouse_backend_libs
loggers)
+
+clickhouse_add_executable(splittable_bzip2_read_buffer
+ splittable_bzip2_read_buffer.cpp)
+target_link_libraries(splittable_bzip2_read_buffer
+ PRIVATE gluten_clickhouse_backend_libs loggers)
diff --git a/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp
b/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp
new file mode 100644
index 0000000000..952a32bd68
--- /dev/null
+++ b/cpp-ch/local-engine/examples/splittable_bzip2_read_buffer.cpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+
+#if USE_BZIP2
+#include <IO/BoundedReadBuffer.h>
+#include <IO/ReadSettings.h>
+#include <IO/SplittableBzip2ReadBuffer.h>
+#include <IO/WriteBufferFromFile.h>
+#include <IO/copyData.h>
+#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
+#include <Poco/Util/MapConfiguration.h>
+#include <Common/Config/ConfigProcessor.h>
+#include <Common/LoggerExtend.h>
+
+using namespace DB;
+
+int main() /// Example driver: reads an HDFS file through BoundedReadBuffer + SplittableBzip2ReadBuffer and copies the output to a local file.
+{
+ local_engine::LoggerExtend::initConsoleLogger("debug");
+
+ setenv("LIBHDFS3_CONF", "/path/to/hdfs/config", true); /// NOLINT
+ String hdfs_uri = "hdfs://cluster"; /// Placeholder URI; replace with a real HDFS cluster before running.
+ String hdfs_file_path = "/path/to/bzip2/file"; /// Placeholder path of the bzip2 file to read.
+ ConfigurationPtr config = Poco::AutoPtr(new
Poco::Util::MapConfiguration());
+ ReadSettings read_settings;
+ std::unique_ptr<SeekableReadBuffer> in =
std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_file_path, *config,
read_settings, 0, true);
+
+ std::unique_ptr<SeekableReadBuffer> bounded_in =
std::make_unique<BoundedReadBuffer>(std::move(in));
+ size_t start = 0; /// Position passed to seek() below.
+ size_t end = 268660564; /// Hard-coded value passed to setReadUntilPosition() below.
+ bounded_in->seek(start, SEEK_SET);
+ bounded_in->setReadUntilPosition(end);
+
+ std::unique_ptr<ReadBuffer> decompressed =
std::make_unique<SplittableBzip2ReadBuffer>(std::move(bounded_in), false, true); /// NOTE(review): the two flags are presumably first/last_block_need_special_process; confirm against the constructor.
+
+ String download_path = "./download_" + std::to_string(start) + "_" +
std::to_string(end) + ".txt"; /// Output file name embeds start and end.
+ WriteBufferFromFile write_buffer(download_path);
+ copyData(*decompressed, write_buffer); /// Copies everything read from `decompressed` into `write_buffer`.
+ return 0;
+}
+#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]