This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/master by this push:
new 55909a8 [SessionContext] - Add read_csv/read_parquet/read_avro
functions to SessionContext (#57)
55909a8 is described below
commit 55909a8d451f6b17e5beed18b544ddf91f143005
Author: Francis Du <[email protected]>
AuthorDate: Thu Oct 13 20:37:08 2022 +0800
[SessionContext] - Add read_csv/read_parquet/read_avro functions to
SessionContext (#57)
---
.github/workflows/test.yaml | 1 +
.gitmodules | 6 ++
Cargo.lock | 117 ++++++++++++++++++++++++++++++++++++---
Cargo.toml | 2 +-
datafusion/tests/test_context.py | 15 +++++
parquet | 1 +
src/context.rs | 97 +++++++++++++++++++++++++++++++-
testing | 1 +
8 files changed, 230 insertions(+), 10 deletions(-)
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index a91fbd0..5280310 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -101,6 +101,7 @@ jobs:
- name: Run tests
run: |
+ git submodule update --init
source venv/bin/activate
maturin develop --locked
RUST_BACKTRACE=1 pytest -v .
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..a3b1b51
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "testing"]
+ path = testing
+ url = https://github.com/apache/arrow-testing.git
+[submodule "parquet"]
+ path = parquet
+ url = https://github.com/apache/parquet-testing.git
diff --git a/Cargo.lock b/Cargo.lock
index 41cdd3a..7c45586 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8,6 +8,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
+[[package]]
+name = "adler32"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
+
[[package]]
name = "ahash"
version = "0.7.6"
@@ -56,6 +62,33 @@ dependencies = [
"alloc-no-stdlib",
]
+[[package]]
+name = "apache-avro"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8cf4144857f9e4d7dd6cc4ba4c78efd2a46bad682b029bd0d91e76a021af1b2a"
+dependencies = [
+ "byteorder",
+ "crc32fast",
+ "digest",
+ "lazy_static",
+ "libflate",
+ "log",
+ "num-bigint",
+ "quad-rand",
+ "rand 0.8.5",
+ "regex",
+ "serde",
+ "serde_json",
+ "snap",
+ "strum",
+ "strum_macros",
+ "thiserror",
+ "typed-builder",
+ "uuid 1.1.2",
+ "zerocopy",
+]
+
[[package]]
name = "arrayref"
version = "0.3.6"
@@ -329,6 +362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2aca80caa2b0f7fdf267799b8895ac8b6341ea879db6b1e2d361ec49b47bc676"
dependencies = [
"ahash 0.8.0",
+ "apache-avro",
"arrow",
"async-trait",
"bytes",
@@ -345,6 +379,7 @@ dependencies = [
"itertools",
"lazy_static",
"log",
+ "num-traits",
"num_cpus",
"object_store",
"ordered-float 3.0.0",
@@ -369,6 +404,7 @@ version = "12.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7721fd550f6a28ad7235b62462aa51e9a43b08f8346d5cbe4d61f1e83f5df511"
dependencies = [
+ "apache-avro",
"arrow",
"object_store",
"ordered-float 3.0.0",
@@ -842,6 +878,26 @@ version = "0.2.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
+[[package]]
+name = "libflate"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
+dependencies = [
+ "adler32",
+ "crc32fast",
+ "libflate_lz77",
+]
+
+[[package]]
+name = "libflate_lz77"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
+dependencies = [
+ "rle-decode-fast",
+]
+
[[package]]
name = "libmimalloc-sys"
version = "0.1.25"
@@ -1252,6 +1308,12 @@ dependencies = [
"syn",
]
+[[package]]
+name = "quad-rand"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88"
+
[[package]]
name = "quote"
version = "1.0.18"
@@ -1343,9 +1405,9 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.5.6"
+version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1"
+checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b"
dependencies = [
"aho-corasick",
"memchr",
@@ -1373,6 +1435,12 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "rle-decode-fast"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
+
[[package]]
name = "rustversion"
version = "1.0.6"
@@ -1408,18 +1476,18 @@ checksum = "0772c5c30e1a0d91f6834f8e545c69281c099dfa9a3ac58d96a9fd629c8d4898"
[[package]]
name = "serde"
-version = "1.0.137"
+version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1"
+checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.137"
+version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
+checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
dependencies = [
"proc-macro2",
"quote",
@@ -1428,9 +1496,9 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.81"
+version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c"
+checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074"
dependencies = [
"itoa 1.0.2",
"ryu",
@@ -1694,6 +1762,17 @@ dependencies = [
"once_cell",
]
+[[package]]
+name = "typed-builder"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "typenum"
version = "1.15.0"
@@ -1767,6 +1846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom 0.2.7",
+ "serde",
]
[[package]]
@@ -1872,6 +1952,27 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
+[[package]]
+name = "zerocopy"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
+dependencies = [
+ "byteorder",
+ "zerocopy-derive",
+]
+
+[[package]]
+name = "zerocopy-derive"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
diff --git a/Cargo.toml b/Cargo.toml
index 217ac1c..a1b8bf4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,7 @@ default = ["mimalloc"]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.7"
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
-datafusion = { version = "^12.0.0", features = ["pyarrow"] }
+datafusion = { version = "^12.0.0", features = ["pyarrow", "avro"] }
datafusion-expr = { version = "^12.0.0" }
datafusion-common = { version = "^12.0.0", features = ["pyarrow"] }
uuid = { version = "0.8", features = ["v4"] }
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 324bbec..19b9d0e 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -179,3 +179,18 @@ def test_table_exist(ctx):
ctx.register_dataset("t", dataset)
assert ctx.table_exist("t") is True
+
+
+def test_read_csv(ctx):
+ csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
+ csv_df.select(column("c1")).show()
+
+
+def test_read_parquet(ctx):
+ csv_df = ctx.read_parquet(path="parquet/data/alltypes_plain.parquet")
+ csv_df.show()
+
+
+def test_read_avro(ctx):
+ csv_df = ctx.read_avro(path="testing/data/avro/alltypes_plain.avro")
+ csv_df.show()
diff --git a/parquet b/parquet
new file mode 160000
index 0000000..e13af11
--- /dev/null
+++ b/parquet
@@ -0,0 +1 @@
+Subproject commit e13af117de7c4f0a4d9908ae3827b3ab119868f3
diff --git a/src/context.rs b/src/context.rs
index 25d08ef..4b8f930 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -28,7 +28,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
@@ -264,4 +264,99 @@ impl PySessionContext {
fn session_id(&self) -> PyResult<String> {
Ok(self.ctx.session_id())
}
+
+    /// Create a `DataFrame` by reading CSV data from `path`.
+    ///
+    /// * `path` - location of the data; must be representable as UTF-8.
+    /// * `schema` - explicit schema to use; when `None`, DataFusion infers one,
+    ///   reading at most `schema_infer_max_records` rows to do so.
+    /// * `has_header` - whether the first line holds column names.
+    /// * `delimiter` - field separator; must encode to exactly one byte.
+    /// * `file_extension` - extension of the files to read.
+    /// * `table_partition_cols` - names of the partition columns.
+    ///
+    /// Raises `ValueError` if `path` is not valid UTF-8 or `delimiter` is not a
+    /// single byte; errors from DataFusion are converted and propagated.
+    // The Python keyword API needs more inputs than clippy's default limit of 7.
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        schema = "None",
+        has_header = "true",
+        delimiter = "\",\"",
+        schema_infer_max_records = "1000",
+        file_extension = "\".csv\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_csv(
+        &self,
+        path: PathBuf,
+        schema: Option<Schema>,
+        has_header: bool,
+        delimiter: &str,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<String>,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        // CsvReadOptions takes the delimiter as a single byte.
+        let delimiter = delimiter.as_bytes();
+        if delimiter.len() != 1 {
+            return Err(PyValueError::new_err(
+                "Delimiter must be a single character",
+            ));
+        }
+
+        let mut options = CsvReadOptions::new()
+            .has_header(has_header)
+            .delimiter(delimiter[0])
+            .schema_infer_max_records(schema_infer_max_records)
+            .file_extension(file_extension)
+            .table_partition_cols(table_partition_cols);
+        // The options borrow the schema, so `schema` must outlive `options`.
+        options.schema = schema.as_ref();
+
+        let result = self.ctx.read_csv(path, options);
+        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+
+        Ok(df)
+    }
+
+    /// Create a `DataFrame` by reading Parquet data from `path`.
+    ///
+    /// * `path` - a `str` or `pathlib.Path` (extracted as a `PathBuf`, as in
+    ///   `read_csv`); must be representable as UTF-8.
+    /// * `table_partition_cols` - names of the partition columns.
+    /// * `parquet_pruning` - forwarded to `ParquetReadOptions::parquet_pruning`.
+    /// * `file_extension` - extension of the files to read.
+    /// * `skip_metadata` - forwarded to `ParquetReadOptions::skip_metadata`.
+    ///
+    /// Raises `ValueError` if `path` is not valid UTF-8; errors from DataFusion
+    /// are converted and propagated.
+    #[args(
+        parquet_pruning = "true",
+        file_extension = "\".parquet\"",
+        table_partition_cols = "vec![]",
+        skip_metadata = "true"
+    )]
+    fn read_parquet(
+        &self,
+        path: PathBuf,
+        table_partition_cols: Vec<String>,
+        parquet_pruning: bool,
+        file_extension: &str,
+        skip_metadata: bool,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = ParquetReadOptions::default()
+            .table_partition_cols(table_partition_cols)
+            .parquet_pruning(parquet_pruning)
+            .skip_metadata(skip_metadata);
+        options.file_extension = file_extension;
+
+        let result = self.ctx.read_parquet(path, options);
+        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+        Ok(df)
+    }
+
+    /// Create a `DataFrame` by reading Avro data from `path`.
+    ///
+    /// * `path` - a `str` or `pathlib.Path` (extracted as a `PathBuf`, as in
+    ///   `read_csv`); must be representable as UTF-8.
+    /// * `schema` - optional explicit schema, handed to DataFusion behind an `Arc`.
+    /// * `table_partition_cols` - names of the partition columns.
+    /// * `file_extension` - extension of the files to read.
+    ///
+    /// Raises `ValueError` if `path` is not valid UTF-8; errors from DataFusion
+    /// are converted and propagated.
+    #[args(
+        schema = "None",
+        file_extension = "\".avro\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_avro(
+        &self,
+        path: PathBuf,
+        schema: Option<Schema>,
+        table_partition_cols: Vec<String>,
+        file_extension: &str,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = AvroReadOptions::default().table_partition_cols(table_partition_cols);
+        options.file_extension = file_extension;
+        options.schema = schema.map(Arc::new);
+
+        let result = self.ctx.read_avro(path, options);
+        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+        Ok(df)
+    }
}
diff --git a/testing b/testing
new file mode 160000
index 0000000..5bab2f2
--- /dev/null
+++ b/testing
@@ -0,0 +1 @@
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88