This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new d71c436  Add examples from TPC-H (#666)
d71c436 is described below

commit d71c436ae2006843dc720bfdfcb8b3aeb434815e
Author: Tim Saucer <[email protected]>
AuthorDate: Mon May 13 10:00:18 2024 -0400

    Add examples from TPC-H (#666)
    
    * Update location of docker image
    
    * Initial commit for queries 1-3
    
    * Commit queries 4-7 of TPC-H in examples
    
    * Add required license text
    
    * Add additional text around why to use a case statement in the example
    
    * add market share example
    
    * Add example for product type profit measure
    
    * Inital commit returned item report
    
    * Linting
    
    * Initial commit of q11 example
    
    * Initial commit of q12 from tpc-h
    
    * Initial commit for customer distribution example
    
    * Initial commit of promotion effect example
    
    * Initial commit of q15 in tph-c, top supplier
    
    * Initial commit of q16 in tph-c, part supplier relationship
    
    * Initial commit of q17 in tph-c, small quatity order
    
    * Initial commit of q18 in tph-c, large volume customer
    
    * Initial commit of q19 in tph-c, discounted revenue
    
    * Initial commit of q20 in tph-c, potential part promotion
    
    * Initial commit of q21 in tph-c, supplier who kept order waiting
    
    * Initial commit of q22 in tph-c, global sales opportunity
    
    * Adding readme information and marking text as copyrighted
    
    * Minimum part cost must be identified per part not across all parts that 
match the filters
    
    * Change ordering of output rows to match spec
    
    * Set parameter to match spec
    
    * Set parameter to match spec
    
    * setting values to match spec
    
    * Linting
    
    * Expand on readme to link to examples within tpch folder
    
    * Minor typo
---
 benchmarks/tpch/tpch-gen.sh                        |   6 +-
 examples/README.md                                 |  64 ++++++++
 examples/tpch/.gitignore                           |   2 +
 examples/tpch/README.md                            |  57 +++++++
 examples/tpch/convert_data_to_parquet.py           | 142 +++++++++++++++++
 examples/tpch/q01_pricing_summary_report.py        |  90 +++++++++++
 examples/tpch/q02_minimum_cost_supplier.py         | 139 ++++++++++++++++
 examples/tpch/q03_shipping_priority.py             |  86 ++++++++++
 examples/tpch/q04_order_priority_checking.py       |  80 ++++++++++
 examples/tpch/q05_local_supplier_volume.py         | 102 ++++++++++++
 examples/tpch/q06_forecasting_revenue_change.py    |  87 ++++++++++
 examples/tpch/q07_volume_shipping.py               | 123 +++++++++++++++
 examples/tpch/q08_market_share.py                  | 175 +++++++++++++++++++++
 examples/tpch/q09_product_type_profit_measure.py   |  93 +++++++++++
 examples/tpch/q10_returned_item_reporting.py       | 108 +++++++++++++
 .../tpch/q11_important_stock_identification.py     |  82 ++++++++++
 examples/tpch/q12_ship_mode_order_priority.py      | 112 +++++++++++++
 examples/tpch/q13_customer_distribution.py         |  64 ++++++++
 examples/tpch/q14_promotion_effect.py              |  81 ++++++++++
 examples/tpch/q15_top_supplier.py                  |  87 ++++++++++
 examples/tpch/q16_part_supplier_relationship.py    |  85 ++++++++++
 examples/tpch/q17_small_quantity_order.py          |  69 ++++++++
 examples/tpch/q18_large_volume_customer.py         |  65 ++++++++
 examples/tpch/q19_discounted_revenue.py            | 137 ++++++++++++++++
 examples/tpch/q20_potential_part_promotion.py      |  97 ++++++++++++
 examples/tpch/q21_suppliers_kept_orders_waiting.py | 114 ++++++++++++++
 examples/tpch/q22_global_sales_opportunity.py      |  76 +++++++++
 27 files changed, 2420 insertions(+), 3 deletions(-)

diff --git a/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch/tpch-gen.sh
index e27472a..15cab12 100755
--- a/benchmarks/tpch/tpch-gen.sh
+++ b/benchmarks/tpch/tpch-gen.sh
@@ -29,7 +29,7 @@ FILE=./data/supplier.tbl
 if test -f "$FILE"; then
     echo "$FILE exists."
 else
-  docker run -v `pwd`/data:/data -it --rm 
ghcr.io/databloom-ai/tpch-docker:main -vf -s $1
+  docker run -v `pwd`/data:/data -it --rm ghcr.io/scalytics/tpch-docker:main 
-vf -s $1
 
   # workaround for https://github.com/apache/arrow-datafusion/issues/6147
   mv data/customer.tbl data/customer.csv
@@ -49,5 +49,5 @@ FILE=./data/answers/q1.out
 if test -f "$FILE"; then
     echo "$FILE exists."
 else
-  docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm 
ghcr.io/databloom-ai/tpch-docker:main -c "cp 
/opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
-fi
\ No newline at end of file
+  docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm 
ghcr.io/scalytics/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* 
/data/answers/"
+fi
diff --git a/examples/README.md b/examples/README.md
index 8240595..0ef194a 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -52,3 +52,67 @@ Here is a direct link to the file used in the examples:
 - [Executing SQL on Polars](./sql-on-polars.py)
 - [Executing SQL on Pandas](./sql-on-pandas.py)
 - [Executing SQL on cuDF](./sql-on-cudf.py)
+
+## TPC-H Examples
+
+Within the subdirectory `tpch` there are 22 examples that reproduce queries in
+the TPC-H specification. These include realistic data that can be generated at
+arbitrary scale and allow the user to see use cases for a variety of data frame
+operations.
+
+In the list below we describe which new operations can be found in the 
examples.
+The queries are designed to be of increasing complexity, so it is recommended 
to
+review them in order. For brevity, the following list does not include 
operations
+found in previous examples.
+
+- [Convert CSV to Parquet](./tpch/convert_data_to_parquet.py)
+    - Read from a CSV files where the delimiter is something other than a comma
+    - Specify schema during CVS reading
+    - Write to a parquet file
+- [Pricing Summary Report](./tpch/q01_pricing_summary_report.py)
+    - Aggregation computing the maximum value, average, sum, and number of 
entries
+    - Filter data by date and interval
+    - Sorting
+- [Minimum Cost Supplier](./tpch/q02_minimum_cost_supplier.py)
+    - Window operation to find minimum
+    - Sorting in descending order
+- [Shipping Priority](./tpch/q03_shipping_priority.py)
+- [Order Priority Checking](./tpch/q04_order_priority_checking.py)
+    - Aggregating multiple times in one data frame
+- [Local Supplier Volume](./tpch/q05_local_supplier_volume.py)
+- [Forecasting Revenue Change](./tpch/q06_forecasting_revenue_change.py)
+    - Using collect and extracting values as a python object
+- [Volume Shipping](./tpch/q07_volume_shipping.py)
+    - Finding multiple distinct and mutually exclusive values within one 
dataframe
+    - Using `case` and `when` statements
+- [Market Share](./tpch/q08_market_share.py)
+    - The operations in this query are similar to those in the prior examples, 
but
+      it is a more complex example of using filters, joins, and aggregates
+    - Using left outer joins
+- [Product Type Profit Measure](./tpch/q09_product_type_profit_measure.py)
+    - Extract year from a date
+- [Returned Item Reporting](./tpch/q10_returned_item_reporting.py)
+- [Important Stock 
Identification](./tpch/q11_important_stock_identification.py)
+- [Shipping Modes and Order](./tpch/q12_ship_mode_order_priority.py)
+    - Finding non-null values using a boolean operation in a filter
+    - Case statement with default value
+- [Customer Distribution](./tpch/q13_customer_distribution.py)
+- [Promotion Effect](./tpch/q14_promotion_effect.py)
+- [Top Supplier](./tpch/q15_top_supplier.py)
+- [Parts/Supplier Relationship](./tpch/q16_part_supplier_relationship.py)
+    - Using anti joins
+    - Using regular expressions (regex)
+    - Creating arrays of literal values
+    - Determine if an element exists within an array
+- [Small-Quantity-Order Revenue](./tpch/q17_small_quantity_order.py)
+- [Large Volume Customer](./tpch/q18_large_volume_customer.py)
+- [Discounted Revenue](./tpch/q19_discounted_revenue.py)
+    - Creating a user defined function (UDF)
+    - Convert pyarrow Array to python values
+    - Filtering based on a UDF
+- [Potential Part Promotion](./tpch/q20_potential_part_promotion.py)
+    - Extracting part of a string using substr
+- [Suppliers Who Kept Orders 
Waiting](./tpch/q21_suppliers_kept_orders_waiting.py)
+    - Using array aggregation
+    - Determining the size of array elements
+- [Global Sales Opportunity](./tpch/q22_global_sales_opportunity.py)
diff --git a/examples/tpch/.gitignore b/examples/tpch/.gitignore
new file mode 100644
index 0000000..9e67bd4
--- /dev/null
+++ b/examples/tpch/.gitignore
@@ -0,0 +1,2 @@
+data
+
diff --git a/examples/tpch/README.md b/examples/tpch/README.md
new file mode 100644
index 0000000..7c52c82
--- /dev/null
+++ b/examples/tpch/README.md
@@ -0,0 +1,57 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Python Examples for TPC-H
+
+These examples reproduce the problems listed in the Transaction Process Council
+TPC-H benchmark. The purpose of these examples is to demonstrate how to use
+different aspects of Data Fusion and not necessarily geared towards creating 
the
+most performant queries possible. Within each example is a description of the
+problem. For users who are familiar with SQL style commands, you can compare 
the
+approaches in these examples with those listed in the specification.
+
+- https://www.tpc.org/tpch/
+
+The examples provided are based on version 2.18.0 of the TPC-H specification.
+
+## Data Setup
+
+To run these examples, you must first generate a dataset. The `dbgen` tool
+provided by TPC can create datasets of arbitrary scale. For testing it is
+typically sufficient to create a 1 gigabyte dataset. For convenience, this
+repository has a script which uses docker to create this dataset. From the
+`benchmarks/tpch` directory execute the following script.
+
+```bash
+./tpch-gen.sh 1
+```
+
+The examples provided use parquet files for the tables generated by `dbgen`.
+A python script is provided to convert the text files from `dbgen` into parquet
+files expected by the examples. From the `examples/tpch` directory you can
+execute the following command to create the necessary parquet files.
+
+```bash
+python convert_data_to_parquet.py
+```
+
+## Description of Examples
+
+For easier access, a description of the techniques demonstrated in each file
+is in the README.md file in the `examples` directory.
diff --git a/examples/tpch/convert_data_to_parquet.py 
b/examples/tpch/convert_data_to_parquet.py
new file mode 100644
index 0000000..178b7fb
--- /dev/null
+++ b/examples/tpch/convert_data_to_parquet.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is a utility function that will consumer the data generated by dbgen from 
TPC-H and convert
+it into a parquet file with the column names as expected by the TPC-H 
specification. It assumes
+the data generated resides in a path ../../benchmarks/tpch/data relative to 
the current file,
+as will be generated by the script provided in this repository.
+"""
+
+import os
+import pyarrow
+import datafusion
+
+ctx = datafusion.SessionContext()
+
+all_schemas = {}
+
+all_schemas["customer"] = [
+    ("C_CUSTKEY", pyarrow.int32()),
+    ("C_NAME", pyarrow.string()),
+    ("C_ADDRESS", pyarrow.string()),
+    ("C_NATIONKEY", pyarrow.int32()),
+    ("C_PHONE", pyarrow.string()),
+    ("C_ACCTBAL", pyarrow.float32()),
+    ("C_MKTSEGMENT", pyarrow.string()),
+    ("C_COMMENT", pyarrow.string()),
+]
+
+all_schemas["lineitem"] = [
+    ("L_ORDERKEY", pyarrow.int32()),
+    ("L_PARTKEY", pyarrow.int32()),
+    ("L_SUPPKEY", pyarrow.int32()),
+    ("L_LINENUMBER", pyarrow.int32()),
+    ("L_QUANTITY", pyarrow.float32()),
+    ("L_EXTENDEDPRICE", pyarrow.float32()),
+    ("L_DISCOUNT", pyarrow.float32()),
+    ("L_TAX", pyarrow.float32()),
+    ("L_RETURNFLAG", pyarrow.string()),
+    ("L_LINESTATUS", pyarrow.string()),
+    ("L_SHIPDATE", pyarrow.date32()),
+    ("L_COMMITDATE", pyarrow.date32()),
+    ("L_RECEIPTDATE", pyarrow.date32()),
+    ("L_SHIPINSTRUCT", pyarrow.string()),
+    ("L_SHIPMODE", pyarrow.string()),
+    ("L_COMMENT", pyarrow.string()),
+]
+
+all_schemas["nation"] = [
+    ("N_NATIONKEY", pyarrow.int32()),
+    ("N_NAME", pyarrow.string()),
+    ("N_REGIONKEY", pyarrow.int32()),
+    ("N_COMMENT", pyarrow.string()),
+]
+
+all_schemas["orders"] = [
+    ("O_ORDERKEY", pyarrow.int32()),
+    ("O_CUSTKEY", pyarrow.int32()),
+    ("O_ORDERSTATUS", pyarrow.string()),
+    ("O_TOTALPRICE", pyarrow.float32()),
+    ("O_ORDERDATE", pyarrow.date32()),
+    ("O_ORDERPRIORITY", pyarrow.string()),
+    ("O_CLERK", pyarrow.string()),
+    ("O_SHIPPRIORITY", pyarrow.int32()),
+    ("O_COMMENT", pyarrow.string()),
+]
+
+all_schemas["part"] = [
+    ("P_PARTKEY", pyarrow.int32()),
+    ("P_NAME", pyarrow.string()),
+    ("P_MFGR", pyarrow.string()),
+    ("P_BRAND", pyarrow.string()),
+    ("P_TYPE", pyarrow.string()),
+    ("P_SIZE", pyarrow.int32()),
+    ("P_CONTAINER", pyarrow.string()),
+    ("P_RETAILPRICE", pyarrow.float32()),
+    ("P_COMMENT", pyarrow.string()),
+]
+
+all_schemas["partsupp"] = [
+    ("PS_PARTKEY", pyarrow.int32()),
+    ("PS_SUPPKEY", pyarrow.int32()),
+    ("PS_AVAILQTY", pyarrow.int32()),
+    ("PS_SUPPLYCOST", pyarrow.float32()),
+    ("PS_COMMENT", pyarrow.string()),
+]
+
+all_schemas["region"] = [
+    ("r_REGIONKEY", pyarrow.int32()),
+    ("r_NAME", pyarrow.string()),
+    ("r_COMMENT", pyarrow.string()),
+]
+
+all_schemas["supplier"] = [
+    ("S_SUPPKEY", pyarrow.int32()),
+    ("S_NAME", pyarrow.string()),
+    ("S_ADDRESS", pyarrow.string()),
+    ("S_NATIONKEY", pyarrow.int32()),
+    ("S_PHONE", pyarrow.string()),
+    ("S_ACCTBAL", pyarrow.float32()),
+    ("S_COMMENT", pyarrow.string()),
+]
+
+curr_dir = os.path.dirname(os.path.abspath(__file__))
+for filename, curr_schema in all_schemas.items():
+
+    # For convenience, go ahead and convert the schema column names to 
lowercase
+    curr_schema = [(s[0].lower(), s[1]) for s in curr_schema]
+
+    # Pre-collect the output columns so we can ignore the null field we add
+    # in to handle the trailing | in the file
+    output_cols = [r[0] for r in curr_schema]
+
+    # Trailing | requires extra field for in processing
+    curr_schema.append(("some_null", pyarrow.null()))
+
+    schema = pyarrow.schema(curr_schema)
+
+    source_file = os.path.abspath(
+        os.path.join(curr_dir, f"../../benchmarks/tpch/data/{filename}.csv")
+    )
+    dest_file = os.path.abspath(os.path.join(curr_dir, 
f"./data/{filename}.parquet"))
+
+    df = ctx.read_csv(source_file, schema=schema, has_header=False, 
delimiter="|")
+
+    df = df.select_columns(*output_cols)
+
+    df.write_parquet(dest_file, compression="snappy")
diff --git a/examples/tpch/q01_pricing_summary_report.py 
b/examples/tpch/q01_pricing_summary_report.py
new file mode 100644
index 0000000..1aafcca
--- /dev/null
+++ b/examples/tpch/q01_pricing_summary_report.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 1:
+
+The Pricing Summary Report Query provides a summary pricing report for all 
lineitems shipped as of
+a given date. The date is within 60 - 120 days of the greatest ship date 
contained in the database.
+The query lists totals for extended price, discounted extended price, 
discounted extended price
+plus tax, average quantity, average extended price, and average discount. 
These aggregates are
+grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of 
RETURNFLAG and LINESTATUS.
+A count of the number of lineitems in each group is included.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+ctx = SessionContext()
+
+df = ctx.read_parquet("data/lineitem.parquet")
+
+# It may be that the date can be hard coded, based on examples shown.
+# This approach will work with any date range in the provided data set.
+
+greatest_ship_date = df.aggregate(
+    [], [F.max(col("l_shipdate")).alias("shipdate")]
+).collect()[0]["shipdate"][0]
+
+# From the given problem, this is how close to the last date in the database we
+# want to report results for. It should be between 60-120 days before the end.
+DAYS_BEFORE_FINAL = 68
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), 
type=pa.month_day_nano_interval())
+
+print("Final date in database:", greatest_ship_date)
+
+# Filter data to the dates of interest
+df = df.filter(col("l_shipdate") <= lit(greatest_ship_date) - lit(interval))
+
+# Aggregate the results
+
+df = df.aggregate(
+    [col("l_returnflag"), col("l_linestatus")],
+    [
+        F.sum(col("l_quantity")).alias("sum_qty"),
+        F.sum(col("l_extendedprice")).alias("sum_base_price"),
+        F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias(
+            "sum_disc_price"
+        ),
+        F.sum(
+            col("l_extendedprice")
+            * (lit(1.0) - col("l_discount"))
+            * (lit(1.0) + col("l_tax"))
+        ).alias("sum_charge"),
+        F.avg(col("l_quantity")).alias("avg_qty"),
+        F.avg(col("l_extendedprice")).alias("avg_price"),
+        F.avg(col("l_discount")).alias("avg_disc"),
+        F.count(col("l_returnflag")).alias(
+            "count_order"
+        ),  # Counting any column should return same result
+    ],
+)
+
+# Sort per the expected result
+
+df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort())
+
+# Note: There appears to be a discrepancy between what is returned here and 
what is in the generated
+# answers file for the case of return flag N and line status O, but I did not 
investigate further.
+
+df.show()
diff --git a/examples/tpch/q02_minimum_cost_supplier.py 
b/examples/tpch/q02_minimum_cost_supplier.py
new file mode 100644
index 0000000..262e2cf
--- /dev/null
+++ b/examples/tpch/q02_minimum_cost_supplier.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 2:
+
+The Minimum Cost Supplier Query finds, in a given region, for each part of a 
certain type and size,
+the supplier who can supply it at minimum cost. If several suppliers in that 
region offer the
+desired part type and size at the same (minimum) cost, the query lists the 
parts from suppliers with
+the 100 highest account balances. For each supplier, the query lists the 
supplier's account balance,
+name and nation; the part's number and manufacturer; the supplier's address, 
phone number and
+comment information.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import datafusion
+from datafusion import SessionContext, col, lit, functions as F
+
+# This is the part we're looking for
+SIZE_OF_INTEREST = 15
+TYPE_OF_INTEREST = "BRASS"
+REGION_OF_INTEREST = "EUROPE"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+    "p_partkey", "p_mfgr", "p_type", "p_size"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_acctbal",
+    "s_name",
+    "s_address",
+    "s_phone",
+    "s_comment",
+    "s_nationkey",
+    "s_suppkey",
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+    "ps_partkey", "ps_suppkey", "ps_supplycost"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_regionkey", "n_name"
+)
+df_region = ctx.read_parquet("data/region.parquet").select_columns(
+    "r_regionkey", "r_name"
+)
+
+# Filter down parts. Part names contain the type of interest, so we can use 
strpos to find where
+# in the p_type column the word is. `strpos` will return 0 if not found, 
otherwise the position
+# in the string where it is located.
+
+df_part = df_part.filter(
+    F.strpos(col("p_type"), lit(TYPE_OF_INTEREST)) > lit(0)
+).filter(col("p_size") == lit(SIZE_OF_INTEREST))
+
+# Filter regions down to the one of interest
+
+df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST))
+
+# Now that we have the region, find suppliers in that region. Suppliers are 
tied to their nation
+# and nations are tied to the region.
+
+df_nation = df_nation.join(df_region, (["n_regionkey"], ["r_regionkey"]), 
how="inner")
+df_supplier = df_supplier.join(
+    df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner"
+)
+
+# Now that we know who the potential suppliers are for the part, we can limit 
out part
+# supplies table down. We can further join down to the specific parts we've 
identified
+# as matching the request
+
+df = df_partsupp.join(df_supplier, (["ps_suppkey"], ["s_suppkey"]), 
how="inner")
+
+# Locate the minimum cost across all suppliers. There are multiple ways you 
could do this,
+# but one way is to create a window function across all suppliers, find the 
minimum, and
+# create a column of that value. We can then filter down any rows for which 
the cost and
+# minimum do not match.
+
+# The default window frame as of 5/6/2024 is from unbounded preceeding to the 
current row.
+# We want to evaluate the entire data frame, so we specify this.
+window_frame = datafusion.WindowFrame("rows", None, None)
+df = df.with_column(
+    "min_cost",
+    F.window(
+        "min",
+        [col("ps_supplycost")],
+        partition_by=[col("ps_partkey")],
+        window_frame=window_frame,
+    ),
+)
+
+df = df.filter(col("min_cost") == col("ps_supplycost"))
+
+df = df.join(df_part, (["ps_partkey"], ["p_partkey"]), how="inner")
+
+# From the problem statement, these are the values we wish to output
+
+df = df.select_columns(
+    "s_acctbal",
+    "s_name",
+    "n_name",
+    "p_partkey",
+    "p_mfgr",
+    "s_address",
+    "s_phone",
+    "s_comment",
+)
+
+# Sort and display 100 entries
+df = df.sort(
+    col("s_acctbal").sort(ascending=False),
+    col("n_name").sort(),
+    col("s_name").sort(),
+    col("p_partkey").sort(),
+)
+
+df = df.limit(100)
+
+# Show results
+
+df.show()
diff --git a/examples/tpch/q03_shipping_priority.py 
b/examples/tpch/q03_shipping_priority.py
new file mode 100644
index 0000000..78993e9
--- /dev/null
+++ b/examples/tpch/q03_shipping_priority.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 3:
+
+The Shipping Priority Query retrieves the shipping priority and potential 
revenue, defined as the
+sum of l_extendedprice * (1-l_discount), of the orders having the largest 
revenue among those that
+had not been shipped as of a given date. Orders are listed in decreasing order 
of revenue. If more
+than 10 unshipped orders exist, only the 10 orders with the largest revenue 
are listed.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+SEGMENT_OF_INTEREST = "BUILDING"
+DATE_OF_INTEREST = "1995-03-15"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_mktsegment", "c_custkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderdate", "o_shippriority", "o_custkey", "o_orderkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"
+)
+
+# Limit dataframes to the rows of interest
+
+df_customer = df_customer.filter(col("c_mktsegment") == 
lit(SEGMENT_OF_INTEREST))
+df_orders = df_orders.filter(col("o_orderdate") < lit(DATE_OF_INTEREST))
+df_lineitem = df_lineitem.filter(col("l_shipdate") > lit(DATE_OF_INTEREST))
+
+# Join all 3 dataframes
+
+df = df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), 
how="inner").join(
+    df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner"
+)
+
+# Compute the revenue
+
+df = df.aggregate(
+    [col("l_orderkey")],
+    [
+        F.first_value(col("o_orderdate")).alias("o_orderdate"),
+        F.first_value(col("o_shippriority")).alias("o_shippriority"),
+        F.sum(col("l_extendedprice") * (lit(1.0) - 
col("l_discount"))).alias("revenue"),
+    ],
+)
+
+# Sort by priority
+
+df = df.sort(col("revenue").sort(ascending=False), col("o_orderdate").sort())
+
+# Only return 100 results
+
+df = df.limit(100)
+
+# Change the order that the columns are reported in just to match the spec
+
+df = df.select_columns("l_orderkey", "revenue", "o_orderdate", 
"o_shippriority")
+
+# Show result
+
+df.show()
diff --git a/examples/tpch/q04_order_priority_checking.py 
b/examples/tpch/q04_order_priority_checking.py
new file mode 100644
index 0000000..b691d5b
--- /dev/null
+++ b/examples/tpch/q04_order_priority_checking.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 4:
+
+The Order Priority Checking Query counts the number of orders ordered in a 
given quarter of a given
+year in which at least one lineitem was received by the customer later than 
its committed date. The
+query lists the count of such orders for each order priority sorted in 
ascending priority order.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+# Ideally we could put 3 months into the interval. See note below.
+INTERVAL_DAYS = 92
+DATE_OF_INTEREST = "1993-07-01"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderdate", "o_orderpriority", "o_orderkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_orderkey", "l_commitdate", "l_receiptdate"
+)
+
+# Create a date object from the string
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
+
+# Limit results to cases where commitment date before receipt date
+# Aggregate the results so we only get one row to join with the order table.
+# Alterately, and likely more idomatic is instead of `.aggregate` you could
+# do `.select_columns("l_orderkey").distinct()`. The goal here is to show
+# mulitple examples of how to use Data Fusion.
+df_lineitem = df_lineitem.filter(col("l_commitdate") < 
col("l_receiptdate")).aggregate(
+    [col("l_orderkey")], []
+)
+
+# Limit orders to date range of interest
+df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter(
+    col("o_orderdate") < lit(date) + lit(interval)
+)
+
+# Perform the join to find only orders for which there are lineitems outside 
of expected range
+df = df_orders.join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner")
+
+# Based on priority, find the number of entries
+df = df.aggregate(
+    [col("o_orderpriority")], 
[F.count(col("o_orderpriority")).alias("order_count")]
+)
+
+# Sort the results
+df = df.sort(col("o_orderpriority").sort())
+
+df.show()
diff --git a/examples/tpch/q05_local_supplier_volume.py 
b/examples/tpch/q05_local_supplier_volume.py
new file mode 100644
index 0000000..7cb6e63
--- /dev/null
+++ b/examples/tpch/q05_local_supplier_volume.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 5:
+
+The Local Supplier Volume Query lists for each nation in a region the revenue 
volume that resulted
+from lineitem transactions in which the customer ordering parts and the 
supplier filling them were
+both within that nation. The query is run in order to determine whether to 
institute local
+distribution centers in a given region. The query considers only parts ordered 
in a given year. The
+query displays the nations and revenue volume in descending order by revenue. 
Revenue volume for all
+qualifying lineitems in a particular nation is defined as sum(l_extendedprice 
* (1 - l_discount)).
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+
+DATE_OF_INTEREST = "1994-01-01"
+INTERVAL_DAYS = 365
+REGION_OF_INTEREST = "ASIA"
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_custkey", "c_nationkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_custkey", "o_orderkey", "o_orderdate"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_regionkey", "n_name"
+)
+df_region = ctx.read_parquet("data/region.parquet").select_columns(
+    "r_regionkey", "r_name"
+)
+
+# Restrict dataframes to cases of interest
+df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter(
+    col("o_orderdate") < lit(date) + lit(interval)
+)
+
+df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST))
+
+# Join all the dataframes
+
+df = (
+    df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="inner")
+    .join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner")
+    .join(
+        df_supplier,
+        (["l_suppkey", "c_nationkey"], ["s_suppkey", "s_nationkey"]),
+        how="inner",
+    )
+    .join(df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner")
+    .join(df_region, (["n_regionkey"], ["r_regionkey"]), how="inner")
+)
+
+# Compute the final result
+
+df = df.aggregate(
+    [col("n_name")],
+    [F.sum(col("l_extendedprice") * (lit(1.0) - 
col("l_discount"))).alias("revenue")],
+)
+
+# Sort in descending order
+
+df = df.sort(col("revenue").sort(ascending=False))
+
+df.show()
diff --git a/examples/tpch/q06_forecasting_revenue_change.py 
b/examples/tpch/q06_forecasting_revenue_change.py
new file mode 100644
index 0000000..5fbb917
--- /dev/null
+++ b/examples/tpch/q06_forecasting_revenue_change.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 6:
+
+The Forecasting Revenue Change Query considers all the lineitems shipped in a 
given year with
+discounts between DISCOUNT-0.01 and DISCOUNT+0.01. The query lists the amount 
by which the total
+revenue would have increased if these discounts had been eliminated for 
lineitems with l_quantity
+less than quantity. Note that the potential revenue increase is equal to the 
sum of
+[l_extendedprice * l_discount] for all lineitems with discounts and quantities 
in the qualifying
+range.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+# Variables from the example query
+
+DATE_OF_INTEREST = "1994-01-01"
+DISCOUT = 0.06
+DELTA = 0.01
+QUANTITY = 24
+
+INTERVAL_DAYS = 365
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_shipdate", "l_quantity", "l_extendedprice", "l_discount"
+)
+
+# Filter down to lineitems of interest
+
+df = (
+    df_lineitem.filter(col("l_shipdate") >= lit(date))
+    .filter(col("l_shipdate") < lit(date) + lit(interval))
+    .filter(col("l_discount") >= lit(DISCOUT) - lit(DELTA))
+    .filter(col("l_discount") <= lit(DISCOUT) + lit(DELTA))
+    .filter(col("l_quantity") < lit(QUANTITY))
+)
+
+# Add up all the "lost" revenue
+
+df = df.aggregate(
+    [], [F.sum(col("l_extendedprice") * col("l_discount")).alias("revenue")]
+)
+
+# Show the single result. We could do a `show()` but since we want to 
demonstrate features of how
+# to use Data Fusion, instead collect the result as a python object and print 
it out.
+
+# collect() should give a list of record batches. This is a small query, so we 
should get a
+# single batch back, hence the index [0]. Within each record batch we only 
care about the
+# single column result `revenue`. Since we have only one row returned because 
we aggregated
+# over the entire dataframe, we can index it at 0. Then convert the 
DoubleScalar into a
+# simple python object.
+
+revenue = df.collect()[0]["revenue"][0].as_py()
+
+# Note: the output value from this query may be dependant on the size of the 
database generated
+print(f"Potential lost revenue: {revenue:.2f}")
diff --git a/examples/tpch/q07_volume_shipping.py 
b/examples/tpch/q07_volume_shipping.py
new file mode 100644
index 0000000..3c87f93
--- /dev/null
+++ b/examples/tpch/q07_volume_shipping.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 7:
+
+The Volume Shipping Query finds, for two given nations, the gross discounted 
revenues derived from
+lineitems in which parts were shipped from a supplier in either nation to a 
customer in the other
+nation during 1995 and 1996. The query lists the supplier nation, the customer 
nation, the year,
+and the revenue from shipments that took place in that year. The query orders 
the answer by
+Supplier nation, Customer nation, and year (all ascending).
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+# Variables of interest to query over
+
+nation_1 = lit("FRANCE")
+nation_2 = lit("GERMANY")
+
+START_DATE = "1995-01-01"
+END_DATE = "1996-12-31"
+
+start_date = lit(datetime.strptime(START_DATE, "%Y-%m-%d").date())
+end_date = lit(datetime.strptime(END_DATE, "%Y-%m-%d").date())
+
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_nationkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_shipdate", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_custkey"
+)
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_custkey", "c_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name"
+)
+
+
+# Filter to time of interest
+df_lineitem = df_lineitem.filter(col("l_shipdate") >= start_date).filter(
+    col("l_shipdate") <= end_date
+)
+
+
+# A simpler way to do the following operation is to use a filter, but we also 
want to demonstrate
+# how to use case statements. Here we are assigning `n_name` to be itself when 
it is either of
+# the two nations of interest. Since there is no `otherwise()` statement, any 
values that do
+# not match these will result in a null value and then get filtered out.
+#
+# To do the same using a simle filter would be:
+# df_nation = df_nation.filter((F.col("n_name") == nation_1) | 
(F.col("n_name") == nation_2))
+df_nation = df_nation.with_column(
+    "n_name",
+    F.case(col("n_name"))
+    .when(nation_1, col("n_name"))
+    .when(nation_2, col("n_name"))
+    .end(),
+).filter(~col("n_name").is_null())
+
+
+# Limit suppliers to either nation
+df_supplier = df_supplier.join(
+    df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner"
+).select(col("s_suppkey"), col("n_name").alias("supp_nation"))
+
+# Limit customers to either nation
+df_customer = df_customer.join(
+    df_nation, (["c_nationkey"], ["n_nationkey"]), how="inner"
+).select(col("c_custkey"), col("n_name").alias("cust_nation"))
+
+# Join up all the data frames from line items, and make sure the supplier and 
customer are in
+# different nations.
+df = (
+    df_lineitem.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner")
+    .join(df_customer, (["o_custkey"], ["c_custkey"]), how="inner")
+    .join(df_supplier, (["l_suppkey"], ["s_suppkey"]), how="inner")
+    .filter(col("cust_nation") != col("supp_nation"))
+)
+
+# Extract out two values for every line item
+df = df.with_column(
+    "l_year", F.datepart(lit("year"), col("l_shipdate")).cast(pa.int32())
+).with_column("volume", col("l_extendedprice") * (lit(1.0) - 
col("l_discount")))
+
+# Aggregate the results
+df = df.aggregate(
+    [col("supp_nation"), col("cust_nation"), col("l_year")],
+    [F.sum(col("volume")).alias("revenue")],
+)
+
+# Sort based on problem statement requirements
+df = df.sort(col("supp_nation").sort(), col("cust_nation").sort(), 
col("l_year").sort())
+
+df.show()
diff --git a/examples/tpch/q08_market_share.py 
b/examples/tpch/q08_market_share.py
new file mode 100644
index 0000000..a415156
--- /dev/null
+++ b/examples/tpch/q08_market_share.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 8:
+
+The market share for a given nation within a given region is defined as the 
fraction of the
+revenue, the sum of [l_extendedprice * (1-l_discount)], from the products of a 
specified type in
+that region that was supplied by suppliers from the given nation. The query 
determines this for the
+years 1995 and 1996 presented in this order.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+supplier_nation = lit("BRAZIL")
+customer_region = lit("AMERICA")
+part_of_interest = lit("ECONOMY ANODIZED STEEL")
+
+START_DATE = "1995-01-01"
+END_DATE = "1996-12-31"
+
+start_date = lit(datetime.strptime(START_DATE, "%Y-%m-%d").date())
+end_date = lit(datetime.strptime(END_DATE, "%Y-%m-%d").date())
+
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", 
"p_type")
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_nationkey"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_partkey", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_custkey", "o_orderdate"
+)
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_custkey", "c_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name", "n_regionkey"
+)
+df_region = ctx.read_parquet("data/region.parquet").select_columns(
+    "r_regionkey", "r_name"
+)
+
+# Limit possible parts to the one specified
+df_part = df_part.filter(col("p_type") == part_of_interest)
+
+# Limit orders to those in the specified range
+
+df_orders = df_orders.filter(col("o_orderdate") >= start_date).filter(
+    col("o_orderdate") <= end_date
+)
+
+# Part 1: Find customers in the region
+
+# We want customers in region specified by region_of_interest. This will be 
used to compute
+# the total sales of the part of interest. We want to know of those sales what 
fraction
+# was supplied by the nation of interest. There is no guarantee that the 
nation of
+# interest is within the region of interest.
+
+# First we find all the sales that make up the basis.
+
+df_regional_customers = df_region.filter(col("r_name") == customer_region)
+
+# After this join we have all of the possible sales nations
+df_regional_customers = df_regional_customers.join(
+    df_nation, (["r_regionkey"], ["n_regionkey"]), how="inner"
+)
+
+# Now find the possible customers
+df_regional_customers = df_regional_customers.join(
+    df_customer, (["n_nationkey"], ["c_nationkey"]), how="inner"
+)
+
+# Next find orders for these customers
+df_regional_customers = df_regional_customers.join(
+    df_orders, (["c_custkey"], ["o_custkey"]), how="inner"
+)
+
+# Find all line items from these orders
+df_regional_customers = df_regional_customers.join(
+    df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner"
+)
+
+# Limit to the part of interest
+df_regional_customers = df_regional_customers.join(
+    df_part, (["l_partkey"], ["p_partkey"]), how="inner"
+)
+
+# Compute the volume for each line item
+df_regional_customers = df_regional_customers.with_column(
+    "volume", col("l_extendedprice") * (lit(1.0) - col("l_discount"))
+)
+
+# Part 2: Find suppliers from the nation
+
+# Now that we have all of the sales of that part in the specified region, we 
need
+# to determine which of those came from suppliers in the nation we are 
interested in.
+
+df_national_suppliers = df_nation.filter(col("n_name") == supplier_nation)
+
+# Determine the suppliers by the limited nation key we have in our single row 
df above
+df_national_suppliers = df_national_suppliers.join(
+    df_supplier, (["n_nationkey"], ["s_nationkey"]), how="inner"
+)
+
+# When we join to the customer dataframe, we don't want to confuse other 
columns, so only
+# select the supplier key that we need
+df_national_suppliers = df_national_suppliers.select_columns("s_suppkey")
+
+
+# Part 3: Combine suppliers and customers and compute the market share
+
+# Now we can do a left outer join on the suppkey. Those line items from other 
suppliers
+# will get a null value. We can check for the existence of this null to 
compute a volume
+# column only from suppliers in the nation we are evaluating.
+
+df = df_regional_customers.join(
+    df_national_suppliers, (["l_suppkey"], ["s_suppkey"]), how="left"
+)
+
+# Use a case statement to compute the volume sold by suppliers in the nation 
of interest
+df = df.with_column(
+    "national_volume",
+    F.case(col("s_suppkey").is_null())
+    .when(lit(False), col("volume"))
+    .otherwise(lit(0.0)),
+)
+
+df = df.with_column(
+    "o_year", F.datepart(lit("year"), col("o_orderdate")).cast(pa.int32())
+)
+
+
+# Lastly, sum up the results
+
+df = df.aggregate(
+    [col("o_year")],
+    [
+        F.sum(col("volume")).alias("volume"),
+        F.sum(col("national_volume")).alias("national_volume"),
+    ],
+)
+
+df = df.select(
+    col("o_year"), (F.col("national_volume") / 
F.col("volume")).alias("mkt_share")
+)
+
+df = df.sort(col("o_year").sort())
+
+df.show()
diff --git a/examples/tpch/q09_product_type_profit_measure.py 
b/examples/tpch/q09_product_type_profit_measure.py
new file mode 100644
index 0000000..4fdfc1c
--- /dev/null
+++ b/examples/tpch/q09_product_type_profit_measure.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 9:
+
+The Product Type Profit Measure Query finds, for each nation and each year, 
the profit for all parts
+ordered in that year that contain a specified substring in their names and 
that were filled by a
+supplier in that nation. The profit is defined as the sum of
+[(l_extendedprice*(1-l_discount)) - (ps_supplycost * l_quantity)] for all 
lineitems describing
+parts in the specified line. The query lists the nations in ascending 
alphabetical order and, for
+each nation, the year and profit in descending order by year (most recent 
first).
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+part_color = lit("green")
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", 
"p_name")
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_nationkey"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+    "ps_suppkey", "ps_partkey", "ps_supplycost"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_partkey",
+    "l_extendedprice",
+    "l_discount",
+    "l_suppkey",
+    "l_orderkey",
+    "l_quantity",
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_custkey", "o_orderdate"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name", "n_regionkey"
+)
+
+# Limit possible parts to the color specified
+df = df_part.filter(F.strpos(col("p_name"), part_color) > lit(0))
+
+# We have a series of joins that get us to limit down to the line items we need
+df = df.join(df_lineitem, (["p_partkey"], ["l_partkey"]), how="inner")
+df = df.join(df_supplier, (["l_suppkey"], ["s_suppkey"]), how="inner")
+df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner")
+df = df.join(
+    df_partsupp, (["l_suppkey", "l_partkey"], ["ps_suppkey", "ps_partkey"]), 
how="inner"
+)
+df = df.join(df_nation, (["s_nationkey"], ["n_nationkey"]), how="inner")
+
+# Compute the intermediate values and limit down to the expressions we need
+df = df.select(
+    col("n_name").alias("nation"),
+    F.datepart(lit("year"), 
col("o_orderdate")).cast(pa.int32()).alias("o_year"),
+    (
+        col("l_extendedprice") * (lit(1.0) - col("l_discount"))
+        - (col("ps_supplycost") * col("l_quantity"))
+    ).alias("amount"),
+)
+
+# Sum up the values by nation and year
+df = df.aggregate(
+    [col("nation"), col("o_year")], [F.sum(col("amount")).alias("profit")]
+)
+
+# Sort according to the problem specification
+df = df.sort(col("nation").sort(), col("o_year").sort(ascending=False))
+
+df.show()
diff --git a/examples/tpch/q10_returned_item_reporting.py 
b/examples/tpch/q10_returned_item_reporting.py
new file mode 100644
index 0000000..1879027
--- /dev/null
+++ b/examples/tpch/q10_returned_item_reporting.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 10:
+
+The Returned Item Reporting Query finds the top 20 customers, in terms of 
their effect on lost
+revenue for a given quarter, who have returned parts. The query considers only 
parts that were
+ordered in the specified quarter. The query lists the customer's name, 
address, nation, phone
+number, account balance, comment information and revenue lost. The customers 
are listed in
+descending order of lost revenue. Revenue lost is defined as
+sum(l_extendedprice*(1-l_discount)) for all qualifying lineitems.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+DATE_START_OF_QUARTER = "1993-10-01"
+
+date_start_of_quarter = lit(datetime.strptime(DATE_START_OF_QUARTER, 
"%Y-%m-%d").date())
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval_one_quarter = lit(pa.scalar((0, 0, 120), 
type=pa.month_day_nano_interval()))
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_custkey",
+    "c_nationkey",
+    "c_name",
+    "c_acctbal",
+    "c_address",
+    "c_phone",
+    "c_comment",
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_custkey", "o_orderdate"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name", "n_regionkey"
+)
+
+# limit to returns
+df_lineitem = df_lineitem.filter(col("l_returnflag") == lit("R"))
+
+
+# Rather than aggregate by all of the customer fields as you might do looking 
at the specification,
+# we can aggregate by o_custkey and then join in the customer data at the end.
+
+df = df_orders.filter(col("o_orderdate") >= date_start_of_quarter).filter(
+    col("o_orderdate") < date_start_of_quarter + interval_one_quarter
+)
+
+df = df.join(df_lineitem, (["o_orderkey"], ["l_orderkey"]), how="inner")
+
+# Compute the revenue
+df = df.aggregate(
+    [col("o_custkey")],
+    [F.sum(col("l_extendedprice") * (lit(1.0) - 
col("l_discount"))).alias("revenue")],
+)
+
+# Now join in the customer data
+df = df.join(df_customer, (["o_custkey"], ["c_custkey"]), how="inner")
+df = df.join(df_nation, (["c_nationkey"], ["n_nationkey"]), how="inner")
+
+# These are the columns the problem statement requires
+df = df.select_columns(
+    "c_custkey",
+    "c_name",
+    "revenue",
+    "c_acctbal",
+    "n_name",
+    "c_address",
+    "c_phone",
+    "c_comment",
+)
+
+# Sort the results in descending order
+df = df.sort(col("revenue").sort(ascending=False))
+
+# Only return the top 20 results
+df = df.limit(20)
+
+df.show()
diff --git a/examples/tpch/q11_important_stock_identification.py 
b/examples/tpch/q11_important_stock_identification.py
new file mode 100644
index 0000000..78fe26d
--- /dev/null
+++ b/examples/tpch/q11_important_stock_identification.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 11:
+
+The Important Stock Identification Query finds, from scanning the available 
stock of suppliers
+in a given nation, all the parts that represent a significant percentage of 
the total value of
+all available parts. The query displays the part number and the value of those 
parts in
+descending order of value.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+NATION = "GERMANY"
+FRACTION = 0.0001
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_nationkey"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+    "ps_supplycost", "ps_availqty", "ps_suppkey", "ps_partkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name"
+)
+
+# limit to returns
+df_nation = df_nation.filter(col("n_name") == lit(NATION))
+
+# Find part supplies of within this target nation
+
+df = df_nation.join(df_supplier, (["n_nationkey"], ["s_nationkey"]), 
how="inner")
+
+df = df.join(df_partsupp, (["s_suppkey"], ["ps_suppkey"]), how="inner")
+
+
+# Compute the value of individual parts
+df = df.with_column("value", col("ps_supplycost") * col("ps_availqty"))
+
+# Compute total value of specific parts
+df = df.aggregate([col("ps_partkey")], [F.sum(col("value")).alias("value")])
+
+# By default window functions go from unbounded preceeding to current row, but 
we want
+# to compute this sum across all rows
+window_frame = WindowFrame("rows", None, None)
+
+df = df.with_column(
+    "total_value", F.window("sum", [col("value")], window_frame=window_frame)
+)
+
+# Limit to the parts for which there is a significant value based on the 
fraction of the total
+df = df.filter(col("value") / col("total_value") > lit(FRACTION))
+
+# We only need to report on these two columns
+df = df.select_columns("ps_partkey", "value")
+
+# Sort in descending order of value
+df = df.sort(col("value").sort(ascending=False))
+
+df.show()
diff --git a/examples/tpch/q12_ship_mode_order_priority.py 
b/examples/tpch/q12_ship_mode_order_priority.py
new file mode 100644
index 0000000..e76efa5
--- /dev/null
+++ b/examples/tpch/q12_ship_mode_order_priority.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 12:
+
+The Shipping Modes and Order Priority Query counts, by ship mode, for 
lineitems actually received
+by customers in a given year, the number of lineitems belonging to orders for 
which the
+l_receiptdate exceeds the l_commitdate for two different specified ship modes. 
Only lineitems that
+were actually shipped before the l_commitdate are considered. The late 
lineitems are partitioned
+into two groups, those with priority URGENT or HIGH, and those with a priority 
other than URGENT or
+HIGH.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+SHIP_MODE_1 = "MAIL"
+SHIP_MODE_2 = "SHIP"
+DATE_OF_INTEREST = "1994-01-01"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_orderpriority"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_orderkey", "l_shipmode", "l_commitdate", "l_shipdate", "l_receiptdate"
+)
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval())
+
+
+df = df_lineitem.filter(col("l_receiptdate") >= lit(date)).filter(
+    col("l_receiptdate") < lit(date) + lit(interval)
+)
+
+# Note: It is not recommended to use array_has because it treats the second 
argument as an argument
+# so if you pass it col("l_shipmode") it will pass the entire array to process 
which is very slow.
+# Instead check the position of the entry is not null.
+df = df.filter(
+    ~F.array_position(
+        F.make_array(lit(SHIP_MODE_1), lit(SHIP_MODE_2)), col("l_shipmode")
+    ).is_null()
+)
+
+# Since we have only two values, it's much easier to do this as a filter where 
the l_shipmode
+# matches either of the two values, but we want to show doing some array 
operations in this
+# example. If you want to see this done with filters, comment out the above 
line and uncomment
+# this one.
+# df = df.filter((col("l_shipmode") == lit(SHIP_MODE_1)) | (col("l_shipmode") 
== lit(SHIP_MODE_2)))
+
+
+# We need order priority, so join order df to line item
+df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), how="inner")
+
+# Restrict to line items we care about based on the problem statement.
+df = df.filter(col("l_commitdate") < col("l_receiptdate"))
+
+df = df.filter(col("l_shipdate") < col("l_commitdate"))
+
+df = df.with_column(
+    "high_line_value",
+    F.case(col("o_orderpriority"))
+    .when(lit("1-URGENT"), lit(1))
+    .when(lit("2-HIGH"), lit(1))
+    .otherwise(lit(0)),
+)
+
+# Aggregate the results
+df = df.aggregate(
+    [col("l_shipmode")],
+    [
+        F.sum(col("high_line_value")).alias("high_line_count"),
+        F.count(col("high_line_value")).alias("all_lines_count"),
+    ],
+)
+
+# Compute the final output
+df = df.select(
+    col("l_shipmode"),
+    col("high_line_count"),
+    (col("all_lines_count") - col("high_line_count")).alias("low_line_count"),
+)
+
+df = df.sort(col("l_shipmode").sort())
+
+df.show()
diff --git a/examples/tpch/q13_customer_distribution.py 
b/examples/tpch/q13_customer_distribution.py
new file mode 100644
index 0000000..1eb9ca3
--- /dev/null
+++ b/examples/tpch/q13_customer_distribution.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 13:
+
+This query determines the distribution of customers by the number of orders 
they have made,
+including customers who have no record of orders, past or present. It counts 
and reports how many
+customers have no orders, how many have 1, 2, 3, etc. A check is made to 
ensure that the orders
+counted do not fall into one of several special categories of orders. Special 
categories are
+identified in the order comment column by looking for a particular pattern.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+WORD_1 = "special"
+WORD_2 = "requests"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_custkey", "o_comment"
+)
+df_customer = 
ctx.read_parquet("data/customer.parquet").select_columns("c_custkey")
+
+# Use a regex to remove special cases
+df_orders = df_orders.filter(
+    F.regexp_match(col("o_comment"), lit(f"{WORD_1}.?*{WORD_2}")).is_null()
+)
+
+# Since we may have customers with no orders we must do a left join
+df = df_customer.join(df_orders, (["c_custkey"], ["o_custkey"]), how="left")
+
+# Find the number of orders for each customer
+df = df.aggregate([col("c_custkey")], 
[F.count(col("c_custkey")).alias("c_count")])
+
+# Ultimately we want to know the number of customers that have that customer 
count
+df = df.aggregate([col("c_count")], 
[F.count(col("c_count")).alias("custdist")])
+
+# We want to order the results by the highest number of customers per count
+df = df.sort(
+    col("custdist").sort(ascending=False), col("c_count").sort(ascending=False)
+)
+
+df.show()
diff --git a/examples/tpch/q14_promotion_effect.py 
b/examples/tpch/q14_promotion_effect.py
new file mode 100644
index 0000000..9ec3836
--- /dev/null
+++ b/examples/tpch/q14_promotion_effect.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 14:
+
+The Promotion Effect Query determines what percentage of the revenue in a 
given year and month was
+derived from promotional parts. The query considers only parts actually 
shipped in that month and
+gives the percentage. Revenue is defined as (l_extendedprice * (1-l_discount)).
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+DATE = "1995-09-01"
+
+date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date())
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval_one_month = lit(pa.scalar((0, 0, 30), 
type=pa.month_day_nano_interval()))
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_partkey", "l_shipdate", "l_extendedprice", "l_discount"
+)
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", 
"p_type")
+
+
+# Check part type begins with PROMO
+df_part = df_part.filter(
+    F.substr(col("p_type"), lit(0), lit(6)) == lit("PROMO")
+).with_column("promo_factor", lit(1.0))
+
+df_lineitem = df_lineitem.filter(col("l_shipdate") >= date_of_interest).filter(
+    col("l_shipdate") < date_of_interest + interval_one_month
+)
+
+# Left join so we can sum up the promo parts different from other parts
+df = df_lineitem.join(df_part, (["l_partkey"], ["p_partkey"]), "left")
+
+# Make a factor of 1.0 if it is a promotion, 0.0 otherwise
+df = df.with_column("promo_factor", F.coalesce(col("promo_factor"), lit(0.0)))
+df = df.with_column("revenue", col("l_extendedprice") * (lit(1.0) - 
col("l_discount")))
+
+
+# Sum up the promo and total revenue
+df = df.aggregate(
+    [],
+    [
+        F.sum(col("promo_factor") * col("revenue")).alias("promo_revenue"),
+        F.sum(col("revenue")).alias("total_revenue"),
+    ],
+)
+
+# Return the percentage of revenue from promotions
+df = df.select(
+    (lit(100.0) * col("promo_revenue") / 
col("total_revenue")).alias("promo_revenue")
+)
+
+df.show()
diff --git a/examples/tpch/q15_top_supplier.py 
b/examples/tpch/q15_top_supplier.py
new file mode 100644
index 0000000..7113e04
--- /dev/null
+++ b/examples/tpch/q15_top_supplier.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 15:
+
+The Top Supplier Query finds the supplier who contributed the most to the 
overall revenue for parts
+shipped during a given quarter of a given year. In case of a tie, the query 
lists all suppliers
+whose contribution was equal to the maximum, presented in supplier number 
order.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+DATE = "1996-01-01"
+
+date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date())
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval_3_months = lit(pa.scalar((0, 0, 90), 
type=pa.month_day_nano_interval()))
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_suppkey", "l_shipdate", "l_extendedprice", "l_discount"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey",
+    "s_name",
+    "s_address",
+    "s_phone",
+)
+
+# Limit line items to the quarter of interest
+df_lineitem = df_lineitem.filter(col("l_shipdate") >= date_of_interest).filter(
+    col("l_shipdate") < date_of_interest + interval_3_months
+)
+
+df = df_lineitem.aggregate(
+    [col("l_suppkey")],
+    [
+        F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias(
+            "total_revenue"
+        )
+    ],
+)
+
+# Use a window function to find the maximum revenue across the entire dataframe
+window_frame = WindowFrame("rows", None, None)
+df = df.with_column(
+    "max_revenue", F.window("max", [col("total_revenue")], 
window_frame=window_frame)
+)
+
+# Find all suppliers whose total revenue is the same as the maximum
+df = df.filter(col("total_revenue") == col("max_revenue"))
+
+# Now that we know the supplier(s) with maximum revenue, get the rest of their 
information
+# from the supplier table
+df = df.join(df_supplier, (["l_suppkey"], ["s_suppkey"]), "inner")
+
+# Return only the colums requested
+df = df.select_columns("s_suppkey", "s_name", "s_address", "s_phone", 
"total_revenue")
+
+# If we have more than one, sort by supplier number (suppkey)
+df = df.sort(col("s_suppkey").sort())
+
+df.show()
diff --git a/examples/tpch/q16_part_supplier_relationship.py 
b/examples/tpch/q16_part_supplier_relationship.py
new file mode 100644
index 0000000..5f941d5
--- /dev/null
+++ b/examples/tpch/q16_part_supplier_relationship.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 16:
+
+The Parts/Supplier Relationship Query counts the number of suppliers who can 
supply parts that
+satisfy a particular customer's requirements. The customer is interested in 
parts of eight
+different sizes as long as they are not of a given type, not of a given brand, 
and not from a
+supplier who has had complaints registered at the Better Business Bureau. 
Results must be presented
+in descending count and ascending brand, type, and size.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+BRAND = "Brand#45"
+TYPE_TO_IGNORE = "MEDIUM POLISHED"
+SIZES_OF_INTEREST = [49, 14, 23, 45, 19, 3, 36, 9]
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+    "p_partkey", "p_brand", "p_type", "p_size"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+    "ps_suppkey", "ps_partkey"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_comment"
+)
+
+df_unwanted_suppliers = df_supplier.filter(
+    ~F.regexp_match(col("s_comment"), lit("Customer.?*Complaints")).is_null()
+)
+
+# Remove unwanted suppliers
+df_partsupp = df_partsupp.join(
+    df_unwanted_suppliers, (["ps_suppkey"], ["s_suppkey"]), "anti"
+)
+
+# Select the parts we are interested in
+df_part = df_part.filter(col("p_brand") == lit(BRAND))
+df_part = df_part.filter(
+    F.substr(col("p_type"), lit(0), lit(len(TYPE_TO_IGNORE) + 1)) != 
lit(TYPE_TO_IGNORE)
+)
+
+# Python conversion of integer to literal casts it to int64 but the data for
+# part size is stored as an int32, so perform a cast. Then check to find if 
the part
+# size is within the array of possible sizes by checking the position of it is 
not
+# null.
+p_sizes = F.make_array(*[lit(s).cast(pa.int32()) for s in SIZES_OF_INTEREST])
+df_part = df_part.filter(~F.array_position(p_sizes, col("p_size")).is_null())
+
+df = df_part.join(df_partsupp, (["p_partkey"], ["ps_partkey"]), "inner")
+
+df = df.select_columns("p_brand", "p_type", "p_size", "ps_suppkey").distinct()
+
+df = df.aggregate(
+    [col("p_brand"), col("p_type"), col("p_size")],
+    [F.count(col("ps_suppkey")).alias("supplier_cnt")],
+)
+
+df = df.sort(col("supplier_cnt").sort(ascending=False))
+
+df.show()
diff --git a/examples/tpch/q17_small_quantity_order.py 
b/examples/tpch/q17_small_quantity_order.py
new file mode 100644
index 0000000..aae238b
--- /dev/null
+++ b/examples/tpch/q17_small_quantity_order.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 17:
+
+The Small-Quantity-Order Revenue Query considers parts of a given brand and 
with a given container
+type and determines the average lineitem quantity of such parts ordered for 
all orders (past and
+pending) in the 7-year database. What would be the average yearly gross 
(undiscounted) loss in
+revenue if orders for these parts with a quantity of less than 20% of this 
average were no longer
+taken?
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+BRAND = "Brand#23"
+CONTAINER = "MED BOX"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+    "p_partkey", "p_brand", "p_container"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_partkey", "l_quantity", "l_extendedprice"
+)
+
+# Limit to the problem statement's brand and container types
+df = df_part.filter(col("p_brand") == lit(BRAND)).filter(
+    col("p_container") == lit(CONTAINER)
+)
+
+# Combine data
+df = df.join(df_lineitem, (["p_partkey"], ["l_partkey"]), "inner")
+
+# Find the average quantity
+window_frame = WindowFrame("rows", None, None)
+df = df.with_column(
+    "avg_quantity", F.window("avg", [col("l_quantity")], 
window_frame=window_frame)
+)
+
+df = df.filter(col("l_quantity") < lit(0.2) * col("avg_quantity"))
+
+# Compute the total
+df = df.aggregate([], [F.sum(col("l_extendedprice")).alias("total")])
+
+# Divide by number of years in the problem statement to get average
+df = df.select((col("total") / lit(7.0)).alias("avg_yearly"))
+
+df.show()
diff --git a/examples/tpch/q18_large_volume_customer.py 
b/examples/tpch/q18_large_volume_customer.py
new file mode 100644
index 0000000..96ca08f
--- /dev/null
+++ b/examples/tpch/q18_large_volume_customer.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 18:
+
+The Large Volume Customer Query finds a list of the top 100 customers who have 
ever placed large
+quantity orders. The query lists the customer name, customer key, the order 
key, date and total
+price and the quantity for the order.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+QUANTITY = 300
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_custkey", "c_name"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_custkey", "o_orderdate", "o_totalprice"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_orderkey", "l_quantity", "l_extendedprice"
+)
+
+df = df_lineitem.aggregate(
+    [col("l_orderkey")], [F.sum(col("l_quantity")).alias("total_quantity")]
+)
+
+# Limit to orders in which the total quantity is above a threshold
+df = df.filter(col("total_quantity") > lit(QUANTITY))
+
+# We've identified the orders of interest, now join the additional data
+# we are required to report on
+df = df.join(df_orders, (["l_orderkey"], ["o_orderkey"]), "inner")
+df = df.join(df_customer, (["o_custkey"], ["c_custkey"]), "inner")
+
+df = df.select_columns(
+    "c_name", "c_custkey", "o_orderkey", "o_orderdate", "o_totalprice", 
"total_quantity"
+)
+
+df = df.sort(col("o_totalprice").sort(ascending=False), 
col("o_orderdate").sort())
+
+df.show()
diff --git a/examples/tpch/q19_discounted_revenue.py 
b/examples/tpch/q19_discounted_revenue.py
new file mode 100644
index 0000000..20ad48a
--- /dev/null
+++ b/examples/tpch/q19_discounted_revenue.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 19:
+
+The Discounted Revenue query finds the gross discounted revenue for all orders 
for three different
+types of parts that were shipped by air and delivered in person. Parts are 
selected based on the
+combination of specific brands, a list of containers, and a range of sizes.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, udf, functions as F
+
+items_of_interest = {
+    "Brand#12": {
+        "min_quantity": 1,
+        "containers": ["SM CASE", "SM BOX", "SM PACK", "SM PKG"],
+        "max_size": 5,
+    },
+    "Brand#23": {
+        "min_quantity": 10,
+        "containers": ["MED BAG", "MED BOX", "MED PKG", "MED PACK"],
+        "max_size": 10,
+    },
+    "Brand#34": {
+        "min_quantity": 20,
+        "containers": ["LG CASE", "LG BOX", "LG PACK", "LG PKG"],
+        "max_size": 15,
+    },
+}
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns(
+    "p_partkey", "p_brand", "p_container", "p_size"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_partkey",
+    "l_quantity",
+    "l_shipmode",
+    "l_shipinstruct",
+    "l_extendedprice",
+    "l_discount",
+)
+
+# These limitations apply to all line items, so go ahead and do them first
+
+df = df_lineitem.filter(col("l_shipinstruct") == lit("DELIVER IN PERSON"))
+
+# Small note: The data generated uses "REG AIR" but the spec says "AIR REG"
+df = df.filter(
+    (col("l_shipmode") == lit("AIR")) | (col("l_shipmode") == lit("REG AIR"))
+)
+
+df = df.join(df_part, (["l_partkey"], ["p_partkey"]), "inner")
+
+
+# Create the user defined function (UDF) definition that does the work
+def is_of_interest(
+    brand_arr: pa.Array,
+    container_arr: pa.Array,
+    quantity_arr: pa.Array,
+    size_arr: pa.Array,
+) -> pa.Array:
+    """
+    The purpose of this function is to demonstrate how a UDF works, taking as 
input a pyarrow Array
+    and generating a resultant Array. The length of the inputs should match 
and there should be the
+    same number of rows in the output.
+    """
+    result = []
+    for idx, brand in enumerate(brand_arr):
+        brand = brand.as_py()
+        if brand in items_of_interest:
+            values_of_interest = items_of_interest[brand]
+
+            container_matches = (
+                container_arr[idx].as_py() in values_of_interest["containers"]
+            )
+
+            quantity = quantity_arr[idx].as_py()
+            quantity_matches = (
+                values_of_interest["min_quantity"]
+                <= quantity
+                <= values_of_interest["min_quantity"] + 10
+            )
+
+            size = size_arr[idx].as_py()
+            size_matches = 1 <= size <= values_of_interest["max_size"]
+
+            result.append(container_matches and quantity_matches and 
size_matches)
+        else:
+            result.append(False)
+
+    return pa.array(result)
+
+
+# Turn the above function into a UDF that DataFusion can understand
+is_of_interest_udf = udf(
+    is_of_interest,
+    [pa.utf8(), pa.utf8(), pa.float32(), pa.int32()],
+    pa.bool_(),
+    "stable",
+)
+
+# Filter results using the above UDF
+df = df.filter(
+    is_of_interest_udf(
+        col("p_brand"), col("p_container"), col("l_quantity"), col("p_size")
+    )
+)
+
+df = df.aggregate(
+    [],
+    [F.sum(col("l_extendedprice") * (lit(1.0) - 
col("l_discount"))).alias("revenue")],
+)
+
+df.show()
diff --git a/examples/tpch/q20_potential_part_promotion.py 
b/examples/tpch/q20_potential_part_promotion.py
new file mode 100644
index 0000000..09686db
--- /dev/null
+++ b/examples/tpch/q20_potential_part_promotion.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 20:
+
+The Potential Part Promotion query identifies suppliers who have an excess of 
a given part
+available; an excess is defined to be more than 50% of the parts like the 
given part that the
+supplier shipped in a given year for a given nation. Only parts whose names 
share a certain naming
+convention are considered.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datetime import datetime
+import pyarrow as pa
+from datafusion import SessionContext, col, lit, functions as F
+
+COLOR_OF_INTEREST = "forest"
+DATE_OF_INTEREST = "1994-01-01"
+NATION_OF_INTEREST = "CANADA"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_part = ctx.read_parquet("data/part.parquet").select_columns("p_partkey", 
"p_name")
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_shipdate", "l_partkey", "l_suppkey", "l_quantity"
+)
+df_partsupp = ctx.read_parquet("data/partsupp.parquet").select_columns(
+    "ps_partkey", "ps_suppkey", "ps_availqty"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_address", "s_name", "s_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name"
+)
+
+date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()
+
+# Note: this is a hack on setting the values. It should be set differently once
+# https://github.com/apache/datafusion-python/issues/665 is resolved.
+interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval())
+
+# Filter down dataframes
+df_nation = df_nation.filter(col("n_name") == lit(NATION_OF_INTEREST))
+df_part = df_part.filter(
+    F.substr(col("p_name"), lit(0), lit(len(COLOR_OF_INTEREST) + 1))
+    == lit(COLOR_OF_INTEREST)
+)
+
+df = df_lineitem.filter(col("l_shipdate") >= lit(date)).filter(
+    col("l_shipdate") < lit(date) + lit(interval)
+)
+
+# This will filter down the line items to the parts of interest
+df = df.join(df_part, (["l_partkey"], ["p_partkey"]), "inner")
+
+# Compute the total sold and limit ourselves to indivdual supplier/part 
combinations
+df = df.aggregate(
+    [col("l_partkey"), col("l_suppkey")], 
[F.sum(col("l_quantity")).alias("total_sold")]
+)
+
+df = df.join(
+    df_partsupp, (["l_partkey", "l_suppkey"], ["ps_partkey", "ps_suppkey"]), 
"inner"
+)
+
+# Find cases of excess quantity
+df.filter(col("ps_availqty") > lit(0.5) * col("total_sold"))
+
+# We could do these joins earlier, but now limit to the nation of interest 
suppliers
+df = df.join(df_supplier, (["ps_suppkey"], ["s_suppkey"]), "inner")
+df = df.join(df_nation, (["s_nationkey"], ["n_nationkey"]), "inner")
+
+# Restrict to the requested data per the problem statement
+df = df.select_columns("s_name", "s_address")
+
+df = df.sort(col("s_name").sort())
+
+df.show()
diff --git a/examples/tpch/q21_suppliers_kept_orders_waiting.py 
b/examples/tpch/q21_suppliers_kept_orders_waiting.py
new file mode 100644
index 0000000..2f58d6e
--- /dev/null
+++ b/examples/tpch/q21_suppliers_kept_orders_waiting.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 21:
+
+The Suppliers Who Kept Orders Waiting query identifies suppliers, for a given 
nation, whose product
+was part of a multi-supplier order (with current status of 'F') where they 
were the only supplier
+who failed to meet the committed delivery date.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, col, lit, functions as F
+
+NATION_OF_INTEREST = "SAUDI ARABIA"
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns(
+    "o_orderkey", "o_orderstatus"
+)
+df_lineitem = ctx.read_parquet("data/lineitem.parquet").select_columns(
+    "l_orderkey", "l_receiptdate", "l_commitdate", "l_suppkey"
+)
+df_supplier = ctx.read_parquet("data/supplier.parquet").select_columns(
+    "s_suppkey", "s_name", "s_nationkey"
+)
+df_nation = ctx.read_parquet("data/nation.parquet").select_columns(
+    "n_nationkey", "n_name"
+)
+
+# Limit to suppliers in the nation of interest
+df_suppliers_of_interest = df_nation.filter(col("n_name") == 
lit(NATION_OF_INTEREST))
+
+df_suppliers_of_interest = df_suppliers_of_interest.join(
+    df_supplier, (["n_nationkey"], ["s_nationkey"]), "inner"
+)
+
+# Find the failed orders and all their line items
+df = df_orders.filter(col("o_orderstatus") == lit("F"))
+
+df = df_lineitem.join(df, (["l_orderkey"], ["o_orderkey"]), "inner")
+
+# Identify the line items for which the order is failed due to.
+df = df.with_column(
+    "failed_supp",
+    F.case(col("l_receiptdate") > col("l_commitdate"))
+    .when(lit(True), col("l_suppkey"))
+    .end(),
+)
+
+# There are other ways we could do this but the purpose of this example is to 
work with rows where
+# an element is an array of values. In this case, we will create two columns 
of arrays. One will be
+# an array of all of the suppliers who made up this order. That way we can 
filter the dataframe for
+# only orders where this array is larger than one for multiple supplier 
orders. The second column
+# is all of the suppliers who failed to make their commitment. We can filter 
the second column for
+# arrays with size one. That combination will give us orders that had multiple 
suppliers where only
+# one failed. Use distinct=True in the blow aggregation so we don't get 
multipe line items from the
+# same supplier reported in either array.
+df = df.aggregate(
+    [col("o_orderkey")],
+    [
+        F.array_agg(col("l_suppkey"), distinct=True).alias("all_suppliers"),
+        F.array_agg(col("failed_supp"), 
distinct=True).alias("failed_suppliers"),
+    ],
+)
+
+# Remove the null entries that will get returned by array_agg so we can test 
to see where we only
+# have a single failed supplier in a multiple supplier order
+df = df.with_column(
+    "failed_suppliers", F.array_remove(col("failed_suppliers"), lit(None))
+)
+
+# This is the check described above which will identify single failed supplier 
in a multiple
+# supplier order.
+df = df.filter(F.array_length(col("failed_suppliers")) == lit(1)).filter(
+    F.array_length(col("all_suppliers")) > lit(1)
+)
+
+# Since we have an array we know is exactly one element long, we can extract 
that single value.
+df = df.select(
+    col("o_orderkey"), F.array_element(col("failed_suppliers"), 
lit(1)).alias("suppkey")
+)
+
+# Join to the supplier of interest list for the nation of interest
+df = df.join(df_suppliers_of_interest, (["suppkey"], ["s_suppkey"]), "inner")
+
+# Count how many orders that supplier is the only failed supplier for
+df = df.aggregate([col("s_name")], 
[F.count(col("o_orderkey")).alias("numwait")])
+
+# Return in descending order
+df = df.sort(col("numwait").sort(ascending=False))
+
+df = df.limit(100)
+
+df.show()
diff --git a/examples/tpch/q22_global_sales_opportunity.py 
b/examples/tpch/q22_global_sales_opportunity.py
new file mode 100644
index 0000000..d2d0c5a
--- /dev/null
+++ b/examples/tpch/q22_global_sales_opportunity.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+TPC-H Problem Statement Query 22:
+
+This query counts how many customers within a specific range of country codes 
have not placed
+orders for 7 years but who have a greater than average “positive” account 
balance. It also reflects
+the magnitude of that balance. Country code is defined as the first two 
characters of c_phone.
+
+The above problem statement text is copyrighted by the Transaction Processing 
Performance Council
+as part of their TPC Benchmark H Specification revision 2.18.0.
+"""
+
+from datafusion import SessionContext, WindowFrame, col, lit, functions as F
+
+NATION_CODE = 13
+
+# Load the dataframes we need
+
+ctx = SessionContext()
+
+df_customer = ctx.read_parquet("data/customer.parquet").select_columns(
+    "c_phone", "c_acctbal", "c_custkey"
+)
+df_orders = ctx.read_parquet("data/orders.parquet").select_columns("o_custkey")
+
+# The nation code is a two digit number, but we need to convert it to a string 
literal
+nation_code = lit(str(NATION_CODE))
+
+# Use the substring operation to extract the first two charaters of the phone 
number
+df = df_customer.with_column("cntrycode", F.substr(col("c_phone"), lit(0), 
lit(3)))
+
+# Limit our search to customers with some balance and in the country code above
+df = df.filter(col("c_acctbal") > lit(0.0))
+df = df.filter(nation_code == col("cntrycode"))
+
+# Compute the average balance. By default, the window frame is from unbounded 
preceeding to the
+# current row. We want our frame to cover the entire data frame.
+window_frame = WindowFrame("rows", None, None)
+df = df.with_column(
+    "avg_balance", F.window("avg", [col("c_acctbal")], 
window_frame=window_frame)
+)
+
+# Limit results to customers with above average balance
+df = df.filter(col("c_acctbal") > col("avg_balance"))
+
+# Limit results to customers with no orders
+df = df.join(df_orders, (["c_custkey"], ["o_custkey"]), "anti")
+
+# Count up the customers and the balances
+df = df.aggregate(
+    [col("cntrycode")],
+    [
+        F.count(col("c_custkey")).alias("numcust"),
+        F.sum(col("c_acctbal")).alias("totacctbal"),
+    ],
+)
+
+df = df.sort(col("cntrycode").sort())
+
+df.show()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to