This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 103f82f5 minor: remove unused source files (#1202)
103f82f5 is described below

commit 103f82f5491a48d8f0dbcb697741270d7fe268b8
Author: Andy Grove <[email protected]>
AuthorDate: Fri Dec 27 22:54:17 2024 -0700

    minor: remove unused source files (#1202)
---
 native/Cargo.lock                          |  48 +----
 native/core/Cargo.toml                     |   4 -
 native/core/src/parquet/compression.rs     | 319 -----------------------------
 native/core/src/parquet/util/jni_buffer.rs |  98 ---------
 4 files changed, 2 insertions(+), 467 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index 538c40ee..ad572acb 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -428,17 +428,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "brotli"
-version = "3.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
-dependencies = [
- "alloc-no-stdlib",
- "alloc-stdlib",
- "brotli-decompressor 2.5.1",
-]
-
 [[package]]
 name = "brotli"
 version = "7.0.0"
@@ -447,17 +436,7 @@ checksum = 
"cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
- "brotli-decompressor 4.0.1",
-]
-
-[[package]]
-name = "brotli-decompressor"
-version = "2.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
-dependencies = [
- "alloc-no-stdlib",
- "alloc-stdlib",
+ "brotli-decompressor",
 ]
 
 [[package]]
@@ -900,7 +879,6 @@ dependencies = [
  "arrow-schema",
  "assertables",
  "async-trait",
- "brotli 3.5.0",
  "bytes",
  "crc32fast",
  "criterion",
@@ -912,7 +890,6 @@ dependencies = [
  "datafusion-expr",
  "datafusion-functions-nested",
  "datafusion-physical-expr",
- "flate2",
  "futures",
  "hex",
  "itertools 0.11.0",
@@ -920,7 +897,6 @@ dependencies = [
  "lazy_static",
  "log",
  "log4rs",
- "lz4",
  "mimalloc",
  "num",
  "once_cell",
@@ -932,7 +908,6 @@ dependencies = [
  "regex",
  "serde",
  "simd-adler32",
- "snap",
  "tempfile",
  "thiserror",
  "tokio",
@@ -2111,25 +2086,6 @@ dependencies = [
  "winapi",
 ]
 
-[[package]]
-name = "lz4"
-version = "1.28.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725"
-dependencies = [
- "lz4-sys",
-]
-
-[[package]]
-name = "lz4-sys"
-version = "1.11.1+lz4-1.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6"
-dependencies = [
- "cc",
- "libc",
-]
-
 [[package]]
 name = "lz4_flex"
 version = "0.11.3"
@@ -2382,7 +2338,7 @@ dependencies = [
  "arrow-schema",
  "arrow-select",
  "base64",
- "brotli 7.0.0",
+ "brotli",
  "bytes",
  "chrono",
  "flate2",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 489da46d..5089e67a 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -52,10 +52,6 @@ serde = { version = "1", features = ["derive"] }
 lazy_static = "1.4.0"
 prost = "0.12.1"
 jni = "0.21"
-snap = "1.1"
-brotli = "3.3"
-flate2 = "1.0"
-lz4 = "1.24"
 zstd = "0.11"
 rand = { workspace = true}
 num = { workspace = true }
diff --git a/native/core/src/parquet/compression.rs 
b/native/core/src/parquet/compression.rs
deleted file mode 100644
index 37b857f4..00000000
--- a/native/core/src/parquet/compression.rs
+++ /dev/null
@@ -1,319 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Contains codec interface and supported codec implementations.
-//!
-//! See [`Compression`](crate::basic::Compression) enum for all available 
compression
-//! algorithms.
-//!
-//! # Example
-//!
-//! ```no_run
-//! use comet::parquet::{basic::Compression, compression::create_codec};
-//!
-//! let mut codec = match create_codec(Compression::SNAPPY) {
-//!     Ok(Some(codec)) => codec,
-//!     _ => panic!(),
-//! };
-//!
-//! let data = vec![b'p', b'a', b'r', b'q', b'u', b'e', b't'];
-//! let mut compressed = vec![];
-//! codec.compress(&data[..], &mut compressed).unwrap();
-//!
-//! let mut output = vec![];
-//! codec.decompress(&compressed[..], &mut output).unwrap();
-//!
-//! assert_eq!(output, data);
-//! ```
-
-use super::basic::Compression as CodecType;
-use crate::errors::{ParquetError, ParquetResult as Result};
-
-use brotli::Decompressor;
-use flate2::{read, write, Compression};
-use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
-use std::io::{copy, Read, Write};
-
-/// Parquet compression codec interface.
-#[allow(clippy::ptr_arg)]
-pub trait Codec {
-    /// Compresses data stored in slice `input_buf` and writes the compressed 
result
-    /// to `output_buf`.
-    /// Note that you'll need to call `clear()` before reusing the same 
`output_buf`
-    /// across different `compress` calls.
-    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()>;
-
-    /// Decompresses data stored in slice `input_buf` and writes output to 
`output_buf`.
-    /// Returns the total number of bytes written.
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<usize>;
-}
-
-/// Given the compression type `codec`, returns a codec used to compress and 
decompress
-/// bytes for the compression type.
-/// This returns `None` if the codec type is `UNCOMPRESSED`.
-pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
-    match codec {
-        CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
-        CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
-        CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
-        CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
-        CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
-        CodecType::UNCOMPRESSED => Ok(None),
-        _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
-    }
-}
-
-/// Codec for Snappy compression format.
-pub struct SnappyCodec {
-    decoder: Decoder,
-    encoder: Encoder,
-}
-
-impl SnappyCodec {
-    /// Creates new Snappy compression codec.
-    pub(crate) fn new() -> Self {
-        Self {
-            decoder: Decoder::new(),
-            encoder: Encoder::new(),
-        }
-    }
-}
-
-impl Codec for SnappyCodec {
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<usize> {
-        let len = decompress_len(input_buf)?;
-        output_buf.resize(len, 0);
-        self.decoder
-            .decompress(input_buf, output_buf)
-            .map_err(|e| e.into())
-    }
-
-    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
-        let output_buf_len = output_buf.len();
-        let required_len = max_compress_len(input_buf.len());
-        output_buf.resize(output_buf_len + required_len, 0);
-        let n = self
-            .encoder
-            .compress(input_buf, &mut output_buf[output_buf_len..])?;
-        output_buf.truncate(output_buf_len + n);
-        Ok(())
-    }
-}
-
-/// Codec for GZIP compression algorithm.
-pub struct GZipCodec {}
-
-impl GZipCodec {
-    /// Creates new GZIP compression codec.
-    pub(crate) fn new() -> Self {
-        Self {}
-    }
-}
-
-impl Codec for GZipCodec {
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<usize> {
-        let mut decoder = read::GzDecoder::new(input_buf);
-        decoder.read_to_end(output_buf).map_err(|e| e.into())
-    }
-
-    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
-        let mut encoder = write::GzEncoder::new(output_buf, 
Compression::default());
-        encoder.write_all(input_buf)?;
-        encoder.try_finish().map_err(|e| e.into())
-    }
-}
-
-const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
-const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
-const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22
-
-/// Codec for Brotli compression algorithm.
-pub struct BrotliCodec {}
-
-impl BrotliCodec {
-    /// Creates new Brotli compression codec.
-    pub(crate) fn new() -> Self {
-        Self {}
-    }
-}
-
-impl Codec for BrotliCodec {
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<usize> {
-        Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
-            .read_to_end(output_buf)
-            .map_err(|e| e.into())
-    }
-
-    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
-        let mut encoder = brotli::CompressorWriter::new(
-            output_buf,
-            BROTLI_DEFAULT_BUFFER_SIZE,
-            BROTLI_DEFAULT_COMPRESSION_QUALITY,
-            BROTLI_DEFAULT_LG_WINDOW_SIZE,
-        );
-        encoder.write_all(input_buf)?;
-        encoder.flush().map_err(|e| e.into())
-    }
-}
-
-const LZ4_BUFFER_SIZE: usize = 4096;
-
-/// Codec for LZ4 compression algorithm.
-pub struct LZ4Codec {}
-
-impl LZ4Codec {
-    /// Creates new LZ4 compression codec.
-    pub(crate) fn new() -> Self {
-        Self {}
-    }
-}
-
-impl Codec for LZ4Codec {
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<usize> {
-        let mut decoder = lz4::Decoder::new(input_buf)?;
-        let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
-        let mut total_len = 0;
-        loop {
-            let len = decoder.read(&mut buffer)?;
-            if len == 0 {
-                break;
-            }
-            total_len += len;
-            output_buf.write_all(&buffer[0..len])?;
-        }
-        Ok(total_len)
-    }
-
-    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
-        let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
-        let mut from = 0;
-        loop {
-            let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
-            encoder.write_all(&input_buf[from..to])?;
-            from += LZ4_BUFFER_SIZE;
-            if from >= input_buf.len() {
-                break;
-            }
-        }
-        encoder.finish().1.map_err(|e| e.into())
-    }
-}
-
-/// Codec for Zstandard compression algorithm.
-pub struct ZSTDCodec {}
-
-impl ZSTDCodec {
-    /// Creates new Zstandard compression codec.
-    pub(crate) fn new() -> Self {
-        Self {}
-    }
-}
-
-/// Compression level (1-21) for ZSTD. Choose 1 here for better compression 
speed.
-const ZSTD_COMPRESSION_LEVEL: i32 = 1;
-
-impl Codec for ZSTDCodec {
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<usize> {
-        let mut decoder = zstd::Decoder::new(input_buf)?;
-        match copy(&mut decoder, output_buf) {
-            Ok(n) => Ok(n as usize),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
-        let mut encoder = zstd::Encoder::new(output_buf, 
ZSTD_COMPRESSION_LEVEL)?;
-        encoder.write_all(input_buf)?;
-        match encoder.finish() {
-            Ok(_) => Ok(()),
-            Err(e) => Err(e.into()),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use crate::parquet::util::test_common::*;
-
-    fn test_roundtrip(c: CodecType, data: &[u8]) {
-        let mut c1 = create_codec(c).unwrap().unwrap();
-        let mut c2 = create_codec(c).unwrap().unwrap();
-
-        // Compress with c1
-        let mut compressed = Vec::new();
-        let mut decompressed = Vec::new();
-        c1.compress(data, &mut compressed)
-            .expect("Error when compressing");
-
-        // Decompress with c2
-        let mut decompressed_size = c2
-            .decompress(compressed.as_slice(), &mut decompressed)
-            .expect("Error when decompressing");
-        assert_eq!(data.len(), decompressed_size);
-        decompressed.truncate(decompressed_size);
-        assert_eq!(data, decompressed.as_slice());
-
-        compressed.clear();
-
-        // Compress with c2
-        c2.compress(data, &mut compressed)
-            .expect("Error when compressing");
-
-        // Decompress with c1
-        decompressed_size = c1
-            .decompress(compressed.as_slice(), &mut decompressed)
-            .expect("Error when decompressing");
-        assert_eq!(data.len(), decompressed_size);
-        decompressed.truncate(decompressed_size);
-        assert_eq!(data, decompressed.as_slice());
-    }
-
-    fn test_codec(c: CodecType) {
-        let sizes = vec![100, 10000, 100000];
-        for size in sizes {
-            let data = random_bytes(size);
-            test_roundtrip(c, &data);
-        }
-    }
-
-    #[test]
-    fn test_codec_snappy() {
-        test_codec(CodecType::SNAPPY);
-    }
-
-    #[test]
-    fn test_codec_gzip() {
-        test_codec(CodecType::GZIP);
-    }
-
-    #[test]
-    fn test_codec_brotli() {
-        test_codec(CodecType::BROTLI);
-    }
-
-    #[test]
-    fn test_codec_lz4() {
-        test_codec(CodecType::LZ4);
-    }
-
-    #[test]
-    fn test_codec_zstd() {
-        test_codec(CodecType::ZSTD);
-    }
-}
diff --git a/native/core/src/parquet/util/jni_buffer.rs 
b/native/core/src/parquet/util/jni_buffer.rs
deleted file mode 100644
index 33f36ed9..00000000
--- a/native/core/src/parquet/util/jni_buffer.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use core::slice;
-use std::ptr::NonNull;
-
-use jni::{
-    objects::{ReleaseMode, TypeArray},
-    sys::{jbyte, jbyteArray, JNI_TRUE},
-    JavaVM,
-};
-
-use crate::errors::{CometError, CometResult as Result};
-
-use super::Buffer;
-
-/// An immutable byte buffer wrapping a JNI byte array allocated on heap.
-///
-/// Unlike `AutoArray`, this doesn't have a lifetime and can be used across 
different JNI calls.
-pub struct JniBuffer {
-    /// A pointer for the JVM instance, used to obtain byte array elements (via
-    /// `GetByteArrayElements`) and release byte array elements (via 
`ReleaseByteArrayElements`).
-    jvm: JavaVM,
-    /// The original JNI byte array that backs this buffer
-    inner: jbyteArray,
-    /// The raw pointer from the JNI byte array
-    ptr: NonNull<i8>,
-    /// Total number of bytes in the original array (i.e., `inner`).
-    len: usize,
-    /// Whether the JNI byte array is copied or not.
-    is_copy: bool,
-}
-
-impl JniBuffer {
-    pub fn try_new(jvm: JavaVM, array: jbyteArray, len: usize) -> Result<Self> 
{
-        let env = jvm.get_env()?;
-        let mut is_copy = 0xff;
-        let ptr = jbyte::get(&env, array.into(), &mut is_copy)?;
-        let res = Self {
-            jvm,
-            inner: array,
-            ptr: NonNull::new(ptr)
-                .ok_or_else(|| CometError::NullPointer("null byte array 
pointer".to_string()))?,
-            len,
-            is_copy: is_copy == JNI_TRUE,
-        };
-        Ok(res)
-    }
-
-    /// Whether the JNI byte array is copied or not, i.e., whether the JVM 
pinned down the original
-    /// Java byte array, or made a new copy of it.
-    pub fn is_copy(&self) -> bool {
-        self.is_copy
-    }
-}
-
-impl Buffer for JniBuffer {
-    fn len(&self) -> usize {
-        self.len
-    }
-
-    fn data(&self) -> &[u8] {
-        self.as_ref()
-    }
-}
-
-impl AsRef<[u8]> for JniBuffer {
-    fn as_ref(&self) -> &[u8] {
-        unsafe { slice::from_raw_parts(self.ptr.as_ptr() as *mut u8 as *const 
u8, self.len) }
-    }
-}
-
-impl Drop for JniBuffer {
-    fn drop(&mut self) {
-        let env = self.jvm.get_env().unwrap(); // TODO: log error here
-        jbyte::release(
-            &env,
-            self.inner.into(),
-            self.ptr,
-            ReleaseMode::NoCopyBack as i32, // don't copy back since it's 
read-only here
-        )
-        .unwrap();
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to