This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git
The following commit(s) were added to refs/heads/main by this push:
new 8ee46ab Update README for running benchmarks in k8s (#39)
8ee46ab is described below
commit 8ee46ab8ca4eec6328feb7f3110875c3e33eb8d6
Author: Andy Grove <[email protected]>
AuthorDate: Sat Nov 2 18:10:26 2024 -0600
Update README for running benchmarks in k8s (#39)
---
tpch/Dockerfile | 6 ++++
tpch/README.md | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
tpch/tpcbench.py | 22 ++++++++++---
3 files changed, 120 insertions(+), 5 deletions(-)
diff --git a/tpch/Dockerfile b/tpch/Dockerfile
new file mode 100644
index 0000000..0d7f8e7
--- /dev/null
+++ b/tpch/Dockerfile
@@ -0,0 +1,6 @@
+FROM apache/datafusion-ray
+
+RUN sudo apt update && \
+ sudo apt install -y git
+
+RUN git clone https://github.com/apache/datafusion-benchmarks.git
\ No newline at end of file
diff --git a/tpch/README.md b/tpch/README.md
index 4852fbc..5a1d55c 100644
--- a/tpch/README.md
+++ b/tpch/README.md
@@ -21,8 +21,103 @@
## Running Benchmarks
+### Standalone Ray Cluster
+
Data and queries must be available on all nodes of the Ray cluster.
```shell
- RAY_ADDRESS='http://ray-cluster-ip-address:8265' ray job submit
--working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --data
/path/to/data --queries /path/to/tpch/queries --concurrency 4
+ RAY_ADDRESS='http://ray-cluster-ip-address:8265' ray job submit
--working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --data
/path/to/data --queries /path/to/tpch/queries
+```
+
+### Kubernetes
+
+Create a Docker image containing the TPC-H queries and push it to a Docker
registry that is accessible from the k8s cluster.
+
+```shell
+docker build -t YOURREPO/datafusion-ray-tpch .
+```
+
+If the data files are local to the k8s nodes, then create a persistent volume
and a persistent volume claim.
+
+Create a `pv.yaml` with the following content and run `kubectl apply -f
pv.yaml`.
+
+```yaml
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+ name: ray-pv
+spec:
+ storageClassName: manual
+ capacity:
+ storage: 10Gi
+ accessModes:
+ - ReadWriteOnce
+ hostPath:
+ path: "/mnt/bigdata" # Adjust the path as needed
+```
+
+Create a `pvc.yaml` with the following content and run `kubectl apply -f
pvc.yaml`.
+
+```yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: ray-pvc
+spec:
+ storageClassName: manual # Should match the PV's storageClassName if static
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 10Gi
+```
+
+Create the Ray cluster using the custom image.
+
+Create a `ray-cluster.yaml` with the following content and run `kubectl apply
-f ray-cluster.yaml`.
+
+```yaml
+apiVersion: ray.io/v1alpha1
+kind: RayCluster
+metadata:
+ name: datafusion-ray-cluster
+spec:
+ headGroupSpec:
+ rayStartParams:
+ num-cpus: "1"
+ template:
+ spec:
+ containers:
+ - name: ray-head
+ image: YOURREPO/datafusion-ray-tpch:latest
+ volumeMounts:
+ - mountPath: /mnt/bigdata # Mount path inside the container
+ name: ray-storage
+ volumes:
+ - name: ray-storage
+ persistentVolumeClaim:
+ claimName: ray-pvc # Reference the PVC name here
+ workerGroupSpecs:
+ - replicas: 2
+ groupName: "datafusion-ray"
+ rayStartParams:
+ num-cpus: "4"
+ template:
+ spec:
+ containers:
+ - name: ray-worker
+ image: YOURREPO/datafusion-ray-tpch:latest
+ volumeMounts:
+ - mountPath: /mnt/bigdata
+ name: ray-storage
+ volumes:
+ - name: ray-storage
+ persistentVolumeClaim:
+ claimName: ray-pvc
+```
+
+Run the benchmarks:
+
+```shell
+ray job submit --working-dir `pwd` -- python3 tpcbench.py --benchmark tpch
--queries /home/ray/datafusion-benchmarks/tpch/queries/ --data
/mnt/bigdata/tpch/sf100
```
\ No newline at end of file
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 1e65ee0..d549e65 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -17,6 +17,7 @@
import argparse
import ray
+from datafusion import SessionContext, SessionConfig, RuntimeConfig
from datafusion_ray import DatafusionRayContext
from datetime import datetime
import json
@@ -41,22 +42,32 @@ def main(benchmark: str, data_path: str, query_path: str,
concurrency: int):
# use ray job submit
ray.init()
- ctx = DatafusionRayContext(concurrency)
+ runtime = (
+ RuntimeConfig()
+ )
+ config = (
+ SessionConfig()
+ .with_target_partitions(concurrency)
+ .set("datafusion.execution.parquet.pushdown_filters", "true")
+ )
+ df_ctx = SessionContext(config, runtime)
+
+ ray_ctx = DatafusionRayContext(df_ctx)
for table in table_names:
path = f"{data_path}/{table}.parquet"
print(f"Registering table {table} using path {path}")
- ctx.register_parquet(table, path)
+ df_ctx.register_parquet(table, path)
results = {
'engine': 'datafusion-python',
'benchmark': benchmark,
'data_path': data_path,
'query_path': query_path,
- 'concurrency': concurrency,
}
for query in range(1, num_queries + 1):
+
# read text file
path = f"{query_path}/q{query}.sql"
print(f"Reading query {query} using path {path}")
@@ -70,7 +81,7 @@ def main(benchmark: str, data_path: str, query_path: str,
concurrency: int):
sql = sql.strip()
if len(sql) > 0:
print(f"Executing: {sql}")
- rows = ctx.sql(sql)
+ rows = ray_ctx.sql(sql)
print(f"Query {query} returned {len(rows)} rows")
end_time = time.time()
@@ -86,6 +97,9 @@ def main(benchmark: str, data_path: str, query_path: str,
concurrency: int):
with open(results_path, "w") as f:
f.write(str)
+ # write results to stdout
+ print(str)
+
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DataFusion benchmark derived
from TPC-H / TPC-DS")
parser.add_argument("--benchmark", required=True, help="Benchmark to run
(tpch or tpcds)")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]