This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new d71c436 Add examples from TPC-H (#666)
d71c436 is described below
commit d71c436ae2006843dc720bfdfcb8b3aeb434815e
Author: Tim Saucer <[email protected]>
AuthorDate: Mon May 13 10:00:18 2024 -0400
Add examples from TPC-H (#666)
* Update location of docker image
* Initial commit for queries 1-3
* Commit queries 4-7 of TPC-H in examples
* Add required license text
* Add additional text around why to use a case statement in the example
* add market share example
* Add example for product type profit measure
* Initial commit returned item report
* Linting
* Initial commit of q11 example
* Initial commit of q12 from tpc-h
* Initial commit for customer distribution example
* Initial commit of promotion effect example
* Initial commit of q15 in tpc-h, top supplier
* Initial commit of q16 in tpc-h, part supplier relationship
* Initial commit of q17 in tpc-h, small quantity order
* Initial commit of q18 in tpc-h, large volume customer
* Initial commit of q19 in tpc-h, discounted revenue
* Initial commit of q20 in tpc-h, potential part promotion
* Initial commit of q21 in tpc-h, supplier who kept order waiting
* Initial commit of q22 in tpc-h, global sales opportunity
* Adding readme information and marking text as copyrighted
* Minimum part cost must be identified per part not across all parts that match the filters
* Change ordering of output rows to match spec
* Set parameter to match spec
* Set parameter to match spec
* setting values to match spec
* Linting
* Expand on readme to link to examples within tpch folder
* Minor typo
---
benchmarks/tpch/tpch-gen.sh | 6 +-
examples/README.md | 64 ++++++++
examples/tpch/.gitignore | 2 +
examples/tpch/README.md | 57 +++++++
examples/tpch/convert_data_to_parquet.py | 142 +++++++++++++++++
examples/tpch/q01_pricing_summary_report.py | 90 +++++++++++
examples/tpch/q02_minimum_cost_supplier.py | 139 ++++++++++++++++
examples/tpch/q03_shipping_priority.py | 86 ++++++++++
examples/tpch/q04_order_priority_checking.py | 80 ++++++++++
examples/tpch/q05_local_supplier_volume.py | 102 ++++++++++++
examples/tpch/q06_forecasting_revenue_change.py | 87 ++++++++++
examples/tpch/q07_volume_shipping.py | 123 +++++++++++++++
examples/tpch/q08_market_share.py | 175 +++++++++++++++++++++
examples/tpch/q09_product_type_profit_measure.py | 93 +++++++++++
examples/tpch/q10_returned_item_reporting.py | 108 +++++++++++++
.../tpch/q11_important_stock_identification.py | 82 ++++++++++
examples/tpch/q12_ship_mode_order_priority.py | 112 +++++++++++++
examples/tpch/q13_customer_distribution.py | 64 ++++++++
examples/tpch/q14_promotion_effect.py | 81 ++++++++++
examples/tpch/q15_top_supplier.py | 87 ++++++++++
examples/tpch/q16_part_supplier_relationship.py | 85 ++++++++++
examples/tpch/q17_small_quantity_order.py | 69 ++++++++
examples/tpch/q18_large_volume_customer.py | 65 ++++++++
examples/tpch/q19_discounted_revenue.py | 137 ++++++++++++++++
examples/tpch/q20_potential_part_promotion.py | 97 ++++++++++++
examples/tpch/q21_suppliers_kept_orders_waiting.py | 114 ++++++++++++++
examples/tpch/q22_global_sales_opportunity.py | 76 +++++++++
27 files changed, 2420 insertions(+), 3 deletions(-)
diff --git a/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch/tpch-gen.sh
index e27472a..15cab12 100755
--- a/benchmarks/tpch/tpch-gen.sh
+++ b/benchmarks/tpch/tpch-gen.sh
@@ -29,7 +29,7 @@ FILE=./data/supplier.tbl
if test -f "$FILE"; then
echo "$FILE exists."
else
- docker run -v `pwd`/data:/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s $1
+ docker run -v `pwd`/data:/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s $1
# workaround for https://github.com/apache/arrow-datafusion/issues/6147
mv data/customer.tbl data/customer.csv
@@ -49,5 +49,5 @@ FILE=./data/answers/q1.out
if test -f "$FILE"; then
echo "$FILE exists."
else
- docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
-fi
\ No newline at end of file
+ docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
+fi
diff --git a/examples/README.md b/examples/README.md
index 8240595..0ef194a 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -52,3 +52,67 @@ Here is a direct link to the file used in the examples:
- [Executing SQL on Polars](./sql-on-polars.py)
- [Executing SQL on Pandas](./sql-on-pandas.py)
- [Executing SQL on cuDF](./sql-on-cudf.py)
+
+## TPC-H Examples
+
+Within the subdirectory `tpch` there are 22 examples that reproduce queries in
+the TPC-H specification. These include realistic data that can be generated at
+arbitrary scale and allow the user to see use cases for a variety of data frame
+operations.
+
+In the list below we describe which new operations can be found in the examples.
+The queries are designed to be of increasing complexity, so it is recommended to
+review them in order. For brevity, the following list does not include operations
+found in previous examples.
+
+- [Convert CSV to Parquet](./tpch/convert_data_to_parquet.py)
+ - Read from CSV files where the delimiter is something other than a comma
+ - Specify the schema during CSV reading
+ - Write to a parquet file
+- [Pricing Summary Report](./tpch/q01_pricing_summary_report.py)
+ - Aggregation computing the maximum value, average, sum, and number of entries
+ - Filter data by date and interval
+ - Sorting
+- [Minimum Cost Supplier](./tpch/q02_minimum_cost_supplier.py)
+ - Window operation to find minimum
+ - Sorting in descending order
+- [Shipping Priority](./tpch/q03_shipping_priority.py)
+- [Order Priority Checking](./tpch/q04_order_priority_checking.py)
+ - Aggregating multiple times in one data frame
+- [Local Supplier Volume](./tpch/q05_local_supplier_volume.py)
+- [Forecasting Revenue Change](./tpch/q06_forecasting_revenue_change.py)
+ - Using collect and extracting values as a python object
+- [Volume Shipping](./tpch/q07_volume_shipping.py)
+ - Finding multiple distinct and mutually exclusive values within one dataframe
+ - Using `case` and `when` statements
+- [Market Share](./tpch/q08_market_share.py)
+ - The operations in this query are similar to those in the prior examples, but
+ it is a more complex example of using filters, joins, and aggregates
+ - Using left outer joins
+- [Product Type Profit Measure](./tpch/q09_product_type_profit_measure.py)
+ - Extract year from a date
+- [Returned Item Reporting](./tpch/q10_returned_item_reporting.py)
+- [Important Stock Identification](./tpch/q11_important_stock_identification.py)
+- [Shipping Modes and Order Priority](./tpch/q12_ship_mode_order_priority.py)
+ - Finding non-null values using a boolean operation in a filter
+ - Case statement with default value
+- [Customer Distribution](./tpch/q13_customer_distribution.py)
+- [Promotion Effect](./tpch/q14_promotion_effect.py)
+- [Top Supplier](./tpch/q15_top_supplier.py)
+- [Parts/Supplier Relationship](./tpch/q16_part_supplier_relationship.py)
+ - Using anti joins
+ - Using regular expressions (regex)
+ - Creating arrays of literal values
+ - Determine if an element exists within an array
+- [Small-Quantity-Order Revenue](./tpch/q17_small_quantity_order.py)
+- [Large Volume Customer](./tpch/q18_large_volume_customer.py)
+- [Discounted Revenue](./tpch/q19_discounted_revenue.py)
+ - Creating a user defined function (UDF)
+ - Convert pyarrow Array to python values
+ - Filtering based on a UDF
+- [Potential Part Promotion](./tpch/q20_potential_part_promotion.py)
+ - Extracting part of a string using substr
+- [Suppliers Who Kept Orders Waiting](./tpch/q21_suppliers_kept_orders_waiting.py)
+ - Using array aggregation
+ - Determining the size of array elements
+- [Global Sales Opportunity](./tpch/q22_global_sales_opportunity.py)
diff --git a/examples/tpch/.gitignore b/examples/tpch/.gitignore
new file mode 100644
index 0000000..9e67bd4
--- /dev/null
+++ b/examples/tpch/.gitignore
@@ -0,0 +1,2 @@
+data
+
diff --git a/examples/tpch/README.md b/examples/tpch/README.md
new file mode 100644
index 0000000..7c52c82
--- /dev/null
+++ b/examples/tpch/README.md
@@ -0,0 +1,57 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Python Examples for TPC-H
+
+These examples reproduce the problems listed in the Transaction Processing
+Performance Council TPC-H benchmark. The purpose of these examples is to
+demonstrate how to use different aspects of DataFusion rather than to create
+the most performant queries possible. Within each example is a description of
+the problem. Users who are familiar with SQL style commands can compare the
+approaches in these examples with those listed in the specification.
+
+- https://www.tpc.org/tpch/
+
+The examples provided are based on version 2.18.0 of the TPC-H specification.
+
+## Data Setup
+
+To run these examples, you must first generate a dataset. The `dbgen` tool
+provided by TPC can create datasets of arbitrary scale. For testing it is
+typically sufficient to create a 1 gigabyte dataset. For convenience, this
+repository has a script which uses docker to create this dataset. From the
+`benchmarks/tpch` directory execute the following script.
+
+```bash
+./tpch-gen.sh 1
+```
+
+The examples provided use parquet files for the tables generated by `dbgen`.
+A python script is provided to convert the text files from `dbgen` into parquet
+files expected by the examples. From the `examples/tpch` directory you can
+execute the following command to create the necessary parquet files.
+
+```bash
+python convert_data_to_parquet.py
+```
+
+## Description of Examples
+
+For easier access, a description of the techniques demonstrated in each file
+is in the README.md file in the `examples` directory.
diff --git a/examples/tpch/convert_data_to_parquet.py b/examples/tpch/convert_data_to_parquet.py
new file mode 100644
index 0000000..178b7fb
--- /dev/null
+++ b/examples/tpch/convert_data_to_parquet.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is a utility script that will consume the data generated by dbgen from TPC-H and convert
+it into a parquet file with the column names as expected by the TPC-H specification. It assumes
+the data generated resides in a path ../../benchmarks/tpch/data relative to the current file,
+as will be generated by the script provided in this repository.
+"""
+
+import os
+import pyarrow
+import datafusion
+
+ctx = datafusion.SessionContext()
+
+all_schemas = {}
+
+all_schemas["customer"] = [
+ ("C_CUSTKEY", pyarrow.int32()),
+ ("C_NAME", pyarrow.string()),
+ ("C_ADDRESS", pyarrow.string()),
+ ("C_NATIONKEY", pyarrow.int32()),
+ ("C_PHONE", pyarrow.string()),
+ ("C_ACCTBAL", pyarrow.float32()),
+ ("C_MKTSEGMENT", pyarrow.string()),
+ ("C_COMMENT", pyarrow.string()),
+]
+
+all_schemas["lineitem"] = [
+ ("L_ORDERKEY", pyarrow.int32()),
+ ("L_PARTKEY", pyarrow.int32()),
+ ("L_SUPPKEY", pyarrow.int32()),
+ ("L_LINENUMBER", pyarrow.int32()),
+ ("L_QUANTITY", pyarrow.float32()),
+ ("L_EXTENDEDPRICE", pyarrow.float32()),
+ ("L_DISCOUNT", pyarrow.float32()),
+ ("L_TAX", pyarrow.float32()),
+ ("L_RETURNFLAG", pyarrow.string()),
+ ("L_LINESTATUS", pyarrow.string()),
+ ("L_SHIPDATE", pyarrow.date32()),
+ ("L_COMMITDATE", pyarrow.date32()),
+ ("L_RECEIPTDATE", pyarrow.date32()),
+ ("L_SHIPINSTRUCT", pyarrow.string()),
+ ("L_SHIPMODE", pyarrow.string()),
+ ("L_COMMENT", pyarrow.string()),
+]
+
+all_schemas["nation"] = [
+ ("N_NATIONKEY", pyarrow.int32()),
+ ("N_NAME", pyarrow.string()),
+ ("N_REGIONKEY", pyarrow.int32()),
+ ("N_COMMENT", pyarrow.string()),
+]
+
+all_schemas["orders"] = [
+ ("O_ORDERKEY", pyarrow.int32()),
+ ("O_CUSTKEY", pyarrow.int32()),
+ ("O_ORDERSTATUS", pyarrow.string()),
+ ("O_TOTALPRICE", pyarrow.float32()),
+ ("O_ORDERDATE", pyarrow.date32()),
+ ("O_ORDERPRIORITY", pyarrow.string()),
+ ("O_CLERK", pyarrow.string()),
+ ("O_SHIPPRIORITY", pyarrow.int32()),
+ ("O_COMMENT", pyarrow.string()),
+]
+
+all_schemas["part"] = [
+ ("P_PARTKEY", pyarrow.int32()),
+ ("P_NAME", pyarrow.string()),
+ ("P_MFGR", pyarrow.string()),
+ ("P_BRAND", pyarrow.string()),
+ ("P_TYPE", pyarrow.string()),
+ ("P_SIZE", pyarrow.int32()),
+ ("P_CONTAINER", pyarrow.string()),
+ ("P_RETAILPRICE", pyarrow.float32()),
+ ("P_COMMENT", pyarrow.string()),
+]
+
+all_schemas["partsupp"] = [
+ ("PS_PARTKEY", pyarrow.int32()),
+ ("PS_SUPPKEY", pyarrow.int32()),
+ ("PS_AVAILQTY", pyarrow.int32()),
+ ("PS_SUPPLYCOST", pyarrow.float32()),
+ ("PS_COMMENT", pyarrow.string()),
+]
+
+all_schemas["region"] = [
+ ("r_REGIONKEY", pyarrow.int32()),
+ ("r_NAME", pyarrow.string()),
+ ("r_COMMENT", pyarrow.string()),
+]
+
+all_schemas["supplier"] = [
+ ("S_SUPPKEY", pyarrow.int32()),
+ ("S_NAME", pyarrow.string()),
+ ("S_ADDRESS", pyarrow.string()),
+ ("S_NATIONKEY", pyarrow.int32()),
+ ("S_PHONE", pyarrow.string()),
+ ("S_ACCTBAL", pyarrow.float32()),
+ ("S_COMMENT", pyarrow.string()),
+]
+
+curr_dir = os.path.dirname(os.path.abspath(__file__))
+for filename, curr_schema in all_schemas.items():
+
+ # For convenience, go ahead and convert the schema column names to lowercase
+ curr_schema = [(s[0].lower(), s[1]) for s in curr_schema]
+
+ # Pre-collect the output columns so we can ignore the null field we add
+ # in to handle the trailing | in the file
+ output_cols = [r[0] for r in curr_schema]
+
+ # Trailing | requires an extra field during processing
+ curr_schema.append(("some_null", pyarrow.null()))
+
+ schema = pyarrow.schema(curr_schema)
+
+ source_file = os.path.abspath(
+ os.path.join(curr_dir, f"../../benchmarks/tpch/data/{filename}.csv")
+ )
+ dest_file = os.path.abspath(os.path.join(curr_dir, f"./data/{filename}.parquet"))
+
+ df = ctx.read_csv(source_file, schema=schema, has_header=False, delimiter="|")
+
+ df = df.select_columns(*output_cols)
+
+ df.write_parquet(dest_file, compression="snappy")
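A minimal, self-contained sketch of the trailing-delimiter handling this script relies on (the file name, column names, and data below are made up for illustration; the API calls mirror the script above):

```python
import pyarrow
import datafusion

ctx = datafusion.SessionContext()

# dbgen terminates every row with "|", which the CSV reader sees as one
# extra, empty column. Reproduce that shape with a tiny hand-written file.
with open("demo.csv", "w") as f:
    f.write("1|alpha|\n2|beta|\n")

schema = pyarrow.schema(
    [
        ("id", pyarrow.int32()),
        ("name", pyarrow.string()),
        ("some_null", pyarrow.null()),  # absorbs the empty trailing field
    ]
)

df = ctx.read_csv("demo.csv", schema=schema, has_header=False, delimiter="|")
df = df.select_columns("id", "name")  # drop the placeholder column
df.write_parquet("demo.parquet", compression="snappy")
```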
diff --git a/examples/tpch/q01_pricing_summary_report.py b/examples/tpch/q01_pricing_summary_report.py
new file mode 100644
index 0000000..1aafcca
--- /dev/null
+++ b/examples/tpch/q01_pricing_summary_report.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 1:
+
+The Pricing Summary Report Query provides a summary pricing report for all lineitems shipped as of
+a given date. The date is within 60 - 120 days of the greatest ship date contained in the database.
+The query lists totals for extended price, discounted extended price, discounted extended price
+plus tax, average quantity, average extended price, and average discount. These aggregates are
+grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS.
+A count of the number of lineitems in each group is included.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+ctx = SessionContext()
+
+df = ctx.read_parquet("data/lineitem.parquet")
+
+# It may be that the date can be hard coded, based on examples shown.
+# This approach will work with any date range in the provided data set.
+
+greatest_ship_date = df.aggregate(
+ [], [F.max(col("l_shipdate")).alias("shipdate")]
+).collect()[0]["shipdate"][0]
+
+# From the given problem, this is how close to the last date in the database we
+# want to report results for. It should be between 60-120 days before the end.
+DAYS_BEFORE_FINAL = 68
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), type=pa.month_day_nano_interval())
+
+print("Final date in database:", greatest_ship_date)
+
+# Filter data to the dates of interest
+df = df.filter(col("l_shipdate") <= lit(greatest_ship_date) - lit(interval))
+
+# Aggregate the results
+
+df = df.aggregate(
+ [col("l_returnflag"), col("l_linestatus")],
+ [
+ F.sum(col("l_quantity")).alias("sum_qty"),
+ F.sum(col("l_extendedprice")).alias("sum_base_price"),
+ F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias(
+ "sum_disc_price"
+ ),
+ F.sum(
+ col("l_extendedprice")
+ * (lit(1.0) - col("l_discount"))
+ * (lit(1.0) + col("l_tax"))
+ ).alias("sum_charge"),
+ F.avg(col("l_quantity")).alias("avg_qty"),
+ F.avg(col("l_extendedprice")).alias("avg_price"),
+ F.avg(col("l_discount")).alias("avg_disc"),
+ F.count(col("l_returnflag")).alias(
+ "count_order"
+ ), # Counting any column should return same result
+ ],
+)
+
+# Sort per the expected result
+
+df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort())
+
+# Note: There appears to be a discrepancy between what is returned here and what is in the generated
+# answers file for the case of return flag N and line status O, but I did not investigate further.
+
+df.show()
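A minimal sketch of the interval workaround referenced above (apache/datafusion-python#665). The toy table, `from_pydict` usage, and dates are assumptions for illustration; the third tuple slot carries the day count exactly as in the query:

```python
from datetime import date

import pyarrow as pa
from datafusion import SessionContext, col, lit

ctx = SessionContext()
df = ctx.from_pydict({"d": [date(1998, 12, 1), date(1998, 8, 1)]}, name="t")

# Same hack as above: a month_day_nano_interval scalar built as
# (months, days, nanoseconds), with the day count placed in the third slot.
ninety_days = pa.scalar((0, 0, 90), type=pa.month_day_nano_interval())

# Keep rows at least 90 days before the cutoff date
df.filter(col("d") <= lit(date(1998, 12, 1)) - lit(ninety_days)).show()
```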
diff --git a/examples/tpch/q02_minimum_cost_supplier.py b/examples/tpch/q02_minimum_cost_supplier.py
new file mode 100644
index 0000000..262e2cf
--- /dev/null
+++ b/examples/tpch/q02_minimum_cost_supplier.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 2:
+
+The Minimum Cost Supplier Query finds, in a given region, for each part of a certain type and size,
+the supplier who can supply it at minimum cost. If several suppliers in that region offer the
+desired part type and size at the same (minimum) cost, the query lists the parts from suppliers with
+the 100 highest account balances. For each supplier, the query lists the supplier's account balance,
+name and nation; the part's number and manufacturer; the supplier's address, phone number and
+comment information.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import datafusion
+from datafusion import SessionContext, col, lit, functions as F
+
+# This is the part we're looking for
+SIZE_OF_INTEREST = 15
+TYPE_OF_INTEREST = "BRASS"
+REGION_OF_INTEREST = "EUROPE"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+ "p_partkey", "p_mfgr", "p_type", "p_size"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_acctbal",
+ "s_name",
+ "s_address",
+ "s_phone",
+ "s_comment",
+ "s_nationkey",
+ "s_suppkey",
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+ "ps_partkey", "ps_suppkey", "ps_supplycost"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_regionkey", "n_name"
+)
+df_region = ctx.read_parquet("data/region.parquet").select_columns(
+ "r_regionkey", "r_name"
+)
+
+# Filter down parts. Part names contain the type of interest, so we can use strpos to find where
+# in the p_type column the word is. `strpos` will return 0 if not found, otherwise the position
+# in the string where it is located.
+
+df_part = df_part.filter(
+ F.strpos(col("p_type"), lit(TYPE_OF_INTEREST)) > lit(0)
+).filter(col("p_size") == lit(SIZE_OF_INTEREST))
+
+# Filter regions down to the one of interest
+
+df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST))
+
+# Now that we have the region, find suppliers in that region. Suppliers are tied to their nation
+# and nations are tied to the region.
+
+df_nation = df_nation.join(df_region, (["n_regionkey"], ["r_regionkey"]), how="inner")
+df_supplier = df_supplier.join(
+ df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner"
+)
+
+# Now that we know who the potential suppliers are for the part, we can limit our part
+# supplies table down. We can further join down to the specific parts we've identified
+# as matching the request
+
+df = df_partsupp.join(df_supplier, (["ps_suppkey"], ["s_suppkey"]), how="inner")
+
+# Locate the minimum cost across all suppliers. There are multiple ways you could do this,
+# but one way is to create a window function across all suppliers, find the minimum, and
+# create a column of that value. We can then filter down any rows for which the cost and
+# minimum do not match.
+
+# The default window frame as of 5/6/2024 is from unbounded preceding to the current row.
+# We want to evaluate the entire data frame, so we specify this.
+window_frame = datafusion.WindowFrame("rows", None, None)
+df = df.with_column(
+ "min_cost",
+ F.window(
+ "min",
+ [col("ps_supplycost")],
+ partition_by=[col("ps_partkey")],
+ window_frame=window_frame,
+ ),
+)
+
+df = df.filter(col("min_cost") == col("ps_supplycost"))
+
+df = df.join(df_part, (["ps_partkey"], ["p_partkey"]), how="inner")
+
+# From the problem statement, these are the values we wish to output
+
+df = df.select_columns(
+ "s_acctbal",
+ "s_name",
+ "n_name",
+ "p_partkey",
+ "p_mfgr",
+ "s_address",
+ "s_phone",
+ "s_comment",
+)
+
+# Sort and display 100 entries
+df = df.sort(
+ col("s_acctbal").sort(ascending=False),
+ col("n_name").sort(),
+ col("s_name").sort(),
+ col("p_partkey").sort(),
+)
+
+df = df.limit(100)
+
+# Show results
+
+df.show()
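A minimal sketch of the windowed-minimum technique used above. The `WindowFrame` and `F.window` calls mirror this file; the toy table and `from_pydict` usage are assumptions for illustration:

```python
from datafusion import SessionContext, WindowFrame, col, functions as F

ctx = SessionContext()
df = ctx.from_pydict(
    {"part": [1, 1, 2], "supplier": [10, 11, 12], "cost": [5.0, 3.0, 7.0]},
    name="ps",
)

# Evaluate the "min" window function over an unbounded frame within each part
window_frame = WindowFrame("rows", None, None)
df = df.with_column(
    "min_cost",
    F.window(
        "min",
        [col("cost")],
        partition_by=[col("part")],
        window_frame=window_frame,
    ),
)

# Keep only the suppliers whose cost equals the per-part minimum
df.filter(col("min_cost") == col("cost")).show()
```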
diff --git a/examples/tpch/q03_shipping_priority.py b/examples/tpch/q03_shipping_priority.py
new file mode 100644
index 0000000..78993e9
--- /dev/null
+++ b/examples/tpch/q03_shipping_priority.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 3:
+
+The Shipping Priority Query retrieves the shipping priority and potential revenue, defined as the
+sum of l_extendedprice * (1-l_discount), of the orders having the largest revenue among those that
+had not been shipped as of a given date. Orders are listed in decreasing order of revenue. If more
+than 10 unshipped orders exist, only the 10 orders with the largest revenue are listed.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+SEGMENT_OF_INTEREST = "BUILDING"
+DATE_OF_INTEREST = "1995-03-15"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_mktsegment", "c_custkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderdate", "o_shippriority", "o_custkey", "o_orderkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"
+)
+
+# Limit dataframes to the rows of interest
+
+df_customer = df_customer.filter(col("c_mktsegment") == lit(SEGMENT_OF_INTEREST))
+df_orders = df_orders.filter(col("o_orderdate") < lit(DATE_OF_INTEREST))
+df_lineitem = df_lineitem.filter(col("l_shipdate") > lit(DATE_OF_INTEREST))
+
+# Join all 3 dataframes
+
+df = df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="inner").join(
+ df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner"
+)
+
+# Compute the revenue
+
+df = df.aggregate(
+ [col("l_orderkey")],
+ [
+ F.first_value(col("o_orderdate")).alias("o_orderdate"),
+ F.first_value(col("o_shippriority")).alias("o_shippriority"),
+ F.sum(col("l_extendedprice") * (lit(1.0) -
col("l_discount"))).alias("revenue"),
+ ],
+)
+
+# Sort by priority
+
+df = df.sort(col("revenue").sort(ascending=False), col("o_orderdate").sort())
+
+# Only return 100 results
+
+df = df.limit(100)
+
+# Change the order that the columns are reported in just to match the spec
+
+df = df.select_columns("l_orderkey", "revenue", "o_orderdate",
"o_shippriority")
+
+# Show result
+
+df.show()
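A minimal sketch of the first_value pattern above, which carries columns that are constant within each group through an aggregate. The toy table and `from_pydict` usage are assumptions for illustration:

```python
from datafusion import SessionContext, col, functions as F

ctx = SessionContext()
df = ctx.from_pydict(
    {"key": [1, 1, 2], "attr": ["a", "a", "b"], "x": [1.0, 2.0, 3.0]},
    name="t",
)

# attr is constant within each key, so first_value carries it through
df.aggregate(
    [col("key")],
    [F.first_value(col("attr")).alias("attr"), F.sum(col("x")).alias("total")],
).show()
```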
diff --git a/examples/tpch/q04_order_priority_checking.py b/examples/tpch/q04_order_priority_checking.py
new file mode 100644
index 0000000..b691d5b
--- /dev/null
+++ b/examples/tpch/q04_order_priority_checking.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 4:
+
+The Order Priority Checking Query counts the number of orders ordered in a given quarter of a given
+year in which at least one lineitem was received by the customer later than its committed date. The
+query lists the count of such orders for each order priority sorted in ascending priority order.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+# Ideally we could put 3 months into the interval. See note below.
+INTERVAL_DAYS = 92
+DATE_OF_INTEREST = "1993-07-01"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderdate", "o_orderpriority", "o_orderkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_orderkey", "l_commitdate", "l_receiptdate"
+)
+
+# Create a date object from the string
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
+
+# Limit results to cases where the commitment date is before the receipt date.
+# Aggregate the results so we only get one row to join with the order table.
+# Alternately, and likely more idiomatic, instead of `.aggregate` you could
+# do `.select_columns("l_orderkey").distinct()`. The goal here is to show
+# multiple examples of how to use DataFusion.
+df_lineitem = df_lineitem.filter(col("l_commitdate") < col("l_receiptdate")).aggregate(
+ [col("l_orderkey")], []
+)
+
+# Limit orders to date range of interest
+df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter(
+ col("o_orderdate") < lit(date) + lit(interval)
+)
+
+# Perform the join to find only orders for which there are lineitems outside of expected range
+df = df_orders.join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner")
+
+# Based on priority, find the number of entries
+df = df.aggregate(
+ [col("o_orderpriority")],
[F.count(col("o_orderpriority")).alias("order_count")]
+)
+
+# Sort the results
+df = df.sort(col("o_orderpriority").sort())
+
+df.show()
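A minimal sketch of the two equivalent deduplication forms discussed in the comment above (empty aggregate vs. distinct). The toy table and `from_pydict` usage are assumptions for illustration:

```python
from datafusion import SessionContext, col

ctx = SessionContext()
df = ctx.from_pydict({"l_orderkey": [1, 1, 2, 3, 3]}, name="t")

# Empty aggregate: group by the key, compute nothing
via_aggregate = df.aggregate([col("l_orderkey")], [])

# Equivalent distinct form mentioned above
via_distinct = df.select_columns("l_orderkey").distinct()

via_aggregate.show()
via_distinct.show()
```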
diff --git a/examples/tpch/q05_local_supplier_volume.py b/examples/tpch/q05_local_supplier_volume.py
new file mode 100644
index 0000000..7cb6e63
--- /dev/null
+++ b/examples/tpch/q05_local_supplier_volume.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 5:
+
+The Local Supplier Volume Query lists for each nation in a region the revenue volume that resulted
+from lineitem transactions in which the customer ordering parts and the supplier filling them were
+both within that nation. The query is run in order to determine whether to institute local
+distribution centers in a given region. The query considers only parts ordered in a given year. The
+query displays the nations and revenue volume in descending order by revenue. Revenue volume for all
+qualifying lineitems in a particular nation is defined as sum(l_extendedprice * (1 - l_discount)).
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+
+DATE_OF_INTEREST = "1994-01-01"
+INTERVAL_DAYS = 365
+REGION_OF_INTEREST = "ASIA"
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_custkey", "c_nationkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_custkey", "o_orderkey", "o_orderdate"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_regionkey", "n_name"
+)
+df_region = ctx.read_parquet("data/region.parquet").select_columns(
+ "r_regionkey", "r_name"
+)
+
+# Restrict dataframes to cases of interest
+df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter(
+ col("o_orderdate") < lit(date) + lit(interval)
+)
+
+df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST))
+
+# Join all the dataframes
+
+df = (
+ df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="inner")
+ .join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner")
+ .join(
+ df_supplier,
+ (["l_suppkey", "c_nationkey"], ["s_suppkey", "s_nationkey"]),
+ how="inner",
+ )
+ .join(df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner")
+ .join(df_region, (["n_regionkey"], ["r_regionkey"]), how="inner")
+)
+
+# Compute the final result
+
+df = df.aggregate(
+ [col("n_name")],
+ [F.sum(col("l_extendedprice") * (lit(1.0) -
col("l_discount"))).alias("revenue")],
+)
+
+# Sort in descending order
+
+df = df.sort(col("revenue").sort(ascending=False))
+
+df.show()
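A minimal sketch of the two-column join used above (l_suppkey/c_nationkey matched against s_suppkey/s_nationkey). The toy tables and `from_pydict` usage are assumptions for illustration:

```python
from datafusion import SessionContext

ctx = SessionContext()
left = ctx.from_pydict({"a": [1, 2], "b": [10, 20]}, name="left_t")
right = ctx.from_pydict({"x": [1, 2], "y": [10, 99]}, name="right_t")

# Key lists are matched positionally: rows survive only when
# a == x AND b == y, so only the (1, 10) row joins here.
left.join(right, (["a", "b"], ["x", "y"]), how="inner").show()
```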
diff --git a/examples/tpch/q06_forecasting_revenue_change.py b/examples/tpch/q06_forecasting_revenue_change.py
new file mode 100644
index 0000000..5fbb917
--- /dev/null
+++ b/examples/tpch/q06_forecasting_revenue_change.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 6:
+
+The Forecasting Revenue Change Query considers all the lineitems shipped in a given year with
+discounts between DISCOUNT-0.01 and DISCOUNT+0.01. The query lists the amount by which the total
+revenue would have increased if these discounts had been eliminated for lineitems with l_quantity
+less than quantity. Note that the potential revenue increase is equal to the sum of
+[l_extendedprice * l_discount] for all lineitems with discounts and quantities in the qualifying
+range.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+# Variables from the example query
+
+DATE_OF_INTEREST = "1994-01-01"
+DISCOUNT = 0.06
+DELTA = 0.01
+QUANTITY = 24
+
+INTERVAL_DAYS = 365
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_shipdate", "l_quantity", "l_extendedprice", "l_discount"
+)
+
+# Filter down to lineitems of interest
+
+df = (
+ df_lineitem.filter(col("l_shipdate") >= lit(date))
+ .filter(col("l_shipdate") < lit(date) + lit(interval))
+ .filter(col("l_discount") >= lit(DISCOUT) - lit(DELTA))
+ .filter(col("l_discount") <= lit(DISCOUT) + lit(DELTA))
+ .filter(col("l_quantity") < lit(QUANTITY))
+)
+
+# Add up all the "lost" revenue
+
+df = df.aggregate(
+ [], [F.sum(col("l_extendedprice") * col("l_discount")).alias("revenue")]
+)
+
+# Show the single result. We could do a `show()` but since we want to demonstrate features of how
+# to use DataFusion, instead collect the result as a python object and print it out.
+
+# collect() should give a list of record batches. This is a small query, so we should get a
+# single batch back, hence the index [0]. Within each record batch we only care about the
+# single column result `revenue`. Since we have only one row returned because we aggregated
+# over the entire dataframe, we can index it at 0. Then convert the DoubleScalar into a
+# simple python object.
+
+revenue = df.collect()[0]["revenue"][0].as_py()
+
+# Note: the output value from this query may be dependent on the size of the database generated
+print(f"Potential lost revenue: {revenue:.2f}")
diff --git a/examples/tpch/q07_volume_shipping.py b/examples/tpch/q07_volume_shipping.py
new file mode 100644
index 0000000..3c87f93
--- /dev/null
+++ b/examples/tpch/q07_volume_shipping.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 7:
+
+The Volume Shipping Query finds, for two given nations, the gross discounted revenues derived from
+lineitems in which parts were shipped from a supplier in either nation to a customer in the other
+nation during 1995 and 1996. The query lists the supplier nation, the customer nation, the year,
+and the revenue from shipments that took place in that year. The query orders the answer by
+Supplier nation, Customer nation, and year (all ascending).
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+# Variables of interest to query over
+
+nation_1 = lit("FRANCE")
+nation_2 = lit("GERMANY")
+
+START_DATE = "1995-01-01"
+END_DATE = "1996-12-31"
+
+start_date = lit(datetime.strptime(START_DATE, "%Y-%m-%d").date())
+end_date = lit(datetime.strptime(END_DATE, "%Y-%m-%d").date())
+
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_nationkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_shipdate", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_custkey"
+)
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_custkey", "c_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name"
+)
+
+
+# Filter to time of interest
+df_lineitem = df_lineitem.filter(col("l_shipdate") >= start_date).filter(
+ col("l_shipdate") <= end_date
+)
+
+
+# A simpler way to do the following operation is to use a filter, but we also want to demonstrate
+# how to use case statements. Here we are assigning `n_name` to be itself when it is either of
+# the two nations of interest. Since there is no `otherwise()` statement, any values that do
+# not match these will result in a null value and then get filtered out.
+#
+# To do the same using a simple filter would be:
+# df_nation = df_nation.filter((F.col("n_name") == nation_1) | (F.col("n_name") == nation_2))
+df_nation = df_nation.with_column(
+ "n_name",
+ F.case(col("n_name"))
+ .when(nation_1, col("n_name"))
+ .when(nation_2, col("n_name"))
+ .end(),
+).filter(~col("n_name").is_null())
+
+
+# Limit suppliers to either nation
+df_supplier = df_supplier.join(
+ df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner"
+).select(col("s_suppkey"), col("n_name").alias("supp_nation"))
+
+# Limit customers to either nation
+df_customer = df_customer.join(
+ df_nation, (["c_nationkey"], ["n_nationkey"]), how="inner"
+).select(col("c_custkey"), col("n_name").alias("cust_nation"))
+
+# Join up all the data frames from line items, and make sure the supplier and customer are in
+# different nations.
+df = (
+ df_lineitem.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner")
+ .join(df_customer, (["o_custkey"], ["c_custkey"]), how="inner")
+ .join(df_supplier, (["l_suppkey"], ["s_suppkey"]), how="inner")
+ .filter(col("cust_nation") != col("supp_nation"))
+)
+
+# Extract out two values for every line item
+df = df.with_column(
+ "l_year", F.datepart(lit("year"), col("l_shipdate")).cast(pa.int32())
+).with_column("volume", col("l_extendedprice") * (lit(1.0) -
col("l_discount")))
+
+# Aggregate the results
+df = df.aggregate(
+ [col("supp_nation"), col("cust_nation"), col("l_year")],
+ [F.sum(col("volume")).alias("revenue")],
+)
+
+# Sort based on problem statement requirements
+df = df.sort(col("supp_nation").sort(), col("cust_nation").sort(),
col("l_year").sort())
+
+df.show()
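A minimal sketch of the case-without-otherwise pattern explained above: unmatched rows become null and are then filtered out. The toy table and `from_pydict` usage are assumptions for illustration:

```python
from datafusion import SessionContext, col, lit, functions as F

ctx = SessionContext()
df = ctx.from_pydict({"n_name": ["FRANCE", "GERMANY", "SPAIN"]}, name="t")

df = df.with_column(
    "n_name",
    F.case(col("n_name"))
    .when(lit("FRANCE"), col("n_name"))
    .when(lit("GERMANY"), col("n_name"))
    .end(),  # no otherwise(): any other nation becomes null
).filter(~col("n_name").is_null())

df.show()  # the SPAIN row is filtered out
```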
diff --git a/examples/tpch/q08_market_share.py b/examples/tpch/q08_market_share.py
new file mode 100644
index 0000000..a415156
--- /dev/null
+++ b/examples/tpch/q08_market_share.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 8:
+
+The market share for a given nation within a given region is defined as the fraction of the
+revenue, the sum of [l_extendedprice * (1-l_discount)], from the products of a specified type in
+that region that was supplied by suppliers from the given nation. The query determines this for the
+years 1995 and 1996 presented in this order.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+supplier_nation = lit("BRAZIL")
+customer_region = lit("AMERICA")
+part_of_interest = lit("ECONOMY ANODIZED STEEL")
+
+START_DATE = "1995-01-01"
+END_DATE = "1996-12-31"
+
+start_date = lit(datetime.strptime(START_DATE, "%Y-%m-%d").date())
+end_date = lit(datetime.strptime(END_DATE, "%Y-%m-%d").date())
+
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey",
"p_type")
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_nationkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_partkey", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_custkey", "o_orderdate"
+)
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_custkey", "c_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name", "n_regionkey"
+)
+df_region = ctx.read_parquet("data/region.parquet").select_columns(
+ "r_regionkey", "r_name"
+)
+
+# Limit possible parts to the one specified
+df_part = df_part.filter(col("p_type") == part_of_interest)
+
+# Limit orders to those in the specified range
+
+df_orders = df_orders.filter(col("o_orderdate") >= start_date).filter(
+ col("o_orderdate") <= end_date
+)
+
+# Part 1: Find customers in the region
+
+# We want customers in the region specified by customer_region. This will be used to compute
+# the total sales of the part of interest. We want to know what fraction of those sales
+# was supplied by the nation of interest. There is no guarantee that the nation of
+# interest is within the region of interest.
+
+# First we find all the sales that make up the basis.
+
+df_regional_customers = df_region.filter(col("r_name") == customer_region)
+
+# After this join we have all of the possible sales nations
+df_regional_customers = df_regional_customers.join(
+ df_nation, (["r_regionkey"], ["n_regionkey"]), how="inner"
+)
+
+# Now find the possible customers
+df_regional_customers = df_regional_customers.join(
+ df_customer, (["n_nationkey"], ["c_nationkey"]), how="inner"
+)
+
+# Next find orders for these customers
+df_regional_customers = df_regional_customers.join(
+ df_orders, (["c_custkey"], ["o_custkey"]), how="inner"
+)
+
+# Find all line items from these orders
+df_regional_customers = df_regional_customers.join(
+ df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner"
+)
+
+# Limit to the part of interest
+df_regional_customers = df_regional_customers.join(
+ df_part, (["l_partkey"], ["p_partkey"]), how="inner"
+)
+
+# Compute the volume for each line item
+df_regional_customers = df_regional_customers.with_column(
+ "volume", col("l_extendedprice") * (lit(1.0) - col("l_discount"))
+)
+
+# Part 2: Find suppliers from the nation
+
+# Now that we have all of the sales of that part in the specified region, we need
+# to determine which of those came from suppliers in the nation we are interested in.
+
+df_national_suppliers = df_nation.filter(col("n_name") == supplier_nation)
+
+# Determine the suppliers by the limited nation key we have in our single row df above
+df_national_suppliers = df_national_suppliers.join(
+ df_supplier, (["n_nationkey"], ["s_nationkey"]), how="inner"
+)
+
+# When we join to the customer dataframe, we don't want to confuse other columns, so only
+# select the supplier key that we need
+df_national_suppliers = df_national_suppliers.select_columns("s_suppkey")
+
+
+# Part 3: Combine suppliers and customers and compute the market share
+
+# Now we can do a left outer join on the suppkey. Those line items from other suppliers
+# will get a null value. We can check for the existence of this null to compute a volume
+# column only from suppliers in the nation we are evaluating.
+
+df = df_regional_customers.join(
+ df_national_suppliers, (["l_suppkey"], ["s_suppkey"]), how="left"
+)
+
+# Use a case statement to compute the volume sold by suppliers in the nation of interest
+df = df.with_column(
+ "national_volume",
+ F.case(col("s_suppkey").is_null())
+ .when(lit(False), col("volume"))
+ .otherwise(lit(0.0)),
+)
+
+df = df.with_column(
+ "o_year", F.datepart(lit("year"), col("o_orderdate")).cast(pa.int32())
+)
+
+
+# Lastly, sum up the results
+
+df = df.aggregate(
+ [col("o_year")],
+ [
+ F.sum(col("volume")).alias("volume"),
+ F.sum(col("national_volume")).alias("national_volume"),
+ ],
+)
+
+df = df.select(
+ col("o_year"), (F.col("national_volume") /
F.col("volume")).alias("mkt_share")
+)
+
+df = df.sort(col("o_year").sort())
+
+df.show()
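A minimal sketch of the left-join-then-null-check technique above, which splits a value column by whether the join matched. The toy tables and `from_pydict` usage are assumptions for illustration:

```python
from datafusion import SessionContext, col, lit, functions as F

ctx = SessionContext()
sales = ctx.from_pydict(
    {"supp": [1, 2, 3], "volume": [10.0, 20.0, 30.0]}, name="sales"
)
national = ctx.from_pydict({"s_suppkey": [2]}, name="national")

# Left join: suppliers not present in `national` get a null s_suppkey
df = sales.join(national, (["supp"], ["s_suppkey"]), how="left")

# Volume counts toward the nation only when the join matched
df = df.with_column(
    "national_volume",
    F.case(col("s_suppkey").is_null())
    .when(lit(False), col("volume"))
    .otherwise(lit(0.0)),
)
df.show()
```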
diff --git a/examples/tpch/q09_product_type_profit_measure.py b/examples/tpch/q09_product_type_profit_measure.py
new file mode 100644
index 0000000..4fdfc1c
--- /dev/null
+++ b/examples/tpch/q09_product_type_profit_measure.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 9:
+
+The Product Type Profit Measure Query finds, for each nation and each year, the profit for all parts
+ordered in that year that contain a specified substring in their names and that were filled by a
+supplier in that nation. The profit is defined as the sum of
+[(l_extendedprice*(1-l_discount)) - (ps_supplycost * l_quantity)] for all lineitems describing
+parts in the specified line. The query lists the nations in ascending alphabetical order and, for
+each nation, the year and profit in descending order by year (most recent first).
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+part_color = lit("green")
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey",
"p_name")
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_nationkey"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+ "ps_suppkey", "ps_partkey", "ps_supplycost"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_partkey",
+ "l_extendedprice",
+ "l_discount",
+ "l_suppkey",
+ "l_orderkey",
+ "l_quantity",
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_custkey", "o_orderdate"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name", "n_regionkey"
+)
+
+# Limit possible parts to the color specified
+df = df_part.filter(F.strpos(col("p_name"), part_color) > lit(0))
+
+# We have a series of joins that limit down to the line items we need
+df = df.join(df_lineitem, (["p_partkey"], ["l_partkey"]), how="inner")
+df = df.join(df_supplier, (["l_suppkey"], ["s_suppkey"]), how="inner")
+df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner")
+df = df.join(
+ df_partsupp, (["l_suppkey", "l_partkey"], ["ps_suppkey", "ps_partkey"]),
how="inner"
+)
+df = df.join(df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner")
+
+# Compute the intermediate values and limit down to the expressions we need
+df = df.select(
+ col("n_name").alias("nation"),
+ F.datepart(lit("year"),
col("o_orderdate")).cast(pa.int32()).alias("o_year"),
+ (
+ col("l_extendedprice") * (lit(1.0) - col("l_discount"))
+ - (col("ps_supplycost") * col("l_quantity"))
+ ).alias("amount"),
+)
+
+# Sum up the values by nation and year
+df = df.aggregate(
+ [col("nation"), col("o_year")], [F.sum(col("amount")).alias("profit")]
+)
+
+# Sort according to the problem specification
+df = df.sort(col("nation").sort(), col("o_year").sort(ascending=False))
+
+df.show()
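A minimal sketch of the two expressions this query leans on: strpos for substring filtering and datepart for year extraction with a cast. The toy table, values, and `from_pydict` usage are assumptions for illustration:

```python
from datetime import date

import pyarrow as pa
from datafusion import SessionContext, col, lit, functions as F

ctx = SessionContext()
df = ctx.from_pydict(
    {
        "p_name": ["forest green tin", "navy blue steel"],
        "d": [date(1995, 3, 15), date(1996, 7, 1)],
    },
    name="t",
)

# strpos returns 0 when the substring is absent, so > 0 means "contains"
df = df.filter(F.strpos(col("p_name"), lit("green")) > lit(0))

# datepart yields a floating point value, hence the cast to int32
df.select(
    F.datepart(lit("year"), col("d")).cast(pa.int32()).alias("o_year")
).show()
```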
diff --git a/examples/tpch/q10_returned_item_reporting.py b/examples/tpch/q10_returned_item_reporting.py
new file mode 100644
index 0000000..1879027
--- /dev/null
+++ b/examples/tpch/q10_returned_item_reporting.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 10:
+
+The Returned Item Reporting Query finds the top 20 customers, in terms of their effect on lost
+revenue for a given quarter, who have returned parts. The query considers only parts that were
+ordered in the specified quarter. The query lists the customer's name, address, nation, phone
+number, account balance, comment information and revenue lost. The customers are listed in
+descending order of lost revenue. Revenue lost is defined as
+sum(l_extendedprice*(1-l_discount)) for all qualifying lineitems.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+DATE_START_OF_QUARTER = "1993-10-01"
+
+date_start_of_quarter = lit(datetime.strptime(DATE_START_OF_QUARTER, "%Y-%m-%d").date())
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval_one_quarter = lit(pa.scalar((0, 0, 120), type=pa.month_day_nano_interval()))
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_custkey",
+ "c_nationkey",
+ "c_name",
+ "c_acctbal",
+ "c_address",
+ "c_phone",
+ "c_comment",
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_custkey", "o_orderdate"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name", "n_regionkey"
+)
+
+# limit to returns
+df_lineitem = df_lineitem.filter(col("l_returnflag") == lit("R"))
+
+
+# Rather than aggregate by all of the customer fields as you might do looking
+# at the specification, we can aggregate by o_custkey and then join in the
+# customer data at the end.
+
+df = df_orders.filter(col("o_orderdate") >= date_start_of_quarter).filter(
+ col("o_orderdate") < date_start_of_quarter + interval_one_quarter
+)
+
+df = df.join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner")
+
+# Compute the revenue
+df = df.aggregate(
+ [col("o_custkey")],
+ [F.sum(col("l_extendedprice") * (lit(1.0) -
col("l_discount"))).alias("revenue")],
+)
+
+# Now join in the customer data
+df = df.join(df_customer, (["o_custkey"], ["c_custkey"]), how="inner")
+df = df.join(df_nation, (["c_nationkey"], ["n_nationkey"]), how="inner")
+
+# These are the columns the problem statement requires
+df = df.select_columns(
+ "c_custkey",
+ "c_name",
+ "revenue",
+ "c_acctbal",
+ "n_name",
+ "c_address",
+ "c_phone",
+ "c_comment",
+)
+
+# Sort the results in descending order
+df = df.sort(col("revenue").sort(ascending=False))
+
+# Only return the top 20 results
+df = df.limit(20)
+
+df.show()
diff --git a/examples/tpch/q11_important_stock_identification.py b/examples/tpch/q11_important_stock_identification.py
new file mode 100644
index 0000000..78fe26d
--- /dev/null
+++ b/examples/tpch/q11_important_stock_identification.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 11:
+
+The Important Stock Identification Query finds, from scanning the available stock of suppliers
+in a given nation, all the parts that represent a significant percentage of the total value of
+all available parts. The query displays the part number and the value of those parts in
+descending order of value.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+NATION = "GERMANY"
+FRACTION = 0.0001
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_nationkey"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+ "ps_supplycost", "ps_availqty", "ps_suppkey", "ps_partkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name"
+)
+
+# Limit to the nation of interest
+df_nation = df_nation.filter(col("n_name") == lit(NATION))
+
+# Find part supplies within this target nation
+
+df = df_nation.join(df_supplier, (["n_nationkey"], ["s_nationkey"]), how="inner")
+
+df = df.join(df_partsupp, (["s_suppkey"], ["ps_suppkey"]), how="inner")
+
+
+# Compute the value of individual parts
+df = df.with_column("value", col("ps_supplycost") * col("ps_availqty"))
+
+# Compute total value of specific parts
+df = df.aggregate([col("ps_partkey")], [F.sum(col("value")).alias("value")])
+
+# By default window functions go from unbounded preceding to the current row,
+# but we want to compute this sum across all rows
+window_frame = WindowFrame("rows", None, None)
+
+df = df.with_column(
+ "total_value", F.window("sum", [col("value")], window_frame=window_frame)
+)
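+# Every row now carries the same grand total in "total_value", so the ratio
+# in the filter below gives each part's share of the total stock value.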
+
+# Limit to the parts for which there is a significant value based on the
+# fraction of the total
+df = df.filter(col("value") / col("total_value") > lit(FRACTION))
+
+# We only need to report on these two columns
+df = df.select_columns("ps_partkey", "value")
+
+# Sort in descending order of value
+df = df.sort(col("value").sort(ascending=False))
+
+df.show()
diff --git a/examples/tpch/q12_ship_mode_order_priority.py b/examples/tpch/q12_ship_mode_order_priority.py
new file mode 100644
index 0000000..e76efa5
--- /dev/null
+++ b/examples/tpch/q12_ship_mode_order_priority.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 12:
+
+The Shipping Modes and Order Priority Query counts, by ship mode, for lineitems actually
+received by customers in a given year, the number of lineitems belonging to orders for which
+the l_receiptdate exceeds the l_commitdate for two different specified ship modes. Only
+lineitems that were actually shipped before the l_commitdate are considered. The late
+lineitems are partitioned into two groups, those with priority URGENT or HIGH, and those with
+a priority other than URGENT or HIGH.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+SHIP_MODE_1 = "MAIL"
+SHIP_MODE_2 = "SHIP"
+DATE_OF_INTEREST = "1994-01-01"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_orderpriority"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_orderkey", "l_shipmode", "l_commitdate", "l_shipdate", "l_receiptdate"
+)
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval())
+
+
+df = df_lineitem.filter(col("l_receiptdate") >= lit(date)).filter(
+ col("l_receiptdate") < lit(date) + lit(interval)
+)
+
+# Note: It is not recommended to use array_has here because if you pass it
+# col("l_shipmode") it will pass the entire array to process, which is very
+# slow. Instead check that the position of the entry is not null.
+df = df.filter(
+ ~F.array_position(
+ F.make_array(lit(SHIP_MODE_1), lit(SHIP_MODE_2)), col("l_shipmode")
+ ).is_null()
+)
+
+# Since we have only two values, it's much easier to do this as a filter where
+# the l_shipmode matches either of the two values, but we want to show doing
+# some array operations in this example. If you want to see this done with
+# filters, comment out the above line and uncomment this one.
+# df = df.filter((col("l_shipmode") == lit(SHIP_MODE_1)) | (col("l_shipmode") == lit(SHIP_MODE_2)))
+
+
+# We need order priority, so join order df to line item
+df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner")
+
+# Restrict to line items we care about based on the problem statement.
+df = df.filter(col("l_commitdate") < col("l_receiptdate"))
+
+df = df.filter(col("l_shipdate") < col("l_commitdate"))
+
+df = df.with_column(
+ "high_line_value",
+ F.case(col("o_orderpriority"))
+ .when(lit("1-URGENT"), lit(1))
+ .when(lit("2-HIGH"), lit(1))
+ .otherwise(lit(0)),
+)
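+# For example, "1-URGENT" and "2-HIGH" map to 1, while the remaining TPC-H
+# priorities ("3-MEDIUM", "4-NOT SPECIFIED", "5-LOW") fall through to the
+# otherwise() branch and map to 0.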
+
+# Aggregate the results
+df = df.aggregate(
+ [col("l_shipmode")],
+ [
+ F.sum(col("high_line_value")).alias("high_line_count"),
+ F.count(col("high_line_value")).alias("all_lines_count"),
+ ],
+)
+
+# Compute the final output
+df = df.select(
+ col("l_shipmode"),
+ col("high_line_count"),
+ (col("all_lines_count") - col("high_line_count")).alias("low_line_count"),
+)
+
+df = df.sort(col("l_shipmode").sort())
+
+df.show()
diff --git a/examples/tpch/q13_customer_distribution.py b/examples/tpch/q13_customer_distribution.py
new file mode 100644
index 0000000..1eb9ca3
--- /dev/null
+++ b/examples/tpch/q13_customer_distribution.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 13:
+
+This query determines the distribution of customers by the number of orders they have made,
+including customers who have no record of orders, past or present. It counts and reports how
+many customers have no orders, how many have 1, 2, 3, etc. A check is made to ensure that the
+orders counted do not fall into one of several special categories of orders. Special
+categories are identified in the order comment column by looking for a particular pattern.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+WORD_1 = "special"
+WORD_2 = "requests"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_custkey", "o_comment"
+)
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns("c_custkey")
+
+# Use a regex to remove special cases
+df_orders = df_orders.filter(
+ F.regexp_match(col("o_comment"), lit(f"{WORD_1}.?*{WORD_2}")).is_null()
+)
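+# For example, an order comment like "carefully pack the special gift requests"
+# matches the pattern and is filtered out, while a comment containing only one
+# of the two words is kept.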
+
+# Since we may have customers with no orders we must do a left join
+df = df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="left")
+
+# Find the number of orders for each customer. Count on o_custkey, which is
+# null for customers with no orders, so those customers get a count of zero.
+df = df.aggregate([col("c_custkey")], [F.count(col("o_custkey")).alias("c_count")])
+
+# Ultimately we want to know the number of customers that have that customer count
+df = df.aggregate([col("c_count")], [F.count(col("c_count")).alias("custdist")])
+
+# We want to order the results by the highest number of customers per count
+df = df.sort(
+ col("custdist").sort(ascending=False), col("c_count").sort(ascending=False)
+)
+
+df.show()
diff --git a/examples/tpch/q14_promotion_effect.py b/examples/tpch/q14_promotion_effect.py
new file mode 100644
index 0000000..9ec3836
--- /dev/null
+++ b/examples/tpch/q14_promotion_effect.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 14:
+
+The Promotion Effect Query determines what percentage of the revenue in a given year and month
+was derived from promotional parts. The query considers only parts actually shipped in that
+month and gives the percentage. Revenue is defined as (l_extendedprice * (1-l_discount)).
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+DATE = "1995-09-01"
+
+date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date())
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval_one_month = lit(pa.scalar((0, 0, 30), type=pa.month_day_nano_interval()))
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_partkey", "l_shipdate", "l_extendedprice", "l_discount"
+)
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", "p_type")
+
+
+# Check part type begins with PROMO
+df_part = df_part.filter(
+ F.substr(col("p_type"), lit(0), lit(6)) == lit("PROMO")
+).with_column("promo_factor", lit(1.0))
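+# Note: substr follows the SQL convention of 1-based indexing, so asking for 6
+# characters starting at position 0 effectively returns the first five
+# characters, which is exactly the length of "PROMO".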
+
+df_lineitem = df_lineitem.filter(col("l_shipdate") >= date_of_interest).filter(
+ col("l_shipdate") < date_of_interest + interval_one_month
+)
+
+# Left join so we can sum up the promo parts separately from the other parts
+df = df_lineitem.join(df_part, (["l_partkey"], ["p_partkey"]), "left")
+
+# Make a factor of 1.0 if it is a promotion, 0.0 otherwise
+df = df.with_column("promo_factor", F.coalesce(col("promo_factor"), lit(0.0)))
+df = df.with_column("revenue", col("l_extendedprice") * (lit(1.0) -
col("l_discount")))
+
+
+# Sum up the promo and total revenue
+df = df.aggregate(
+ [],
+ [
+ F.sum(col("promo_factor") * col("revenue")).alias("promo_revenue"),
+ F.sum(col("revenue")).alias("total_revenue"),
+ ],
+)
+
+# Return the percentage of revenue from promotions
+df = df.select(
+ (lit(100.0) * col("promo_revenue") / col("total_revenue")).alias("promo_revenue")
+)
+
+df.show()
diff --git a/examples/tpch/q15_top_supplier.py b/examples/tpch/q15_top_supplier.py
new file mode 100644
index 0000000..7113e04
--- /dev/null
+++ b/examples/tpch/q15_top_supplier.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 15:
+
+The Top Supplier Query finds the supplier who contributed the most to the overall revenue for
+parts shipped during a given quarter of a given year. In case of a tie, the query lists all
+suppliers whose contribution was equal to the maximum, presented in supplier number order.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+DATE = "1996-01-01"
+
+date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date())
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval_3_months = lit(pa.scalar((0, 0, 90), type=pa.month_day_nano_interval()))
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_suppkey", "l_shipdate", "l_extendedprice", "l_discount"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey",
+ "s_name",
+ "s_address",
+ "s_phone",
+)
+
+# Limit line items to the quarter of interest
+df_lineitem = df_lineitem.filter(col("l_shipdate") >= date_of_interest).filter(
+ col("l_shipdate") < date_of_interest + interval_3_months
+)
+
+df = df_lineitem.aggregate(
+ [col("l_suppkey")],
+ [
+ F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias(
+ "total_revenue"
+ )
+ ],
+)
+
+# Use a window function to find the maximum revenue across the entire dataframe
+window_frame = WindowFrame("rows", None, None)
+df = df.with_column(
+ "max_revenue", F.window("max", [col("total_revenue")],
window_frame=window_frame)
+)
+
+# Find all suppliers whose total revenue is the same as the maximum
+df = df.filter(col("total_revenue") == col("max_revenue"))
+
+# Now that we know the supplier(s) with maximum revenue, get the rest of their
+# information from the supplier table
+# from the supplier table
+df = df.join(df_supplier, (["l_suppkey"], ["s_suppkey"]), "inner")
+
+# Return only the columns requested
+df = df.select_columns("s_suppkey", "s_name", "s_address", "s_phone", "total_revenue")
+
+# If we have more than one, sort by supplier number (suppkey)
+df = df.sort(col("s_suppkey").sort())
+
+df.show()
diff --git a/examples/tpch/q16_part_supplier_relationship.py b/examples/tpch/q16_part_supplier_relationship.py
new file mode 100644
index 0000000..5f941d5
--- /dev/null
+++ b/examples/tpch/q16_part_supplier_relationship.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 16:
+
+The Parts/Supplier Relationship Query counts the number of suppliers who can supply parts that
+satisfy a particular customer's requirements. The customer is interested in parts of eight
+different sizes as long as they are not of a given type, not of a given brand, and not from a
+supplier who has had complaints registered at the Better Business Bureau. Results must be
+presented in descending count and ascending brand, type, and size.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+BRAND = "Brand#45"
+TYPE_TO_IGNORE = "MEDIUM POLISHED"
+SIZES_OF_INTEREST = [49, 14, 23, 45, 19, 3, 36, 9]
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+ "p_partkey", "p_brand", "p_type", "p_size"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+ "ps_suppkey", "ps_partkey"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_comment"
+)
+
+df_unwanted_suppliers = df_supplier.filter(
+ ~F.regexp_match(col("s_comment"), lit("Customer.*Complaints")).is_null()
+)
+
+# Remove unwanted suppliers
+df_partsupp = df_partsupp.join(
+ df_unwanted_suppliers, (["ps_suppkey"], ["s_suppkey"]), "anti"
+)
+
+# Select the parts we are interested in
+df_part = df_part.filter(col("p_brand") == lit(BRAND))
+df_part = df_part.filter(
+ F.substr(col("p_type"), lit(0), lit(len(TYPE_TO_IGNORE) + 1)) !=
lit(TYPE_TO_IGNORE)
+)
+
+# Python conversion of integer to literal casts it to int64 but the data for
+# part size is stored as an int32, so perform a cast. Then check to find if
+# the part size is within the array of possible sizes by checking that its
+# position is not null.
+p_sizes = F.make_array(*[lit(s).cast(pa.int32()) for s in SIZES_OF_INTEREST])
+df_part = df_part.filter(~F.array_position(p_sizes, col("p_size")).is_null())
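+# For example, a part of size 9 is kept: array_position finds it in the array
+# above (at 1-based position 8), so the result is not null.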
+
+df = df_part.join(df_partsupp, (["p_partkey"], ["ps_partkey"]), "inner")
+
+df = df.select_columns("p_brand", "p_type", "p_size", "ps_suppkey").distinct()
+
+df = df.aggregate(
+ [col("p_brand"), col("p_type"), col("p_size")],
+ [F.count(col("ps_suppkey")).alias("supplier_cnt")],
+)
+
+df = df.sort(col("supplier_cnt").sort(ascending=False))
+
+df.show()
diff --git a/examples/tpch/q17_small_quantity_order.py b/examples/tpch/q17_small_quantity_order.py
new file mode 100644
index 0000000..aae238b
--- /dev/null
+++ b/examples/tpch/q17_small_quantity_order.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 17:
+
+The Small-Quantity-Order Revenue Query considers parts of a given brand and with a given
+container type and determines the average lineitem quantity of such parts ordered for all
+orders (past and pending) in the 7-year database. What would be the average yearly gross
+(undiscounted) loss in revenue if orders for these parts with a quantity of less than 20% of
+this average were no longer taken?
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+BRAND = "Brand#23"
+CONTAINER = "MED BOX"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+ "p_partkey", "p_brand", "p_container"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_partkey", "l_quantity", "l_extendedprice"
+)
+
+# Limit to the problem statement's brand and container types
+df = df_part.filter(col("p_brand") == lit(BRAND)).filter(
+ col("p_container") == lit(CONTAINER)
+)
+
+# Combine data
+df = df.join(df_lineitem, (["p_partkey"], ["l_partkey"]), "inner")
+
+# Find the average quantity
+window_frame = WindowFrame("rows", None, None)
+df = df.with_column(
+ "avg_quantity", F.window("avg", [col("l_quantity")],
window_frame=window_frame)
+)
+
+df = df.filter(col("l_quantity") < lit(0.2) * col("avg_quantity"))
+
+# Compute the total
+df = df.aggregate([], [F.sum(col("l_extendedprice")).alias("total")])
+
+# Divide by number of years in the problem statement to get average
+df = df.select((col("total") / lit(7.0)).alias("avg_yearly"))
+
+df.show()
diff --git a/examples/tpch/q18_large_volume_customer.py b/examples/tpch/q18_large_volume_customer.py
new file mode 100644
index 0000000..96ca08f
--- /dev/null
+++ b/examples/tpch/q18_large_volume_customer.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 18:
+
+The Large Volume Customer Query finds a list of the top 100 customers who have ever placed
+large quantity orders. The query lists the customer name, customer key, the order key, date
+and total price and the quantity for the order.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+QUANTITY = 300
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_custkey", "c_name"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_custkey", "o_orderdate", "o_totalprice"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_orderkey", "l_quantity", "l_extendedprice"
+)
+
+df = df_lineitem.aggregate(
+ [col("l_orderkey")], [F.sum(col("l_quantity")).alias("total_quantity")]
+)
+
+# Limit to orders in which the total quantity is above a threshold
+df = df.filter(col("total_quantity") > lit(QUANTITY))
+
+# We've identified the orders of interest, now join the additional data
+# we are required to report on
+df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), "inner")
+df = df.join(df_customer, (["o_custkey"], ["c_custkey"]), "inner")
+
+df = df.select_columns(
+ "c_name", "c_custkey", "o_orderkey", "o_orderdate", "o_totalprice",
"total_quantity"
+)
+
+df = df.sort(col("o_totalprice").sort(ascending=False),
col("o_orderdate").sort())
+
+df.show()
diff --git a/examples/tpch/q19_discounted_revenue.py b/examples/tpch/q19_discounted_revenue.py
new file mode 100644
index 0000000..20ad48a
--- /dev/null
+++ b/examples/tpch/q19_discounted_revenue.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 19:
+
+The Discounted Revenue query finds the gross discounted revenue for all orders for three
+different types of parts that were shipped by air and delivered in person. Parts are selected
+based on the combination of specific brands, a list of containers, and a range of sizes.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, udf, functions as F
+
+items_of_interest = {
+ "Brand#12": {
+ "min_quantity": 1,
+ "containers": ["SM CASE", "SM BOX", "SM PACK", "SM PKG"],
+ "max_size": 5,
+ },
+ "Brand#23": {
+ "min_quantity": 10,
+ "containers": ["MED BAG", "MED BOX", "MED PKG", "MED PACK"],
+ "max_size": 10,
+ },
+ "Brand#34": {
+ "min_quantity": 20,
+ "containers": ["LG CASE", "LG BOX", "LG PACK", "LG PKG"],
+ "max_size": 15,
+ },
+}
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+ "p_partkey", "p_brand", "p_container", "p_size"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_partkey",
+ "l_quantity",
+ "l_shipmode",
+ "l_shipinstruct",
+ "l_extendedprice",
+ "l_discount",
+)
+
+# These limitations apply to all line items, so go ahead and do them first
+
+df = df_lineitem.filter(col("l_shipinstruct") == lit("DELIVER IN PERSON"))
+
+# Small note: The data generated uses "REG AIR" but the spec says "AIR REG"
+df = df.filter(
+ (col("l_shipmode") == lit("AIR")) | (col("l_shipmode") == lit("REG AIR"))
+)
+
+df = df.join(df_part, (["l_partkey"], ["p_partkey"]), "inner")
+
+
+# Create the user defined function (UDF) definition that does the work
+def is_of_interest(
+ brand_arr: pa.Array,
+ container_arr: pa.Array,
+ quantity_arr: pa.Array,
+ size_arr: pa.Array,
+) -> pa.Array:
+ """
+ The purpose of this function is to demonstrate how a UDF works, taking as
+ input a pyarrow Array and generating a resultant Array. The length of the
+ inputs should match and there should be the same number of rows in the output.
+ """
+ result = []
+ for idx, brand in enumerate(brand_arr):
+ brand = brand.as_py()
+ if brand in items_of_interest:
+ values_of_interest = items_of_interest[brand]
+
+ container_matches = (
+ container_arr[idx].as_py() in values_of_interest["containers"]
+ )
+
+ quantity = quantity_arr[idx].as_py()
+ quantity_matches = (
+ values_of_interest["min_quantity"]
+ <= quantity
+ <= values_of_interest["min_quantity"] + 10
+ )
+
+ size = size_arr[idx].as_py()
+ size_matches = 1 <= size <= values_of_interest["max_size"]
+
+ result.append(container_matches and quantity_matches and size_matches)
+ else:
+ result.append(False)
+
+ return pa.array(result)
+
+
+# Turn the above function into a UDF that DataFusion can understand
+is_of_interest_udf = udf(
+ is_of_interest,
+ [pa.utf8(), pa.utf8(), pa.float32(), pa.int32()],
+ pa.bool_(),
+ "stable",
+)
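+# The arguments to udf() above are the Python callable, the list of input
+# Arrow types (one per column passed in), the output Arrow type, and the
+# volatility ("stable" indicating the result does not change for the same
+# input within a single query).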
+
+# Filter results using the above UDF
+df = df.filter(
+ is_of_interest_udf(
+ col("p_brand"), col("p_container"), col("l_quantity"), col("p_size")
+ )
+)
+
+df = df.aggregate(
+ [],
+ [F.sum(col("l_extendedprice") * (lit(1.0) -
col("l_discount"))).alias("revenue")],
+)
+
+df.show()
diff --git a/examples/tpch/q20_potential_part_promotion.py b/examples/tpch/q20_potential_part_promotion.py
new file mode 100644
index 0000000..09686db
--- /dev/null
+++ b/examples/tpch/q20_potential_part_promotion.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 20:
+
+The Potential Part Promotion query identifies suppliers who have an excess of a given part
+available; an excess is defined to be more than 50% of the parts like the given part that the
+supplier shipped in a given year for a given nation. Only parts whose names share a certain
+naming convention are considered.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+COLOR_OF_INTEREST = "forest"
+DATE_OF_INTEREST = "1994-01-01"
+NATION_OF_INTEREST = "CANADA"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", "p_name")
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_shipdate", "l_partkey", "l_suppkey", "l_quantity"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+ "ps_partkey", "ps_suppkey", "ps_availqty"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_address", "s_name", "s_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name"
+)
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval())
+
+# Filter down dataframes
+df_nation = df_nation.filter(col("n_name") == lit(NATION_OF_INTEREST))
+df_part = df_part.filter(
+ F.substr(col("p_name"), lit(0), lit(len(COLOR_OF_INTEREST) + 1))
+ == lit(COLOR_OF_INTEREST)
+)
+
+df = df_lineitem.filter(col("l_shipdate") >= lit(date)).filter(
+ col("l_shipdate") < lit(date) + lit(interval)
+)
+
+# This will filter down the line items to the parts of interest
+df = df.join(df_part, (["l_partkey"], ["p_partkey"]), "inner")
+
+# Compute the total sold and limit ourselves to individual supplier/part combinations
+df = df.aggregate(
+ [col("l_partkey"), col("l_suppkey")],
[F.sum(col("l_quantity")).alias("total_sold")]
+)
+
+df = df.join(
+ df_partsupp, (["l_partkey", "l_suppkey"], ["ps_partkey", "ps_suppkey"]),
"inner"
+)
+
+# Find cases of excess quantity
+df.filter(col("ps_availqty") > lit(0.5) * col("total_sold"))
+
+# We could do these joins earlier, but now limit to the nation of interest suppliers
+df = df.join(df_supplier, (["ps_suppkey"], ["s_suppkey"]), "inner")
+df = df.join(df_nation, (["s_nationkey"], ["n_nationkey"]), "inner")
+
+# Restrict to the requested data per the problem statement
+df = df.select_columns("s_name", "s_address")
+
+df = df.sort(col("s_name").sort())
+
+df.show()
diff --git a/examples/tpch/q21_suppliers_kept_orders_waiting.py b/examples/tpch/q21_suppliers_kept_orders_waiting.py
new file mode 100644
index 0000000..2f58d6e
--- /dev/null
+++ b/examples/tpch/q21_suppliers_kept_orders_waiting.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 21:
+
+The Suppliers Who Kept Orders Waiting query identifies suppliers, for a given nation, whose
+product was part of a multi-supplier order (with current status of 'F') where they were the
+only supplier who failed to meet the committed delivery date.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+NATION_OF_INTEREST = "SAUDI ARABIA"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+ "o_orderkey", "o_orderstatus"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+ "l_orderkey", "l_receiptdate", "l_commitdate", "l_suppkey"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+ "s_suppkey", "s_name", "s_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+ "n_nationkey", "n_name"
+)
+
+# Limit to suppliers in the nation of interest
+df_suppliers_of_interest = df_nation.filter(col("n_name") == lit(NATION_OF_INTEREST))
+
+df_suppliers_of_interest = df_suppliers_of_interest.join(
+ df_supplier, (["n_nationkey"], ["s_nationkey"]), "inner"
+)
+
+# Find the failed orders and all their line items
+df = df_orders.filter(col("o_orderstatus") == lit("F"))
+
+df = df_lineitem.join(df, (["l_orderkey"], ["o_orderkey"]), "inner")
+
+# Identify the line items that caused the order to fail.
+df = df.with_column(
+ "failed_supp",
+ F.case(col("l_receiptdate") > col("l_commitdate"))
+ .when(lit(True), col("l_suppkey"))
+ .end(),
+)
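+# failed_supp holds the supplier key when the line item missed its commit date
+# and is null otherwise; those nulls are stripped out after the aggregation below.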
+
+# There are other ways we could do this but the purpose of this example is to
+# work with rows where an element is an array of values. In this case, we will
+# create two columns of arrays. One will be an array of all of the suppliers
+# who made up this order. That way we can filter the dataframe for only orders
+# where this array is larger than one for multiple supplier orders. The second
+# column is all of the suppliers who failed to make their commitment. We can
+# filter the second column for arrays with size one. That combination will
+# give us orders that had multiple suppliers where only one failed. Use
+# distinct=True in the aggregation below so we don't get multiple line items
+# from the same supplier reported in either array.
+df = df.aggregate(
+ [col("o_orderkey")],
+ [
+ F.array_agg(col("l_suppkey"), distinct=True).alias("all_suppliers"),
+ F.array_agg(col("failed_supp"),
distinct=True).alias("failed_suppliers"),
+ ],
+)
+
+# Remove the null entries that will get returned by array_agg so we can test
+# to see where we only have a single failed supplier in a multiple supplier order
+df = df.with_column(
+ "failed_suppliers", F.array_remove(col("failed_suppliers"), lit(None))
+)
+
+# This is the check described above which will identify a single failed
+# supplier in a multiple supplier order.
+df = df.filter(F.array_length(col("failed_suppliers")) == lit(1)).filter(
+ F.array_length(col("all_suppliers")) > lit(1)
+)
+
+# Since we have an array we know is exactly one element long, we can extract that single value.
+df = df.select(
+ col("o_orderkey"), F.array_element(col("failed_suppliers"),
lit(1)).alias("suppkey")
+)
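+# Note: array_element uses 1-based indexing, so lit(1) extracts the first
+# (and here, only) entry of the array.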
+
+# Join to the supplier of interest list for the nation of interest
+df = df.join(df_suppliers_of_interest, (["suppkey"], ["s_suppkey"]), "inner")
+
+# Count how many orders that supplier is the only failed supplier for
+df = df.aggregate([col("s_name")],
[F.count(col("o_orderkey")).alias("numwait")])
+
+# Return in descending order
+df = df.sort(col("numwait").sort(ascending=False))
+
+df = df.limit(100)
+
+df.show()
diff --git a/examples/tpch/q22_global_sales_opportunity.py b/examples/tpch/q22_global_sales_opportunity.py
new file mode 100644
index 0000000..d2d0c5a
--- /dev/null
+++ b/examples/tpch/q22_global_sales_opportunity.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 22:
+
+This query counts how many customers within a specific range of country codes have not placed
+orders for 7 years but who have a greater than average “positive” account balance. It also
+reflects the magnitude of that balance. Country code is defined as the first two characters
+of c_phone.
+
+The above problem statement text is copyrighted by the Transaction Processing Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+NATION_CODE = 13
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+ "c_phone", "c_acctbal", "c_custkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns("o_custkey")
+
+# The nation code is a two digit number, but we need to convert it to a string literal
+nation_code = lit(str(NATION_CODE))
+
+# Use the substring operation to extract the first two characters of the phone number
+df = df.with_column("cntrycode", F.substr(col("c_phone"), lit(0), lit(3)))
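+# Note: with 1-based substr semantics, starting at position 0 with a length of
+# 3 yields the first two characters of the string.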
+
+# Limit our search to customers with some balance and in the country code above
+df = df.filter(col("c_acctbal") > lit(0.0))
+df = df.filter(nation_code == col("cntrycode"))
+
+# Compute the average balance. By default, the window frame is from unbounded
+# preceding to the current row. We want our frame to cover the entire data frame.
+window_frame = WindowFrame("rows", None, None)
+df = df.with_column(
+ "avg_balance", F.window("avg", [col("c_acctbal")],
window_frame=window_frame)
+)
+
+# Limit results to customers with above average balance
+df = df.filter(col("c_acctbal") > col("avg_balance"))
+
+# Limit results to customers with no orders
+df = df.join(df_orders, (["c_custkey"], ["o_custkey"]), "anti")
+
+# Count up the customers and the balances
+df = df.aggregate(
+ [col("cntrycode")],
+ [
+ F.count(col("c_custkey")).alias("numcust"),
+ F.sum(col("c_acctbal")).alias("totacctbal"),
+ ],
+)
+
+df = df.sort(col("cntrycode").sort())
+
+df.show()