This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 103f82f5 minor: remove unused source files (#1202)
103f82f5 is described below
commit 103f82f5491a48d8f0dbcb697741270d7fe268b8
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 27 22:54:17 2024 -0700
minor: remove unused source files (#1202)
---
native/Cargo.lock | 48 +----
native/core/Cargo.toml | 4 -
native/core/src/parquet/compression.rs | 319 -----------------------------
native/core/src/parquet/util/jni_buffer.rs | 98 ---------
4 files changed, 2 insertions(+), 467 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 538c40ee..ad572acb 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -428,17 +428,6 @@ dependencies = [
"generic-array",
]
-[[package]]
-name = "brotli"
-version = "3.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
-dependencies = [
- "alloc-no-stdlib",
- "alloc-stdlib",
- "brotli-decompressor 2.5.1",
-]
-
[[package]]
name = "brotli"
version = "7.0.0"
@@ -447,17 +436,7 @@ checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
- "brotli-decompressor 4.0.1",
-]
-
-[[package]]
-name = "brotli-decompressor"
-version = "2.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
-dependencies = [
- "alloc-no-stdlib",
- "alloc-stdlib",
+ "brotli-decompressor",
]
[[package]]
@@ -900,7 +879,6 @@ dependencies = [
"arrow-schema",
"assertables",
"async-trait",
- "brotli 3.5.0",
"bytes",
"crc32fast",
"criterion",
@@ -912,7 +890,6 @@ dependencies = [
"datafusion-expr",
"datafusion-functions-nested",
"datafusion-physical-expr",
- "flate2",
"futures",
"hex",
"itertools 0.11.0",
@@ -920,7 +897,6 @@ dependencies = [
"lazy_static",
"log",
"log4rs",
- "lz4",
"mimalloc",
"num",
"once_cell",
@@ -932,7 +908,6 @@ dependencies = [
"regex",
"serde",
"simd-adler32",
- "snap",
"tempfile",
"thiserror",
"tokio",
@@ -2111,25 +2086,6 @@ dependencies = [
"winapi",
]
-[[package]]
-name = "lz4"
-version = "1.28.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725"
-dependencies = [
- "lz4-sys",
-]
-
-[[package]]
-name = "lz4-sys"
-version = "1.11.1+lz4-1.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6"
-dependencies = [
- "cc",
- "libc",
-]
-
[[package]]
name = "lz4_flex"
version = "0.11.3"
@@ -2382,7 +2338,7 @@ dependencies = [
"arrow-schema",
"arrow-select",
"base64",
- "brotli 7.0.0",
+ "brotli",
"bytes",
"chrono",
"flate2",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 489da46d..5089e67a 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -52,10 +52,6 @@ serde = { version = "1", features = ["derive"] }
lazy_static = "1.4.0"
prost = "0.12.1"
jni = "0.21"
-snap = "1.1"
-brotli = "3.3"
-flate2 = "1.0"
-lz4 = "1.24"
zstd = "0.11"
rand = { workspace = true}
num = { workspace = true }
diff --git a/native/core/src/parquet/compression.rs b/native/core/src/parquet/compression.rs
deleted file mode 100644
index 37b857f4..00000000
--- a/native/core/src/parquet/compression.rs
+++ /dev/null
@@ -1,319 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Contains codec interface and supported codec implementations.
-//!
-//! See [`Compression`](crate::basic::Compression) enum for all available compression
-//! algorithms.
-//!
-//! # Example
-//!
-//! ```no_run
-//! use comet::parquet::{basic::Compression, compression::create_codec};
-//!
-//! let mut codec = match create_codec(Compression::SNAPPY) {
-//! Ok(Some(codec)) => codec,
-//! _ => panic!(),
-//! };
-//!
-//! let data = vec![b'p', b'a', b'r', b'q', b'u', b'e', b't'];
-//! let mut compressed = vec![];
-//! codec.compress(&data[..], &mut compressed).unwrap();
-//!
-//! let mut output = vec![];
-//! codec.decompress(&compressed[..], &mut output).unwrap();
-//!
-//! assert_eq!(output, data);
-//! ```
-
-use super::basic::Compression as CodecType;
-use crate::errors::{ParquetError, ParquetResult as Result};
-
-use brotli::Decompressor;
-use flate2::{read, write, Compression};
-use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
-use std::io::{copy, Read, Write};
-
-/// Parquet compression codec interface.
-#[allow(clippy::ptr_arg)]
-pub trait Codec {
- /// Compresses data stored in slice `input_buf` and writes the compressed result
- /// to `output_buf`.
- /// Note that you'll need to call `clear()` before reusing the same `output_buf`
- /// across different `compress` calls.
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
-
- /// Decompresses data stored in slice `input_buf` and writes output to `output_buf`.
- /// Returns the total number of bytes written.
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize>;
-}
-
-/// Given the compression type `codec`, returns a codec used to compress and decompress
-/// bytes for the compression type.
-/// This returns `None` if the codec type is `UNCOMPRESSED`.
-pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
- match codec {
- CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
- CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
- CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
- CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
- CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
- CodecType::UNCOMPRESSED => Ok(None),
- _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
- }
-}
-
-/// Codec for Snappy compression format.
-pub struct SnappyCodec {
- decoder: Decoder,
- encoder: Encoder,
-}
-
-impl SnappyCodec {
- /// Creates new Snappy compression codec.
- pub(crate) fn new() -> Self {
- Self {
- decoder: Decoder::new(),
- encoder: Encoder::new(),
- }
- }
-}
-
-impl Codec for SnappyCodec {
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
- let len = decompress_len(input_buf)?;
- output_buf.resize(len, 0);
- self.decoder
- .decompress(input_buf, output_buf)
- .map_err(|e| e.into())
- }
-
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let output_buf_len = output_buf.len();
- let required_len = max_compress_len(input_buf.len());
- output_buf.resize(output_buf_len + required_len, 0);
- let n = self
- .encoder
- .compress(input_buf, &mut output_buf[output_buf_len..])?;
- output_buf.truncate(output_buf_len + n);
- Ok(())
- }
-}
-
-/// Codec for GZIP compression algorithm.
-pub struct GZipCodec {}
-
-impl GZipCodec {
- /// Creates new GZIP compression codec.
- pub(crate) fn new() -> Self {
- Self {}
- }
-}
-
-impl Codec for GZipCodec {
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
- let mut decoder = read::GzDecoder::new(input_buf);
- decoder.read_to_end(output_buf).map_err(|e| e.into())
- }
-
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = write::GzEncoder::new(output_buf, Compression::default());
- encoder.write_all(input_buf)?;
- encoder.try_finish().map_err(|e| e.into())
- }
-}
-
-const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
-const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
-const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22
-
-/// Codec for Brotli compression algorithm.
-pub struct BrotliCodec {}
-
-impl BrotliCodec {
- /// Creates new Brotli compression codec.
- pub(crate) fn new() -> Self {
- Self {}
- }
-}
-
-impl Codec for BrotliCodec {
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
- Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
- .read_to_end(output_buf)
- .map_err(|e| e.into())
- }
-
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = brotli::CompressorWriter::new(
- output_buf,
- BROTLI_DEFAULT_BUFFER_SIZE,
- BROTLI_DEFAULT_COMPRESSION_QUALITY,
- BROTLI_DEFAULT_LG_WINDOW_SIZE,
- );
- encoder.write_all(input_buf)?;
- encoder.flush().map_err(|e| e.into())
- }
-}
-
-const LZ4_BUFFER_SIZE: usize = 4096;
-
-/// Codec for LZ4 compression algorithm.
-pub struct LZ4Codec {}
-
-impl LZ4Codec {
- /// Creates new LZ4 compression codec.
- pub(crate) fn new() -> Self {
- Self {}
- }
-}
-
-impl Codec for LZ4Codec {
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
- let mut decoder = lz4::Decoder::new(input_buf)?;
- let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
- let mut total_len = 0;
- loop {
- let len = decoder.read(&mut buffer)?;
- if len == 0 {
- break;
- }
- total_len += len;
- output_buf.write_all(&buffer[0..len])?;
- }
- Ok(total_len)
- }
-
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
- let mut from = 0;
- loop {
- let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
- encoder.write_all(&input_buf[from..to])?;
- from += LZ4_BUFFER_SIZE;
- if from >= input_buf.len() {
- break;
- }
- }
- encoder.finish().1.map_err(|e| e.into())
- }
-}
-
-/// Codec for Zstandard compression algorithm.
-pub struct ZSTDCodec {}
-
-impl ZSTDCodec {
- /// Creates new Zstandard compression codec.
- pub(crate) fn new() -> Self {
- Self {}
- }
-}
-
-/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
-const ZSTD_COMPRESSION_LEVEL: i32 = 1;
-
-impl Codec for ZSTDCodec {
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
- let mut decoder = zstd::Decoder::new(input_buf)?;
- match copy(&mut decoder, output_buf) {
- Ok(n) => Ok(n as usize),
- Err(e) => Err(e.into()),
- }
- }
-
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
- encoder.write_all(input_buf)?;
- match encoder.finish() {
- Ok(_) => Ok(()),
- Err(e) => Err(e.into()),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use crate::parquet::util::test_common::*;
-
- fn test_roundtrip(c: CodecType, data: &[u8]) {
- let mut c1 = create_codec(c).unwrap().unwrap();
- let mut c2 = create_codec(c).unwrap().unwrap();
-
- // Compress with c1
- let mut compressed = Vec::new();
- let mut decompressed = Vec::new();
- c1.compress(data, &mut compressed)
- .expect("Error when compressing");
-
- // Decompress with c2
- let mut decompressed_size = c2
- .decompress(compressed.as_slice(), &mut decompressed)
- .expect("Error when decompressing");
- assert_eq!(data.len(), decompressed_size);
- decompressed.truncate(decompressed_size);
- assert_eq!(data, decompressed.as_slice());
-
- compressed.clear();
-
- // Compress with c2
- c2.compress(data, &mut compressed)
- .expect("Error when compressing");
-
- // Decompress with c1
- decompressed_size = c1
- .decompress(compressed.as_slice(), &mut decompressed)
- .expect("Error when decompressing");
- assert_eq!(data.len(), decompressed_size);
- decompressed.truncate(decompressed_size);
- assert_eq!(data, decompressed.as_slice());
- }
-
- fn test_codec(c: CodecType) {
- let sizes = vec![100, 10000, 100000];
- for size in sizes {
- let data = random_bytes(size);
- test_roundtrip(c, &data);
- }
- }
-
- #[test]
- fn test_codec_snappy() {
- test_codec(CodecType::SNAPPY);
- }
-
- #[test]
- fn test_codec_gzip() {
- test_codec(CodecType::GZIP);
- }
-
- #[test]
- fn test_codec_brotli() {
- test_codec(CodecType::BROTLI);
- }
-
- #[test]
- fn test_codec_lz4() {
- test_codec(CodecType::LZ4);
- }
-
- #[test]
- fn test_codec_zstd() {
- test_codec(CodecType::ZSTD);
- }
-}
diff --git a/native/core/src/parquet/util/jni_buffer.rs b/native/core/src/parquet/util/jni_buffer.rs
deleted file mode 100644
index 33f36ed9..00000000
--- a/native/core/src/parquet/util/jni_buffer.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use core::slice;
-use std::ptr::NonNull;
-
-use jni::{
- objects::{ReleaseMode, TypeArray},
- sys::{jbyte, jbyteArray, JNI_TRUE},
- JavaVM,
-};
-
-use crate::errors::{CometError, CometResult as Result};
-
-use super::Buffer;
-
-/// An immutable byte buffer wrapping a JNI byte array allocated on heap.
-///
-/// Unlike `AutoArray`, this doesn't have a lifetime and can be used across different JNI calls.
-pub struct JniBuffer {
- /// A pointer for the JVM instance, used to obtain byte array elements (via
- /// `GetByteArrayElements`) and release byte array elements (via `ReleaseByteArrayElements`).
- jvm: JavaVM,
- /// The original JNI byte array that backs this buffer
- inner: jbyteArray,
- /// The raw pointer from the JNI byte array
- ptr: NonNull<i8>,
- /// Total number of bytes in the original array (i.e., `inner`).
- len: usize,
- /// Whether the JNI byte array is copied or not.
- is_copy: bool,
-}
-
-impl JniBuffer {
- pub fn try_new(jvm: JavaVM, array: jbyteArray, len: usize) -> Result<Self> {
- let env = jvm.get_env()?;
- let mut is_copy = 0xff;
- let ptr = jbyte::get(&env, array.into(), &mut is_copy)?;
- let res = Self {
- jvm,
- inner: array,
- ptr: NonNull::new(ptr)
- .ok_or_else(|| CometError::NullPointer("null byte array pointer".to_string()))?,
- len,
- is_copy: is_copy == JNI_TRUE,
- };
- Ok(res)
- }
-
- /// Whether the JNI byte array is copied or not, i.e., whether the JVM pinned down the original
- /// Java byte array, or made a new copy of it.
- pub fn is_copy(&self) -> bool {
- self.is_copy
- }
-}
-
-impl Buffer for JniBuffer {
- fn len(&self) -> usize {
- self.len
- }
-
- fn data(&self) -> &[u8] {
- self.as_ref()
- }
-}
-
-impl AsRef<[u8]> for JniBuffer {
- fn as_ref(&self) -> &[u8] {
- unsafe { slice::from_raw_parts(self.ptr.as_ptr() as *mut u8 as *const u8, self.len) }
- }
-}
-
-impl Drop for JniBuffer {
- fn drop(&mut self) {
- let env = self.jvm.get_env().unwrap(); // TODO: log error here
- jbyte::release(
- &env,
- self.inner.into(),
- self.ptr,
- ReleaseMode::NoCopyBack as i32, // don't copy back since it's read-only here
- )
- .unwrap();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]