This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 55909a8  [SessionContext] - Add read_csv/read_parquet/read_avro functions to SessionContext (#57)
55909a8 is described below

commit 55909a8d451f6b17e5beed18b544ddf91f143005
Author: Francis Du <[email protected]>
AuthorDate: Thu Oct 13 20:37:08 2022 +0800

    [SessionContext] - Add read_csv/read_parquet/read_avro functions to SessionContext (#57)
---
 .github/workflows/test.yaml      |   1 +
 .gitmodules                      |   6 ++
 Cargo.lock                       | 117 ++++++++++++++++++++++++++++++++++++---
 Cargo.toml                       |   2 +-
 datafusion/tests/test_context.py |  15 +++++
 parquet                          |   1 +
 src/context.rs                   |  97 +++++++++++++++++++++++++++++++-
 testing                          |   1 +
 8 files changed, 230 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index a91fbd0..5280310 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -101,6 +101,7 @@ jobs:
 
       - name: Run tests
         run: |
+          git submodule update --init
           source venv/bin/activate
           maturin develop --locked
           RUST_BACKTRACE=1 pytest -v .
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..a3b1b51
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "testing"]
+       path = testing
+       url = https://github.com/apache/arrow-testing.git
+[submodule "parquet"]
+       path = parquet
+       url = https://github.com/apache/parquet-testing.git
diff --git a/Cargo.lock b/Cargo.lock
index 41cdd3a..7c45586 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8,6 +8,12 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
 
+[[package]]
+name = "adler32"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
+
 [[package]]
 name = "ahash"
 version = "0.7.6"
@@ -56,6 +62,33 @@ dependencies = [
  "alloc-no-stdlib",
 ]
 
+[[package]]
+name = "apache-avro"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8cf4144857f9e4d7dd6cc4ba4c78efd2a46bad682b029bd0d91e76a021af1b2a"
+dependencies = [
+ "byteorder",
+ "crc32fast",
+ "digest",
+ "lazy_static",
+ "libflate",
+ "log",
+ "num-bigint",
+ "quad-rand",
+ "rand 0.8.5",
+ "regex",
+ "serde",
+ "serde_json",
+ "snap",
+ "strum",
+ "strum_macros",
+ "thiserror",
+ "typed-builder",
+ "uuid 1.1.2",
+ "zerocopy",
+]
+
 [[package]]
 name = "arrayref"
 version = "0.3.6"
@@ -329,6 +362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2aca80caa2b0f7fdf267799b8895ac8b6341ea879db6b1e2d361ec49b47bc676"
 dependencies = [
  "ahash 0.8.0",
+ "apache-avro",
  "arrow",
  "async-trait",
  "bytes",
@@ -345,6 +379,7 @@ dependencies = [
  "itertools",
  "lazy_static",
  "log",
+ "num-traits",
  "num_cpus",
  "object_store",
  "ordered-float 3.0.0",
@@ -369,6 +404,7 @@ version = "12.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7721fd550f6a28ad7235b62462aa51e9a43b08f8346d5cbe4d61f1e83f5df511"
 dependencies = [
+ "apache-avro",
  "arrow",
  "object_store",
  "ordered-float 3.0.0",
@@ -842,6 +878,26 @@ version = "0.2.126"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
 
+[[package]]
+name = "libflate"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
+dependencies = [
+ "adler32",
+ "crc32fast",
+ "libflate_lz77",
+]
+
+[[package]]
+name = "libflate_lz77"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
+dependencies = [
+ "rle-decode-fast",
+]
+
 [[package]]
 name = "libmimalloc-sys"
 version = "0.1.25"
@@ -1252,6 +1308,12 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "quad-rand"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88"
+
 [[package]]
 name = "quote"
 version = "1.0.18"
@@ -1343,9 +1405,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.5.6"
+version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1"
+checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -1373,6 +1435,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "rle-decode-fast"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
+
 [[package]]
 name = "rustversion"
 version = "1.0.6"
@@ -1408,18 +1476,18 @@ checksum = "0772c5c30e1a0d91f6834f8e545c69281c099dfa9a3ac58d96a9fd629c8d4898"
 
 [[package]]
 name = "serde"
-version = "1.0.137"
+version = "1.0.145"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1"
+checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.137"
+version = "1.0.145"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
+checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1428,9 +1496,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.81"
+version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c"
+checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074"
 dependencies = [
  "itoa 1.0.2",
  "ryu",
@@ -1694,6 +1762,17 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "typed-builder"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "typenum"
 version = "1.15.0"
@@ -1767,6 +1846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
 dependencies = [
  "getrandom 0.2.7",
+ "serde",
 ]
 
 [[package]]
@@ -1872,6 +1952,27 @@ version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
 
+[[package]]
+name = "zerocopy"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
+dependencies = [
+ "byteorder",
+ "zerocopy-derive",
+]
+
+[[package]]
+name = "zerocopy-derive"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "zstd"
 version = "0.11.2+zstd.1.5.2"
diff --git a/Cargo.toml b/Cargo.toml
index 217ac1c..a1b8bf4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,7 @@ default = ["mimalloc"]
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 rand = "0.7"
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
-datafusion = { version = "^12.0.0", features = ["pyarrow"] }
+datafusion = { version = "^12.0.0", features = ["pyarrow", "avro"] }
 datafusion-expr = { version = "^12.0.0" }
 datafusion-common = { version = "^12.0.0", features = ["pyarrow"] }
 uuid = { version = "0.8", features = ["v4"] }
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 324bbec..19b9d0e 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -179,3 +179,18 @@ def test_table_exist(ctx):
     ctx.register_dataset("t", dataset)
 
     assert ctx.table_exist("t") is True
+
+
+def test_read_csv(ctx):
+    csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
+    csv_df.select(column("c1")).show()
+
+
+def test_read_parquet(ctx):
+    csv_df = ctx.read_parquet(path="parquet/data/alltypes_plain.parquet")
+    csv_df.show()
+
+
+def test_read_avro(ctx):
+    csv_df = ctx.read_avro(path="testing/data/avro/alltypes_plain.avro")
+    csv_df.show()
diff --git a/parquet b/parquet
new file mode 160000
index 0000000..e13af11
--- /dev/null
+++ b/parquet
@@ -0,0 +1 @@
+Subproject commit e13af117de7c4f0a4d9908ae3827b3ab119868f3
diff --git a/src/context.rs b/src/context.rs
index 25d08ef..4b8f930 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -28,7 +28,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
 
 use crate::catalog::{PyCatalog, PyTable};
 use crate::dataframe::PyDataFrame;
@@ -264,4 +264,99 @@ impl PySessionContext {
     fn session_id(&self) -> PyResult<String> {
         Ok(self.ctx.session_id())
     }
+
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        schema = "None",
+        has_header = "true",
+        delimiter = "\",\"",
+        schema_infer_max_records = "1000",
+        file_extension = "\".csv\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_csv(
+        &self,
+        path: PathBuf,
+        schema: Option<Schema>,
+        has_header: bool,
+        delimiter: &str,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<String>,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let delimiter = delimiter.as_bytes();
+        if delimiter.len() != 1 {
+            return Err(PyValueError::new_err(
+                "Delimiter must be a single character",
+            ));
+        };
+
+        let mut options = CsvReadOptions::new()
+            .has_header(has_header)
+            .delimiter(delimiter[0])
+            .schema_infer_max_records(schema_infer_max_records)
+            .file_extension(file_extension)
+            .table_partition_cols(table_partition_cols);
+        options.schema = schema.as_ref();
+
+        let result = self.ctx.read_csv(path, options);
+        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+
+        Ok(df)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        parquet_pruning = "true",
+        file_extension = "\".parquet\"",
+        table_partition_cols = "vec![]",
+        skip_metadata = "true"
+    )]
+    fn read_parquet(
+        &self,
+        path: &str,
+        table_partition_cols: Vec<String>,
+        parquet_pruning: bool,
+        file_extension: &str,
+        skip_metadata: bool,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let mut options = ParquetReadOptions::default()
+            .table_partition_cols(table_partition_cols)
+            .parquet_pruning(parquet_pruning)
+            .skip_metadata(skip_metadata);
+        options.file_extension = file_extension;
+
+        let result = self.ctx.read_parquet(path, options);
+        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+        Ok(df)
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        schema = "None",
+        file_extension = "\".avro\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_avro(
+        &self,
+        path: &str,
+        schema: Option<Schema>,
+        table_partition_cols: Vec<String>,
+        file_extension: &str,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let mut options = AvroReadOptions::default().table_partition_cols(table_partition_cols);
+        options.file_extension = file_extension;
+        options.schema = schema.map(Arc::new);
+
+        let result = self.ctx.read_avro(path, options);
+        let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+        Ok(df)
+    }
 }
diff --git a/testing b/testing
new file mode 160000
index 0000000..5bab2f2
--- /dev/null
+++ b/testing
@@ -0,0 +1 @@
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88

Reply via email to