This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 08e3043420 Publish built docs triggered by 5e9dd44edddfbd524b9d2dc7684d9880114a4fba
08e3043420 is described below
commit 08e3043420408fc7632ba1ec9d40aa7b5f577cbe
Author: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Oct 8 07:18:14 2023 +0000
Publish built docs triggered by 5e9dd44edddfbd524b9d2dc7684d9880114a4fba
---
.../1a0b5c48023f0bdbcb8dff8324195c3c/expr_api.rs | 134 ----------
.../97b381d1ef654142806e5e91c3788385/simple_udf.rs | 144 ----------
.../simple_udwf.rs | 195 --------------
.../9dc934d68aef6b44bd3235a7dbc9a8be/catalog.rs | 289 ---------------------
.../9f6fbc67bd5c63cb1fd7ba4efdf82d7a/example.csv | 2 -
.../simple_udaf.rs | 178 -------------
_sources/library-user-guide/adding-udfs.md.txt | 10 +-
_sources/library-user-guide/catalogs.md.txt | 2 +-
.../library-user-guide/working-with-exprs.md.txt | 4 +-
_sources/user-guide/example-usage.md.txt | 4 +-
library-user-guide/adding-udfs.html | 6 +-
library-user-guide/catalogs.html | 2 +-
library-user-guide/working-with-exprs.html | 4 +-
user-guide/example-usage.html | 4 +-
14 files changed, 18 insertions(+), 960 deletions(-)
diff --git a/_downloads/1a0b5c48023f0bdbcb8dff8324195c3c/expr_api.rs b/_downloads/1a0b5c48023f0bdbcb8dff8324195c3c/expr_api.rs
deleted file mode 100644
index 97abf4d552..0000000000
--- a/_downloads/1a0b5c48023f0bdbcb8dff8324195c3c/expr_api.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
-use datafusion::error::Result;
-use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
-use datafusion::physical_expr::execution_props::ExecutionProps;
-use datafusion::prelude::*;
-use datafusion_common::{ScalarValue, ToDFSchema};
-use datafusion_expr::expr::BinaryExpr;
-use datafusion_expr::Operator;
-
-/// This example demonstrates the DataFusion [`Expr`] API.
-///
-/// DataFusion comes with a powerful and extensive system for
-/// representing and manipulating expressions such as `A + 5` and `X
-/// IN ('foo', 'bar', 'baz')` and many other constructs.
-#[tokio::main]
-async fn main() -> Result<()> {
- // The easiest way to do create expressions is to use the
- // "fluent"-style API, like this:
- let expr = col("a") + lit(5);
-
- // this creates the same expression as the following though with
- // much less code,
- let expr2 = Expr::BinaryExpr(BinaryExpr::new(
- Box::new(col("a")),
- Operator::Plus,
- Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
- ));
- assert_eq!(expr, expr2);
-
- simplify_demo()?;
-
- Ok(())
-}
-
-/// In addition to easy construction, DataFusion exposes APIs for
-/// working with and simplifying such expressions that call into the
-/// same powerful and extensive implementation used for the query
-/// engine.
-fn simplify_demo() -> Result<()> {
- // For example, lets say you have has created an expression such
- // ts = to_timestamp("2020-09-08T12:00:00+00:00")
- let expr = col("ts").eq(call_fn(
- "to_timestamp",
- vec![lit("2020-09-08T12:00:00+00:00")],
- )?);
-
- // Naively evaluating such an expression against a large number of
- // rows would involve re-converting "2020-09-08T12:00:00+00:00" to a
- // timestamp for each row which gets expensive
- //
- // However, DataFusion's simplification logic can do this for you
-
- // you need to tell DataFusion the type of column "ts":
- let schema = Schema::new(vec![make_ts_field("ts")]).to_dfschema_ref()?;
-
- // And then build a simplifier
- // the ExecutionProps carries information needed to simplify
- // expressions, such as the current time (to evaluate `now()`
- // correctly)
- let props = ExecutionProps::new();
- let context = SimplifyContext::new(&props).with_schema(schema);
- let simplifier = ExprSimplifier::new(context);
-
- // And then call the simplify_expr function:
- let expr = simplifier.simplify(expr)?;
-
- // DataFusion has simplified the expression to a comparison with a constant
- // ts = 1599566400000000000; Tada!
- assert_eq!(
- expr,
- col("ts").eq(lit_timestamp_nano(1599566400000000000i64))
- );
-
- // here are some other examples of what DataFusion is capable of
- let schema = Schema::new(vec![
- make_field("i", DataType::Int64),
- make_field("b", DataType::Boolean),
- ])
- .to_dfschema_ref()?;
- let context = SimplifyContext::new(&props).with_schema(schema);
- let simplifier = ExprSimplifier::new(context);
-
- // basic arithmetic simplification
- // i + 1 + 2 => a + 3
- // (note this is not done if the expr is (col("i") + (lit(1) + lit(2))))
- assert_eq!(
- simplifier.simplify(col("i") + (lit(1) + lit(2)))?,
- col("i") + lit(3)
- );
-
- // (i * 0) > 5 --> false (only if null)
- assert_eq!(
- simplifier.simplify((col("i") * lit(0)).gt(lit(5)))?,
- lit(false)
- );
-
- // Logical simplification
-
- // ((i > 5) AND FALSE) OR (i < 10) --> i < 10
- assert_eq!(
- simplifier
- .simplify(col("i").gt(lit(5)).and(lit(false)).or(col("i").lt(lit(10))))?,
- col("i").lt(lit(10))
- );
-
- Ok(())
-}
-
-fn make_field(name: &str, data_type: DataType) -> Field {
- let nullable = false;
- Field::new(name, data_type, nullable)
-}
-
-fn make_ts_field(name: &str) -> Field {
- let tz = None;
- make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
-}
diff --git a/_downloads/97b381d1ef654142806e5e91c3788385/simple_udf.rs b/_downloads/97b381d1ef654142806e5e91c3788385/simple_udf.rs
deleted file mode 100644
index dba4385b8e..0000000000
--- a/_downloads/97b381d1ef654142806e5e91c3788385/simple_udf.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::{
- arrow::{
- array::{ArrayRef, Float32Array, Float64Array},
- datatypes::DataType,
- record_batch::RecordBatch,
- },
- logical_expr::Volatility,
-};
-
-use datafusion::prelude::*;
-use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
-use datafusion_common::cast::as_float64_array;
-use std::sync::Arc;
-
-// create local execution context with an in-memory table
-fn create_context() -> Result<SessionContext> {
- use datafusion::arrow::datatypes::{Field, Schema};
- // define a schema.
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::Float32, false),
- Field::new("b", DataType::Float64, false),
- ]));
-
- // define data.
- let batch = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])),
- Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
- ],
- )?;
-
- // declare a new context. In spark API, this corresponds to a new spark SQLsession
- let ctx = SessionContext::new();
-
- // declare a table in memory. In spark API, this corresponds to
createDataFrame(...).
- ctx.register_batch("t", batch)?;
- Ok(ctx)
-}
-
-/// In this example we will declare a single-type, single return type UDF that
exponentiates f64, a^b
-#[tokio::main]
-async fn main() -> Result<()> {
- let ctx = create_context()?;
-
- // First, declare the actual implementation of the calculation
- let pow = |args: &[ArrayRef]| {
- // in DataFusion, all `args` and output are dynamically-typed arrays,
which means that we need to:
- // 1. cast the values to the type we want
- // 2. perform the computation for every element in the array (using a
loop or SIMD) and construct the result
-
- // this is guaranteed by DataFusion based on the function's signature.
- assert_eq!(args.len(), 2);
-
- // 1. cast both arguments to f64. These casts MUST be aligned with the
signature or this function panics!
- let base = as_float64_array(&args[0]).expect("cast failed");
- let exponent = as_float64_array(&args[1]).expect("cast failed");
-
- // this is guaranteed by DataFusion. We place it just to make it
obvious.
- assert_eq!(exponent.len(), base.len());
-
- // 2. perform the computation
- let array = base
- .iter()
- .zip(exponent.iter())
- .map(|(base, exponent)| {
- match (base, exponent) {
- // in arrow, any value can be null.
- // Here we decide to make our UDF to return null when
either base or exponent is null.
- (Some(base), Some(exponent)) => Some(base.powf(exponent)),
- _ => None,
- }
- })
- .collect::<Float64Array>();
-
- // `Ok` because no error occurred during the calculation (we should
add one if exponent was [0, 1[ and the base < 0 because that panics!)
- // `Arc` because arrays are immutable, thread-safe, trait objects.
- Ok(Arc::new(array) as ArrayRef)
- };
- // the function above expects an `ArrayRef`, but DataFusion may pass a
scalar to a UDF.
- // thus, we use `make_scalar_function` to decorare the closure so that it
can handle both Arrays and Scalar values.
- let pow = make_scalar_function(pow);
-
- // Next:
- // * give it a name so that it shows nicely when the plan is printed
- // * declare what input it expects
- // * declare its return type
- let pow = create_udf(
- "pow",
- // expects two f64
- vec![DataType::Float64, DataType::Float64],
- // returns f64
- Arc::new(DataType::Float64),
- Volatility::Immutable,
- pow,
- );
-
- // at this point, we can use it or register it, depending on the use-case:
- // * if the UDF is expected to be used throughout the program in different
contexts,
- // we can register it, and call it later:
- ctx.register_udf(pow.clone()); // clone is only required in this example
because we show both usages
-
- // * if the UDF is expected to be used directly in the scope, `.call` it
directly:
- let expr = pow.call(vec![col("a"), col("b")]);
-
- // get a DataFrame from the context
- let df = ctx.table("t").await?;
-
- // if we do not have `pow` in the scope and we registered it, we can get
it from the registry
- let pow = df.registry().udf("pow")?;
- // equivalent to expr
- let expr1 = pow.call(vec![col("a"), col("b")]);
-
- // equivalent to `'SELECT pow(a, b), pow(a, b) AS pow1 FROM t'`
- let df = df.select(vec![
- expr,
- // alias so that they have different column names
- expr1.alias("pow1"),
- ])?;
-
- // note that "b" is f32, not f64. DataFusion coerces the types to match
the UDF's signature.
-
- // print the results
- df.show().await?;
-
- Ok(())
-}
diff --git a/_downloads/9d6db4688a18f2550d093295ce0f3231/simple_udwf.rs b/_downloads/9d6db4688a18f2550d093295ce0f3231/simple_udwf.rs
deleted file mode 100644
index 39042a3562..0000000000
--- a/_downloads/9d6db4688a18f2550d093295ce0f3231/simple_udwf.rs
+++ /dev/null
@@ -1,195 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use arrow::{
- array::{ArrayRef, AsArray, Float64Array},
- datatypes::Float64Type,
-};
-use arrow_schema::DataType;
-use datafusion::datasource::file_format::options::CsvReadOptions;
-
-use datafusion::error::Result;
-use datafusion::prelude::*;
-use datafusion_common::ScalarValue;
-use datafusion_expr::{PartitionEvaluator, Volatility, WindowFrame};
-
-// create local execution context with `cars.csv` registered as a table named
`cars`
-async fn create_context() -> Result<SessionContext> {
- // declare a new context. In spark API, this corresponds to a new spark
SQL session
- let ctx = SessionContext::new();
-
- // declare a table in memory. In spark API, this corresponds to
createDataFrame(...).
- println!("pwd: {}", std::env::current_dir().unwrap().display());
- let csv_path = "datafusion/core/tests/data/cars.csv".to_string();
- let read_options = CsvReadOptions::default().has_header(true);
-
- ctx.register_csv("cars", &csv_path, read_options).await?;
- Ok(ctx)
-}
-
-/// In this example we will declare a user defined window function that
computes a moving average and then run it using SQL
-#[tokio::main]
-async fn main() -> Result<()> {
- let ctx = create_context().await?;
-
- // here is where we define the UDWF. We also declare its signature:
- let smooth_it = create_udwf(
- "smooth_it",
- DataType::Float64,
- Arc::new(DataType::Float64),
- Volatility::Immutable,
- Arc::new(make_partition_evaluator),
- );
-
- // register the window function with DataFusion so we can call it
- ctx.register_udwf(smooth_it.clone());
-
- // Use SQL to run the new window function
- let df = ctx.sql("SELECT * from cars").await?;
- // print the results
- df.show().await?;
-
- // Use SQL to run the new window function:
- //
- // `PARTITION BY car`:each distinct value of car (red, and green)
- // should be treated as a separate partition (and will result in
- // creating a new `PartitionEvaluator`)
- //
- // `ORDER BY time`: within each partition ('green' or 'red') the
- // rows will be be ordered by the value in the `time` column
- //
- // `evaluate_inside_range` is invoked with a window defined by the
- // SQL. In this case:
- //
- // The first invocation will be passed row 0, the first row in the
- // partition.
- //
- // The second invocation will be passed rows 0 and 1, the first
- // two rows in the partition.
- //
- // etc.
- let df = ctx
- .sql(
- "SELECT \
- car, \
- speed, \
- smooth_it(speed) OVER (PARTITION BY car ORDER BY time),\
- time \
- from cars \
- ORDER BY \
- car",
- )
- .await?;
- // print the results
- df.show().await?;
-
- // this time, call the new widow function with an explicit
- // window so evaluate will be invoked with each window.
- //
- // `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
- // sees at most 3 rows: the row before, the current row, and the 1
- // row afterward.
- let df = ctx.sql(
- "SELECT \
- car, \
- speed, \
- smooth_it(speed) OVER (PARTITION BY car ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
- time \
- from cars \
- ORDER BY \
- car",
- ).await?;
- // print the results
- df.show().await?;
-
- // Now, run the function using the DataFrame API:
- let window_expr = smooth_it.call(
- vec![col("speed")], // smooth_it(speed)
- vec![col("car")], // PARTITION BY car
- vec![col("time").sort(true, true)], // ORDER BY time ASC
- WindowFrame::new(false),
- );
- let df = ctx.table("cars").await?.window(vec![window_expr])?;
-
- // print the results
- df.show().await?;
-
- Ok(())
-}
-
-/// Create a `PartitionEvalutor` to evaluate this function on a new
-/// partition.
-fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
- Ok(Box::new(MyPartitionEvaluator::new()))
-}
-
-/// This implements the lowest level evaluation for a window function
-///
-/// It handles calculating the value of the window function for each
-/// distinct values of `PARTITION BY` (each car type in our example)
-#[derive(Clone, Debug)]
-struct MyPartitionEvaluator {}
-
-impl MyPartitionEvaluator {
- fn new() -> Self {
- Self {}
- }
-}
-
-/// Different evaluation methods are called depending on the various
-/// settings of WindowUDF. This example uses the simplest and most
-/// general, `evaluate`. See `PartitionEvaluator` for the other more
-/// advanced uses.
-impl PartitionEvaluator for MyPartitionEvaluator {
- /// Tell DataFusion the window function varies based on the value
- /// of the window frame.
- fn uses_window_frame(&self) -> bool {
- true
- }
-
- /// This function is called once per input row.
- ///
- /// `range`specifies which indexes of `values` should be
- /// considered for the calculation.
- ///
- /// Note this is the SLOWEST, but simplest, way to evaluate a
- /// window function. It is much faster to implement
- /// evaluate_all or evaluate_all_with_rank, if possible
- fn evaluate(
- &mut self,
- values: &[ArrayRef],
- range: &std::ops::Range<usize>,
- ) -> Result<ScalarValue> {
- // Again, the input argument is an array of floating
- // point numbers to calculate a moving average
- let arr: &Float64Array = values[0].as_ref().as_primitive::<Float64Type>();
-
- let range_len = range.end - range.start;
-
- // our smoothing function will average all the values in the
- let output = if range_len > 0 {
- let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum();
- Some(sum / range_len as f64)
- } else {
- None
- };
-
- Ok(ScalarValue::Float64(output))
- }
-}
diff --git a/_downloads/9dc934d68aef6b44bd3235a7dbc9a8be/catalog.rs b/_downloads/9dc934d68aef6b44bd3235a7dbc9a8be/catalog.rs
deleted file mode 100644
index aa9fd103a5..0000000000
--- a/_downloads/9dc934d68aef6b44bd3235a7dbc9a8be/catalog.rs
+++ /dev/null
@@ -1,289 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Simple example of a catalog/schema implementation.
-//!
-//! Example requires git submodules to be initialized in repo as it uses data
from
-//! the `parquet-testing` repo.
-use async_trait::async_trait;
-use datafusion::{
- arrow::util::pretty,
- catalog::{
- schema::SchemaProvider,
- {CatalogList, CatalogProvider},
- },
- datasource::{
- file_format::{csv::CsvFormat, parquet::ParquetFormat, FileFormat},
- listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
- TableProvider,
- },
- error::Result,
- execution::context::SessionState,
- prelude::SessionContext,
-};
-use std::sync::RwLock;
-use std::{
- any::Any,
- collections::HashMap,
- path::{Path, PathBuf},
- sync::Arc,
-};
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let repo_dir = std::fs::canonicalize(
- PathBuf::from(env!("CARGO_MANIFEST_DIR"))
- // parent dir of datafusion-examples = repo root
- .join(".."),
- )
- .unwrap();
- let mut ctx = SessionContext::new();
- let state = ctx.state();
- let catlist = Arc::new(CustomCatalogList::new());
- // use our custom catalog list for context. each context has a single
catalog list.
- // context will by default have MemoryCatalogList
- ctx.register_catalog_list(catlist.clone());
-
- // initialize our catalog and schemas
- let catalog = DirCatalog::new();
- let parquet_schema = DirSchema::create(
- &state,
- DirSchemaOpts {
- format: Arc::new(ParquetFormat::default()),
- dir: &repo_dir.join("parquet-testing").join("data"),
- ext: "parquet",
- },
- )
- .await?;
- let csv_schema = DirSchema::create(
- &state,
- DirSchemaOpts {
- format: Arc::new(CsvFormat::default()),
- dir: &repo_dir.join("testing").join("data").join("csv"),
- ext: "csv",
- },
- )
- .await?;
- // register schemas into catalog
- catalog.register_schema("parquet", parquet_schema.clone())?;
- catalog.register_schema("csv", csv_schema.clone())?;
- // register our catalog in the context
- ctx.register_catalog("dircat", Arc::new(catalog));
- {
- // catalog was passed down into our custom catalog list since we
overide the ctx's default
- let catalogs = catlist.catalogs.read().unwrap();
- assert!(catalogs.contains_key("dircat"));
- };
- // take the first 5 (arbitrary amount) keys from our schema's hashmap.
- // in our `DirSchema`, the table names are equivalent to their key in the
hashmap,
- // so any key in the hashmap will now be a queryable in our datafusion
context.
- let parquet_tables = {
- let tables = parquet_schema.tables.read().unwrap();
- tables.keys().take(5).cloned().collect::<Vec<_>>()
- };
- for table in parquet_tables {
- println!("querying table {table} from parquet schema");
- let df = ctx
- .sql(&format!("select * from dircat.parquet.\"{table}\" "))
- .await?
- .limit(0, Some(5))?;
- let result = df.collect().await;
- match result {
- Ok(batches) => {
- pretty::print_batches(&batches).unwrap();
- }
- Err(e) => {
- println!("table '{table}' query failed due to {e}");
- }
- }
- }
- let table_to_drop = {
- let parquet_tables = parquet_schema.tables.read().unwrap();
- parquet_tables.keys().next().unwrap().to_owned()
- };
- // DDL example
- let df = ctx
- .sql(&format!("DROP TABLE dircat.parquet.\"{table_to_drop}\""))
- .await?;
- df.collect().await?;
- let parquet_tables = parquet_schema.tables.read().unwrap();
- // datafusion has deregistered the table from our schema
- // (called our schema's deregister func)
- assert!(!parquet_tables.contains_key(&table_to_drop));
- Ok(())
-}
-
-struct DirSchemaOpts<'a> {
- ext: &'a str,
- dir: &'a Path,
- format: Arc<dyn FileFormat>,
-}
-/// Schema where every file with extension `ext` in a given `dir` is a table.
-struct DirSchema {
- ext: String,
- tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
-}
-impl DirSchema {
- async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result<Arc<Self>> {
- let DirSchemaOpts { ext, dir, format } = opts;
- let mut tables = HashMap::new();
- let listdir = std::fs::read_dir(dir).unwrap();
- for res in listdir {
- let entry = res.unwrap();
- let filename = entry.file_name().to_str().unwrap().to_string();
- if !filename.ends_with(ext) {
- continue;
- }
-
- let table_path = ListingTableUrl::parse(entry.path().to_str().unwrap())?;
- let opts = ListingOptions::new(format.clone());
- let conf = ListingTableConfig::new(table_path)
- .with_listing_options(opts)
- .infer_schema(state)
- .await?;
- let table = ListingTable::try_new(conf)?;
- tables.insert(filename, Arc::new(table) as Arc<dyn TableProvider>);
- }
- Ok(Arc::new(Self {
- tables: RwLock::new(tables),
- ext: ext.to_string(),
- }))
- }
- #[allow(unused)]
- fn name(&self) -> &str {
- &self.ext
- }
-}
-
-#[async_trait]
-impl SchemaProvider for DirSchema {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn table_names(&self) -> Vec<String> {
- let tables = self.tables.read().unwrap();
- tables.keys().cloned().collect::<Vec<_>>()
- }
-
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
- let tables = self.tables.read().unwrap();
- tables.get(name).cloned()
- }
-
- fn table_exist(&self, name: &str) -> bool {
- let tables = self.tables.read().unwrap();
- tables.contains_key(name)
- }
- fn register_table(
- &self,
- name: String,
- table: Arc<dyn TableProvider>,
- ) -> Result<Option<Arc<dyn TableProvider>>> {
- let mut tables = self.tables.write().unwrap();
- println!("adding table {name}");
- tables.insert(name, table.clone());
- Ok(Some(table))
- }
-
- /// If supported by the implementation, removes an existing table from
this schema and returns it.
- /// If no table of that name exists, returns Ok(None).
- #[allow(unused_variables)]
- fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
- let mut tables = self.tables.write().unwrap();
- println!("dropping table {name}");
- Ok(tables.remove(name))
- }
-}
-/// Catalog holds multiple schemas
-struct DirCatalog {
- schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
-}
-impl DirCatalog {
- fn new() -> Self {
- Self {
- schemas: RwLock::new(HashMap::new()),
- }
- }
-}
-impl CatalogProvider for DirCatalog {
- fn as_any(&self) -> &dyn Any {
- self
- }
- fn register_schema(
- &self,
- name: &str,
- schema: Arc<dyn SchemaProvider>,
- ) -> Result<Option<Arc<dyn SchemaProvider>>> {
- let mut schema_map = self.schemas.write().unwrap();
- schema_map.insert(name.to_owned(), schema.clone());
- Ok(Some(schema))
- }
-
- fn schema_names(&self) -> Vec<String> {
- let schemas = self.schemas.read().unwrap();
- schemas.keys().cloned().collect()
- }
-
- fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
- let schemas = self.schemas.read().unwrap();
- let maybe_schema = schemas.get(name);
- if let Some(schema) = maybe_schema {
- let schema = schema.clone() as Arc<dyn SchemaProvider>;
- Some(schema)
- } else {
- None
- }
- }
-}
-/// Catalog lists holds multiple catalogs. Each context has a single catalog
list.
-struct CustomCatalogList {
- catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
-}
-impl CustomCatalogList {
- fn new() -> Self {
- Self {
- catalogs: RwLock::new(HashMap::new()),
- }
- }
-}
-impl CatalogList for CustomCatalogList {
- fn as_any(&self) -> &dyn Any {
- self
- }
- fn register_catalog(
- &self,
- name: String,
- catalog: Arc<dyn CatalogProvider>,
- ) -> Option<Arc<dyn CatalogProvider>> {
- let mut cats = self.catalogs.write().unwrap();
- cats.insert(name, catalog.clone());
- Some(catalog)
- }
-
- /// Retrieves the list of available catalog names
- fn catalog_names(&self) -> Vec<String> {
- let cats = self.catalogs.read().unwrap();
- cats.keys().cloned().collect()
- }
-
- /// Retrieves a specific catalog by name, provided it exists.
- fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
- let cats = self.catalogs.read().unwrap();
- cats.get(name).cloned()
- }
-}
diff --git a/_downloads/9f6fbc67bd5c63cb1fd7ba4efdf82d7a/example.csv b/_downloads/9f6fbc67bd5c63cb1fd7ba4efdf82d7a/example.csv
deleted file mode 100644
index 0eadb69396..0000000000
--- a/_downloads/9f6fbc67bd5c63cb1fd7ba4efdf82d7a/example.csv
+++ /dev/null
@@ -1,2 +0,0 @@
-a,b,c
-1,2,3
\ No newline at end of file
diff --git a/_downloads/ef95791a547ed7bb24fa06bdcae9272d/simple_udaf.rs b/_downloads/ef95791a547ed7bb24fa06bdcae9272d/simple_udaf.rs
deleted file mode 100644
index 7aec9698d9..0000000000
--- a/_downloads/ef95791a547ed7bb24fa06bdcae9272d/simple_udaf.rs
+++ /dev/null
@@ -1,178 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-/// In this example we will declare a single-type, single return type UDAF
that computes the geometric mean.
-/// The geometric mean is described here:
https://en.wikipedia.org/wiki/Geometric_mean
-use datafusion::arrow::{
- array::ArrayRef, array::Float32Array, datatypes::DataType, record_batch::RecordBatch,
-};
-use datafusion::{error::Result, physical_plan::Accumulator};
-use datafusion::{logical_expr::Volatility, prelude::*, scalar::ScalarValue};
-use datafusion_common::cast::as_float64_array;
-use datafusion_expr::create_udaf;
-use std::sync::Arc;
-
-// create local session context with an in-memory table
-fn create_context() -> Result<SessionContext> {
- use datafusion::arrow::datatypes::{Field, Schema};
- use datafusion::datasource::MemTable;
- // define a schema.
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
-
- // define data in two partitions
- let batch1 = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
- )?;
- let batch2 = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(Float32Array::from(vec![64.0]))],
- )?;
-
- // declare a new context. In spark API, this corresponds to a new spark
SQLsession
- let ctx = SessionContext::new();
-
- // declare a table in memory. In spark API, this corresponds to
createDataFrame(...).
- let provider = MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?;
- ctx.register_table("t", Arc::new(provider))?;
- Ok(ctx)
-}
-
-/// A UDAF has state across multiple rows, and thus we require a `struct` with
that state.
-#[derive(Debug)]
-struct GeometricMean {
- n: u32,
- prod: f64,
-}
-
-impl GeometricMean {
- // how the struct is initialized
- pub fn new() -> Self {
- GeometricMean { n: 0, prod: 1.0 }
- }
-}
-
-// UDAFs are built using the trait `Accumulator`, that offers DataFusion the
necessary functions
-// to use them.
-impl Accumulator for GeometricMean {
- // This function serializes our state to `ScalarValue`, which DataFusion
uses
- // to pass this state between execution stages.
- // Note that this can be arbitrary data.
- fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![
- ScalarValue::from(self.prod),
- ScalarValue::from(self.n),
- ])
- }
-
- // DataFusion expects this function to return the final value of this
aggregator.
- // in this case, this is the formula of the geometric mean
- fn evaluate(&self) -> Result<ScalarValue> {
- let value = self.prod.powf(1.0 / self.n as f64);
- Ok(ScalarValue::from(value))
- }
-
- // DataFusion calls this function to update the accumulator's state for a
batch
- // of inputs rows. In this case the product is updated with values from
the first column
- // and the count is updated based on the row count
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if values.is_empty() {
- return Ok(());
- }
- let arr = &values[0];
- (0..arr.len()).try_for_each(|index| {
- let v = ScalarValue::try_from_array(arr, index)?;
-
- if let ScalarValue::Float64(Some(value)) = v {
- self.prod *= value;
- self.n += 1;
- } else {
- unreachable!("")
- }
- Ok(())
- })
- }
-
- // Optimization hint: this trait also supports `update_batch` and
`merge_batch`,
- // that can be used to perform these operations on arrays instead of
single values.
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- if states.is_empty() {
- return Ok(());
- }
- let arr = &states[0];
- (0..arr.len()).try_for_each(|index| {
- let v = states
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- if let (ScalarValue::Float64(Some(prod)), ScalarValue::UInt32(Some(n))) =
- (&v[0], &v[1])
- {
- self.prod *= prod;
- self.n += n;
- } else {
- unreachable!("")
- }
- Ok(())
- })
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- }
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let ctx = create_context()?;
-
- // here is where we define the UDAF. We also declare its signature:
- let geometric_mean = create_udaf(
- // the name; used to represent it in plan descriptions and in the
registry, to use in SQL.
- "geo_mean",
- // the input type; DataFusion guarantees that the first entry of
`values` in `update` has this type.
- vec![DataType::Float64],
- // the return type; DataFusion expects this to match the type returned
by `evaluate`.
- Arc::new(DataType::Float64),
- Volatility::Immutable,
- // This is the accumulator factory; DataFusion uses it to create new
accumulators.
- Arc::new(|_| Ok(Box::new(GeometricMean::new()))),
- // This is the description of the state. `state()` must match the
types here.
- Arc::new(vec![DataType::Float64, DataType::UInt32]),
- );
-
- // get a DataFrame from the context
- // this table has 1 column `a` f32 with values {2,4,8,64}, whose geometric
mean is 8.0.
- let df = ctx.table("t").await?;
-
- // perform the aggregation
- let df = df.aggregate(vec![], vec![geometric_mean.call(vec![col("a")])])?;
-
- // note that "a" is f32, not f64. DataFusion coerces it to match the
UDAF's signature.
-
- // execute the query
- let results = df.collect().await?;
-
- // downcast the array to the expected type
- let result = as_float64_array(results[0].column(0))?;
-
- // verify that the calculation is correct
- assert!((result.value(0) - 8.0).abs() < f64::EPSILON);
- println!("The geometric mean of [2,4,8,64] is {}", result.value(0));
-
- Ok(())
-}
diff --git a/_sources/library-user-guide/adding-udfs.md.txt b/_sources/library-user-guide/adding-udfs.md.txt
index d3f31bd45a..a4b5ed0b40 100644
--- a/_sources/library-user-guide/adding-udfs.md.txt
+++ b/_sources/library-user-guide/adding-udfs.md.txt
@@ -23,11 +23,11 @@ User Defined Functions (UDFs) are functions that can be
used in the context of D
This page covers how to add UDFs to DataFusion. In particular, it covers how
to add Scalar, Window, and Aggregate UDFs.
-| UDF Type | Description
| Example
|
-| --------- |
----------------------------------------------------------------------------------------------------------
| ---------------------------------------------------------------------- |
-| Scalar | A function that takes a row of data and returns a single value.
|
[simple_udf.rs](../../../datafusion-examples/examples/simple_udf.rs) |
-| Window | A function that takes a row of data and returns a single value,
but also has access to the rows around it. |
[simple_udwf.rs](../../../datafusion-examples/examples/simple_udwf.rs) |
-| Aggregate | A function that takes a group of rows and returns a single
value. |
[simple_udaf.rs](../../../datafusion-examples/examples/simple_udaf.rs) |
+| UDF Type | Description
| Example
|
+| --------- |
----------------------------------------------------------------------------------------------------------
|
------------------------------------------------------------------------------------------------------------------
|
+| Scalar | A function that takes a row of data and returns a single value.
|
[simple_udf.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udf.rs)
|
+| Window | A function that takes a row of data and returns a single value,
but also has access to the rows around it. |
[simple_udwf.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udwf.rs)
|
+| Aggregate | A function that takes a group of rows and returns a single
value. |
[simple_udaf.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udaf.rs)
|
First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about
the differences between the different types of UDFs.
diff --git a/_sources/library-user-guide/catalogs.md.txt
b/_sources/library-user-guide/catalogs.md.txt
index 1dd235f0a2..e53d163663 100644
--- a/_sources/library-user-guide/catalogs.md.txt
+++ b/_sources/library-user-guide/catalogs.md.txt
@@ -19,7 +19,7 @@
# Catalogs, Schemas, and Tables
-This section describes how to create and manage catalogs, schemas, and tables
in DataFusion. For those wanting to dive into the code quickly please see the
[example](../../../datafusion-examples/examples/catalog.rs).
+This section describes how to create and manage catalogs, schemas, and tables
in DataFusion. For those wanting to dive into the code quickly please see the
[example](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/catalog.rs).
## General Concepts
diff --git a/_sources/library-user-guide/working-with-exprs.md.txt
b/_sources/library-user-guide/working-with-exprs.md.txt
index 507e984acb..a8baf24d5f 100644
--- a/_sources/library-user-guide/working-with-exprs.md.txt
+++ b/_sources/library-user-guide/working-with-exprs.md.txt
@@ -52,8 +52,8 @@ As the writer of a library, you may want to use or create
`Expr`s to represent c
There are also executable examples for working with `Expr`s:
-- [rewrite_expr.rs](../../../datafusion-examples/examples/catalog.rs)
-- [expr_api.rs](../../../datafusion-examples/examples/expr_api.rs)
+-
[rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs)
+-
[expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs)
## A Scalar UDF Example
diff --git a/_sources/user-guide/example-usage.md.txt
b/_sources/user-guide/example-usage.md.txt
index c631d552dd..a7557f9b0b 100644
--- a/_sources/user-guide/example-usage.md.txt
+++ b/_sources/user-guide/example-usage.md.txt
@@ -19,9 +19,9 @@
# Example Usage
-In this example some simple processing is performed on the
[`example.csv`](../../../datafusion/core/tests/data/example.csv) file.
+In this example some simple processing is performed on the
[`example.csv`](https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/data/example.csv)
file.
-Even [`more code examples`](../../../datafusion-examples) attached to the
project
+Even [`more code
examples`](https://github.com/apache/arrow-datafusion/tree/main/datafusion-examples)
attached to the project.
## Update `Cargo.toml`
diff --git a/library-user-guide/adding-udfs.html
b/library-user-guide/adding-udfs.html
index 2c49ba9dea..f2a112e4d2 100644
--- a/library-user-guide/adding-udfs.html
+++ b/library-user-guide/adding-udfs.html
@@ -428,15 +428,15 @@
<tbody>
<tr class="row-even"><td><p>Scalar</p></td>
<td><p>A function that takes a row of data and returns a single value.</p></td>
-<td><p><a class="reference download internal" download=""
href="../_downloads/97b381d1ef654142806e5e91c3788385/simple_udf.rs"><span
class="xref download myst">simple_udf.rs</span></a></p></td>
+<td><p><a class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udf.rs">simple_udf.rs</a></p></td>
</tr>
<tr class="row-odd"><td><p>Window</p></td>
<td><p>A function that takes a row of data and returns a single value, but
also has access to the rows around it.</p></td>
-<td><p><a class="reference download internal" download=""
href="../_downloads/9d6db4688a18f2550d093295ce0f3231/simple_udwf.rs"><span
class="xref download myst">simple_udwf.rs</span></a></p></td>
+<td><p><a class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udwf.rs">simple_udwf.rs</a></p></td>
</tr>
<tr class="row-even"><td><p>Aggregate</p></td>
<td><p>A function that takes a group of rows and returns a single
value.</p></td>
-<td><p><a class="reference download internal" download=""
href="../_downloads/ef95791a547ed7bb24fa06bdcae9272d/simple_udaf.rs"><span
class="xref download myst">simple_udaf.rs</span></a></p></td>
+<td><p><a class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simple_udaf.rs">simple_udaf.rs</a></p></td>
</tr>
</tbody>
</table>
diff --git a/library-user-guide/catalogs.html b/library-user-guide/catalogs.html
index 3fdfbc14e3..8de6e4d12a 100644
--- a/library-user-guide/catalogs.html
+++ b/library-user-guide/catalogs.html
@@ -446,7 +446,7 @@
-->
<section id="catalogs-schemas-and-tables">
<h1>Catalogs, Schemas, and Tables<a class="headerlink"
href="#catalogs-schemas-and-tables" title="Link to this heading">¶</a></h1>
-<p>This section describes how to create and manage catalogs, schemas, and
tables in DataFusion. For those wanting to dive into the code quickly please
see the <a class="reference download internal" download=""
href="../_downloads/9dc934d68aef6b44bd3235a7dbc9a8be/catalog.rs"><span
class="xref download myst">example</span></a>.</p>
+<p>This section describes how to create and manage catalogs, schemas, and
tables in DataFusion. For those wanting to dive into the code quickly please
see the <a class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/catalog.rs">example</a>.</p>
<section id="general-concepts">
<h2>General Concepts<a class="headerlink" href="#general-concepts" title="Link
to this heading">¶</a></h2>
<p>CatalogList, Catalogs, schemas, and tables are organized in a hierarchy. A
CatalogList contains catalogs, a catalog contains schemas and a schema contains
tables.</p>
diff --git a/library-user-guide/working-with-exprs.html
b/library-user-guide/working-with-exprs.html
index 93ced4454c..f04b5cf815 100644
--- a/library-user-guide/working-with-exprs.html
+++ b/library-user-guide/working-with-exprs.html
@@ -463,8 +463,8 @@
<p>As the writer of a library, you may want to use or create <code
class="docutils literal notranslate"><span class="pre">Expr</span></code>s to
represent computations that you want to perform. This guide will walk you
through how to make your own scalar UDF as an <code class="docutils literal
notranslate"><span class="pre">Expr</span></code> and how to rewrite <code
class="docutils literal notranslate"><span class="pre">Expr</span></code>s to
inline the simple UDF.</p>
<p>There are also executable examples for working with <code class="docutils
literal notranslate"><span class="pre">Expr</span></code>s:</p>
<ul class="simple">
-<li><p><a class="reference download internal" download=""
href="../_downloads/9dc934d68aef6b44bd3235a7dbc9a8be/catalog.rs"><span
class="xref download myst">rewrite_expr.rs</span></a></p></li>
-<li><p><a class="reference download internal" download=""
href="../_downloads/1a0b5c48023f0bdbcb8dff8324195c3c/expr_api.rs"><span
class="xref download myst">expr_api.rs</span></a></p></li>
+<li><p><a class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs">rewrite_expr.rs</a></p></li>
+<li><p><a class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs">expr_api.rs</a></p></li>
</ul>
<section id="a-scalar-udf-example">
<h2>A Scalar UDF Example<a class="headerlink" href="#a-scalar-udf-example"
title="Link to this heading">¶</a></h2>
diff --git a/user-guide/example-usage.html b/user-guide/example-usage.html
index f711497c55..339a38f92e 100644
--- a/user-guide/example-usage.html
+++ b/user-guide/example-usage.html
@@ -459,8 +459,8 @@
-->
<section id="example-usage">
<h1>Example Usage<a class="headerlink" href="#example-usage" title="Link to
this heading">¶</a></h1>
-<p>In this example some simple processing is performed on the <a
class="reference download internal" download=""
href="../_downloads/9f6fbc67bd5c63cb1fd7ba4efdf82d7a/example.csv"><span
class="xref download myst"><code class="docutils literal notranslate"><span
class="pre">example.csv</span></code></span></a> file.</p>
-<p>Even <a class="reference internal"
href="#../../../datafusion-examples"><span class="xref myst"><code
class="docutils literal notranslate"><span class="pre">more</span> <span
class="pre">code</span> <span class="pre">examples</span></code></span></a>
attached to the project</p>
+<p>In this example some simple processing is performed on the <a
class="reference external"
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/data/example.csv"><code
class="docutils literal notranslate"><span
class="pre">example.csv</span></code></a> file.</p>
+<p>Even <a class="reference external"
href="https://github.com/apache/arrow-datafusion/tree/main/datafusion-examples"><code
class="docutils literal notranslate"><span class="pre">more</span> <span
class="pre">code</span> <span class="pre">examples</span></code></a> attached
to the project.</p>
<section id="update-cargo-toml">
<h2>Update <code class="docutils literal notranslate"><span
class="pre">Cargo.toml</span></code><a class="headerlink"
href="#update-cargo-toml" title="Link to this heading">¶</a></h2>
<p>Find latest available Datafusion version on <a class="reference external"
href="https://crates.io/crates/datafusion">DataFusion’s