This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0d32aeeaf6b24ee4b219e3251ad38c4e8938acb4
Author: HowardQin <[email protected]>
AuthorDate: Mon Feb 5 17:35:00 2024 +0800

    [improvement](load) Enable lzo & Remove dependency on Markus F.X.J. 
Oberhumer's lzo library (#30573)
    
    Issue Number: close #29406
    
    1. increase lzop version to 0x1040,
        I set it to 0x1040 only for decompressing lzo files compressed by higher 
version of lzop,
            no change of decompressing logic,
            actually, 0x1040 should have the "F_H_FILTER" feature,
            but it is mainly for audio and image data, so we do not support it.
    2. use orc::lzoDecompress() instead of lzo1x_decompress_safe() to 
decompress lzo data
    3. use crc32c::Extend() instead of lzo_crc32()
    4. use olap_adler32() instead of lzo_adler32()
    5. thus, remove dependency on Markus F.X.J. Oberhumer's lzo library
    6. remove DORIS_WITH_LZO, so lzo files are supported by stream and broker 
load by default
    7. add some regression tests
---
 be/CMakeLists.txt                                  |  8 ---
 be/cmake/thirdparty.cmake                          |  4 --
 be/src/exec/decompressor.cpp                       |  2 -
 be/src/exec/decompressor.h                         |  7 --
 be/src/exec/lzo_decompressor.cpp                   | 53 +++++++++++-----
 be/src/olap/utils.cpp                              | 11 +---
 be/src/pch/pch.h                                   |  4 --
 be/src/vec/exec/format/csv/csv_reader.cpp          |  1 +
 build.sh                                           |  5 --
 .../load_p0/stream_load/test_compress_type.out     |  2 +-
 .../load_p0/stream_load/test_compress_type.groovy  | 74 +++++++++++++++++++++-
 11 files changed, 112 insertions(+), 59 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index bba09ebc2a6..9b72bc61777 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -323,10 +323,6 @@ if (WITH_MYSQL)
     add_compile_options(-DDORIS_WITH_MYSQL)
 endif()
 
-if (WITH_LZO)
-    add_compile_options(-DDORIS_WITH_LZO)
-endif()
-
 # Enable memory tracker, which allows BE to limit the memory of tasks such as 
query, load,
 # and compaction,and observe the memory of BE through 
be_ip:http_port/MemTracker.
 # Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn 
off the memory tracker,
@@ -547,10 +543,6 @@ set(DORIS_DEPENDENCIES
     ${KRB5_LIBS}
 )
 
-if(WITH_LZO)
-    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} lzo)
-endif()
-
 if (WITH_MYSQL)
     set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} mysql)
 endif()
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 5c07f79d6e4..3dca9252faa 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -70,10 +70,6 @@ add_thirdparty(lz4)
 add_thirdparty(thrift)
 add_thirdparty(thriftnb)
 
-if(WITH_LZO)
-    add_thirdparty(lzo LIBNAME "lib/liblzo2.a")
-endif()
-
 add_thirdparty(libevent LIBNAME "lib/libevent.a")
 add_thirdparty(libevent_pthreads LIBNAME "lib/libevent_pthreads.a")
 add_thirdparty(libbz2 LIBNAME "lib/libbz2.a")
diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index 1e7e3482234..c0dd7554942 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -50,11 +50,9 @@ Status Decompressor::create_decompressor(CompressType type, 
Decompressor** decom
     case CompressType::SNAPPYBLOCK:
         *decompressor = new SnappyBlockDecompressor();
         break;
-#ifdef DORIS_WITH_LZO
     case CompressType::LZOP:
         *decompressor = new LzopDecompressor();
         break;
-#endif
     default:
         return Status::InternalError("Unknown compress type: {}", type);
     }
diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h
index 713c5db2bf1..45653fd8b04 100644
--- a/be/src/exec/decompressor.h
+++ b/be/src/exec/decompressor.h
@@ -28,11 +28,6 @@
 
 #include <string>
 
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-#endif
-
 #include "common/status.h"
 
 namespace doris {
@@ -177,7 +172,6 @@ private:
     Status init() override;
 };
 
-#ifdef DORIS_WITH_LZO
 class LzopDecompressor : public Decompressor {
 public:
     ~LzopDecompressor() override = default;
@@ -271,6 +265,5 @@ private:
     const static uint64_t F_CRC32_D;
     const static uint64_t F_ADLER32_D;
 };
-#endif // DORIS_WITH_LZO
 
 } // namespace doris
diff --git a/be/src/exec/lzo_decompressor.cpp b/be/src/exec/lzo_decompressor.cpp
index a2af9e94fd0..c8cf0499508 100644
--- a/be/src/exec/lzo_decompressor.cpp
+++ b/be/src/exec/lzo_decompressor.cpp
@@ -16,15 +16,30 @@
 // under the License.
 
 #include "exec/decompressor.h"
+#include "olap/utils.h"
+#include "orc/Exceptions.hh"
+#include "util/crc32c.h"
+
+namespace orc {
+/**
+ * Decompress the bytes in to the output buffer.
+ * @param inputAddress the start of the input
+ * @param inputLimit one past the last byte of the input
+ * @param outputAddress the start of the output buffer
+ * @param outputLimit one past the last byte of the output buffer
+ * @result the number of bytes decompressed
+ */
+uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* 
outputAddress,
+                       char* outputLimit);
+} // namespace orc
 
 namespace doris {
 
-#ifdef DORIS_WITH_LZO
 // Lzop
 const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00,
                                                  0x0d, 0x0a, 0x1a, 0x0a};
 
-const uint64_t LzopDecompressor::LZOP_VERSION = 0x1030;
+const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040;
 const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
 // magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
 // + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
@@ -153,19 +168,18 @@ Status LzopDecompressor::decompress(uint8_t* input, 
size_t input_len, size_t* in
         memmove(output, ptr, compressed_size);
         ptr += compressed_size;
     } else {
-        // decompress
-        *decompressed_len = uncompressed_size;
-        int ret = lzo1x_decompress_safe(ptr, compressed_size, output,
-                                        
reinterpret_cast<lzo_uint*>(&uncompressed_size), nullptr);
-        if (ret != LZO_E_OK || uncompressed_size != *decompressed_len) {
+        try {
+            *decompressed_len =
+                    orc::lzoDecompress((const char*)ptr, (const char*)(ptr + 
compressed_size),
+                                       (char*)output, (char*)(output + 
uncompressed_size));
+        } catch (const orc::ParseError& err) {
             std::stringstream ss;
-            ss << "Lzo decompression failed with ret: " << ret
-               << " decompressed len: " << uncompressed_size << " expected: " 
<< *decompressed_len;
+            ss << "Lzo decompression failed: " << err.what();
             return Status::InternalError(ss.str());
         }
 
         RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, 
"decompressed", out_checksum,
-                                 output, uncompressed_size));
+                                 output, *decompressed_len));
         ptr += compressed_size;
     }
 
@@ -260,8 +274,14 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, 
size_t input_len,
         return Status::InternalError(ss.str());
     }
 
-    // 6. skip level
-    ++ptr;
+    // 6. unsupported level: 7, 8, 9
+    uint8_t level;
+    ptr = get_uint8(ptr, &level);
+    if (level > 6) {
+        std::stringstream ss;
+        ss << "unsupported lzo level: " << level;
+        return Status::InternalError(ss.str());
+    }
 
     // 7. flags
     uint32_t flags;
@@ -305,10 +325,10 @@ Status LzopDecompressor::parse_header_info(uint8_t* 
input, size_t input_len,
     uint32_t computed_checksum;
     if (_header_info.header_checksum_type == CHECK_CRC32) {
         computed_checksum = CRC32_INIT_VALUE;
-        computed_checksum = lzo_crc32(computed_checksum, header, cur - header);
+        computed_checksum = crc32c::Extend(computed_checksum, (const 
char*)header, cur - header);
     } else {
         computed_checksum = ADLER32_INIT_VALUE;
-        computed_checksum = lzo_adler32(computed_checksum, header, cur - 
header);
+        computed_checksum = olap_adler32(computed_checksum, (const 
char*)header, cur - header);
     }
 
     if (computed_checksum != expected_checksum) {
@@ -354,10 +374,10 @@ Status LzopDecompressor::checksum(LzoChecksum type, const 
std::string& source, u
     case CHECK_NONE:
         return Status::OK();
     case CHECK_CRC32:
-        computed_checksum = lzo_crc32(CRC32_INIT_VALUE, ptr, len);
+        computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const char*)ptr, 
len);
         break;
     case CHECK_ADLER:
-        computed_checksum = lzo_adler32(ADLER32_INIT_VALUE, ptr, len);
+        computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, 
len);
         break;
     default:
         std::stringstream ss;
@@ -387,6 +407,5 @@ std::string LzopDecompressor::debug_info() {
        << " output checksum type: " << _header_info.output_checksum_type;
     return ss.str();
 }
-#endif // DORIS_WITH_LZO
 
 } // namespace doris
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index e6958343c17..49c1d53ae35 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -19,6 +19,7 @@
 
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
+#include <stdarg.h>
 #include <time.h>
 #include <unistd.h>
 #include <zconf.h>
@@ -33,21 +34,13 @@
 #include <string>
 #include <vector>
 
-#include "util/sse_util.hpp"
-
-#ifdef DORIS_WITH_LZO
-#include <lzo/lzo1c.h>
-#include <lzo/lzo1x.h>
-#endif
-
-#include <stdarg.h>
-
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "olap/olap_common.h"
+#include "util/sse_util.hpp"
 #include "util/string_parser.hpp"
 #include "vec/runtime/ipv4_value.h"
 #include "vec/runtime/ipv6_value.h"
diff --git a/be/src/pch/pch.h b/be/src/pch/pch.h
index 34428d739d0..8c7ef3ea3a9 100644
--- a/be/src/pch/pch.h
+++ b/be/src/pch/pch.h
@@ -255,10 +255,6 @@
 #include <lz4/lz4.h>
 #include <lz4/lz4frame.h>
 
-// lzo headers
-#include <lzo/lzo1x.h>
-#include <lzo/lzoconf.h>
-
 // mysql headers
 #include <mysql/mysql.h>
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 9f8c01b10db..84c8d911f94 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -559,6 +559,7 @@ Status CsvReader::_create_decompressor() {
             compress_type = CompressType::GZIP;
             break;
         case TFileCompressType::LZO:
+        case TFileCompressType::LZOP:
             compress_type = CompressType::LZOP;
             break;
         case TFileCompressType::BZ2:
diff --git a/build.sh b/build.sh
index 3b819929d00..8363705b555 100755
--- a/build.sh
+++ b/build.sh
@@ -327,9 +327,6 @@ fi
 if [[ -z "${USE_AVX2}" ]]; then
     USE_AVX2='ON'
 fi
-if [[ -z "${WITH_LZO}" ]]; then
-    WITH_LZO='OFF'
-fi
 if [[ -z "${USE_LIBCPP}" ]]; then
     if [[ "$(uname -s)" != 'Darwin' ]]; then
         USE_LIBCPP='OFF'
@@ -425,7 +422,6 @@ echo "Get params:
     PARALLEL                    -- ${PARALLEL}
     CLEAN                       -- ${CLEAN}
     WITH_MYSQL                  -- ${WITH_MYSQL}
-    WITH_LZO                    -- ${WITH_LZO}
     GLIBC_COMPATIBILITY         -- ${GLIBC_COMPATIBILITY}
     USE_AVX2                    -- ${USE_AVX2}
     USE_LIBCPP                  -- ${USE_LIBCPP}
@@ -514,7 +510,6 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
         -DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \
         ${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \
         -DWITH_MYSQL="${WITH_MYSQL}" \
-        -DWITH_LZO="${WITH_LZO}" \
         -DUSE_LIBCPP="${USE_LIBCPP}" \
         -DBUILD_META_TOOL="${BUILD_META_TOOL}" \
         -DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out 
b/regression-test/data/load_p0/stream_load/test_compress_type.out
index f76aa4d7415..56d195c569e 100644
--- a/regression-test/data/load_p0/stream_load/test_compress_type.out
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -1,4 +1,4 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !sql --
-120
+160
 
diff --git 
a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy 
b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
index d1123027bac..eeab7e80975 100644
--- a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_compress_type", "load_p0") {
+suite("test_stream_load_compress_type", "load_p0") {
     def tableName = "basic_data"
 
     sql """ DROP TABLE IF EXISTS ${tableName} """
@@ -153,6 +153,50 @@ suite("test_compress_type", "load_p0") {
         }       
     }
 
+    // LZO = LZOP
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZO'
+
+        file "basic_data.csv.lzo"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZOP'
+
+        file "basic_data.csv.lzo"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // no compress_type
     streamLoad {
         table "${tableName}"
         set 'column_separator', '|'
@@ -174,6 +218,7 @@ suite("test_compress_type", "load_p0") {
         }
     }
 
+    // no compress_type
     streamLoad {
         table "${tableName}"
         set 'column_separator', '|'
@@ -195,6 +240,7 @@ suite("test_compress_type", "load_p0") {
         }
     }
 
+    // no compress_type
     streamLoad {
         table "${tableName}"
         set 'column_separator', '|'
@@ -216,6 +262,7 @@ suite("test_compress_type", "load_p0") {
         }
     }
 
+    // no compress_type
     streamLoad {
         table "${tableName}"
         set 'column_separator', '|'
@@ -236,6 +283,7 @@ suite("test_compress_type", "load_p0") {
         }
     }
 
+    // no compress_type
     streamLoad {
         table "${tableName}"
         set 'column_separator', '|'
@@ -256,6 +304,7 @@ suite("test_compress_type", "load_p0") {
         }
     }
 
+    // no compress_type
     streamLoad {
         table "${tableName}"
         set 'column_separator', '|'
@@ -276,5 +325,26 @@ suite("test_compress_type", "load_p0") {
         }
     }
 
+    // no compress_type
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        file "basic_data.csv.lzo"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(23, json.NumberTotalRows)
+                assertEquals(0, json.NumberLoadedRows)
+                assertEquals(23, json.NumberFilteredRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
     qt_sql """ select count(*) from ${tableName} """
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to