This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bfb3e6a Add support to perform sql query on in-memory datasource.
(#981)
bfb3e6a is described below
commit bfb3e6a8b2851473f749edaa882b5131cfd50ee0
Author: mmuru <[email protected]>
AuthorDate: Sun Sep 12 22:25:55 2021 -0700
Add support to perform sql query on in-memory datasource. (#981)
Co-authored-by: Muru Muthusamy <[email protected]>
---
python/src/context.rs | 23 +++++++++++++++++++++++
python/tests/test_df_sql.py | 43 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 66 insertions(+)
diff --git a/python/src/context.rs b/python/src/context.rs
index 9acc14a..f57d754 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -95,6 +95,29 @@ impl ExecutionContext {
))
}
+ /// Registers in-memory record batches as a table named `name`.
+ ///
+ /// `partitions` is a list of partitions, each being a list of Python
+ /// objects that `to_rust::to_rust_batch` can convert into a `RecordBatch`.
+ /// The table schema is taken from the first batch found, scanning the
+ /// partitions in order; the resulting `MemTable` is then registered on the
+ /// underlying context.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if any object fails to convert, if no batch is
+ /// supplied at all (there is nothing to take a schema from), or if
+ /// building or registering the `MemTable` fails.
+ fn register_record_batches(
+     &mut self,
+     name: &str,
+     partitions: Vec<Vec<PyObject>>,
+     py: Python,
+ ) -> PyResult<()> {
+     // Convert every Python batch; the first failing conversion aborts the call.
+     let partitions: Vec<Vec<RecordBatch>> = partitions
+         .iter()
+         .map(|batches| {
+             batches
+                 .iter()
+                 .map(|batch| to_rust::to_rust_batch(batch.as_ref(py)))
+                 .collect()
+         })
+         .collect::<PyResult<_>>()?;
+
+     // Indexing `partitions[0][0]` would panic on `[]` or `[[]]`, so look for
+     // the first available batch and report a regular Python error otherwise.
+     let schema = partitions
+         .iter()
+         .flatten()
+         .next()
+         .map(|batch| batch.schema())
+         .ok_or_else(|| {
+             pyo3::exceptions::PyValueError::new_err(
+                 "register_record_batches requires at least one record batch",
+             )
+         })?;
+
+     let table = errors::wrap(MemTable::try_new(schema, partitions))?;
+
+     errors::wrap(self.ctx.register_table(name, Arc::new(table)))?;
+     Ok(())
+ }
+
fn register_parquet(&mut self, name: &str, path: &str) -> PyResult<()> {
errors::wrap(self.ctx.register_parquet(name, path))?;
Ok(())
diff --git a/python/tests/test_df_sql.py b/python/tests/test_df_sql.py
new file mode 100644
index 0000000..17a7645
--- /dev/null
+++ b/python/tests/test_df_sql.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+import pytest
+from datafusion import ExecutionContext
+
+
[email protected]
+def ctx():
+ return ExecutionContext()
+
+
+def test_register_record_batches(ctx):
+
+ # create a RecordBatch and register it as memtable
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+ names=["a", "b"],
+ )
+
+ ctx.register_record_batches("t", [[batch]])
+
+ assert ctx.tables() == {"t"}
+
+ result = ctx.sql("SELECT a+b, a-b FROM t").collect()
+
+ assert result[0].column(0) == pa.array([5, 7, 9])
+ assert result[0].column(1) == pa.array([-3, -3, -3])