This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bfb3e6a  Add support to perform sql query on in-memory datasource. (#981)
bfb3e6a is described below

commit bfb3e6a8b2851473f749edaa882b5131cfd50ee0
Author: mmuru <[email protected]>
AuthorDate: Sun Sep 12 22:25:55 2021 -0700

    Add support to perform sql query on in-memory datasource. (#981)
    
    Co-authored-by: Muru Muthusamy <[email protected]>
---
 python/src/context.rs       | 23 +++++++++++++++++++++++
 python/tests/test_df_sql.py | 43 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/python/src/context.rs b/python/src/context.rs
index 9acc14a..f57d754 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -95,6 +95,29 @@ impl ExecutionContext {
         ))
     }
 
+    /// Registers `partitions` (lists of pyarrow RecordBatches) as the in-memory table `name`.
+    ///
+    /// Raises `ValueError` when no RecordBatch is supplied, since the table schema is taken
+    /// from the first batch found.
+    fn register_record_batches(
+        &mut self,
+        name: &str,
+        partitions: Vec<Vec<PyObject>>,
+        py: Python,
+    ) -> PyResult<()> {
+        let partitions: Vec<Vec<RecordBatch>> = partitions
+            .iter()
+            .map(|batches| batches.iter().map(|b| to_rust::to_rust_batch(b.as_ref(py))).collect())
+            .collect::<PyResult<_>>()?;
+        // Indexing `partitions[0][0]` would panic on `[]` or `[[]]`; raise a Python error instead.
+        let schema = partitions.iter().flatten().next().map(|b| b.schema()).ok_or_else(|| {
+            pyo3::exceptions::PyValueError::new_err("at least one RecordBatch is required")
+        })?;
+        let table = errors::wrap(MemTable::try_new(schema, partitions))?;
+        errors::wrap(self.ctx.register_table(name, Arc::new(table)))?;
+        Ok(())
+    }
+
     fn register_parquet(&mut self, name: &str, path: &str) -> PyResult<()> {
         errors::wrap(self.ctx.register_parquet(name, path))?;
         Ok(())
diff --git a/python/tests/test_df_sql.py b/python/tests/test_df_sql.py
new file mode 100644
index 0000000..17a7645
--- /dev/null
+++ b/python/tests/test_df_sql.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+import pytest
+from datafusion import ExecutionContext
+
+
[email protected]
+def ctx():
+    return ExecutionContext()
+
+
+def test_register_record_batches(ctx):
+
+    # create a RecordBatch and register it as memtable
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+        names=["a", "b"],
+    )
+
+    ctx.register_record_batches("t", [[batch]])
+
+    assert ctx.tables() == {"t"}
+
+    result = ctx.sql("SELECT a+b, a-b FROM t").collect()
+
+    assert result[0].column(0) == pa.array([5, 7, 9])
+    assert result[0].column(1) == pa.array([-3, -3, -3])

Reply via email to