This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ray.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ee46ab  Update README for running benchmarks in k8s (#39)
8ee46ab is described below

commit 8ee46ab8ca4eec6328feb7f3110875c3e33eb8d6
Author: Andy Grove <[email protected]>
AuthorDate: Sat Nov 2 18:10:26 2024 -0600

    Update README for running benchmarks in k8s (#39)
---
 tpch/Dockerfile  |  6 ++++
 tpch/README.md   | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 tpch/tpcbench.py | 22 ++++++++++---
 3 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/tpch/Dockerfile b/tpch/Dockerfile
new file mode 100644
index 0000000..0d7f8e7
--- /dev/null
+++ b/tpch/Dockerfile
@@ -0,0 +1,6 @@
+FROM apache/datafusion-ray
+
+RUN sudo apt update && \
+    sudo apt install -y git
+
+RUN git clone https://github.com/apache/datafusion-benchmarks.git
\ No newline at end of file
diff --git a/tpch/README.md b/tpch/README.md
index 4852fbc..5a1d55c 100644
--- a/tpch/README.md
+++ b/tpch/README.md
@@ -21,8 +21,103 @@
 
 ## Running Benchmarks
 
+### Standalone Ray Cluster
+
 Data and queries must be available on all nodes of the Ray cluster.
 
 ```shell
- RAY_ADDRESS='http://ray-cluster-ip-address:8265'  ray job submit 
--working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --data 
/path/to/data --queries /path/to/tpch/queries --concurrency 4
+ RAY_ADDRESS='http://ray-cluster-ip-address:8265'  ray job submit 
--working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --data 
/path/to/data --queries /path/to/tpch/queries
+```
+
+### Kubernetes
+
+Create a Docker image containing the TPC-H queries and push to a Docker 
registry that is accessible from the k8s cluster.
+
+```shell
+docker build -t YOURREPO/datafusion-ray-tpch .
+```
+
+If the data files are local to the k8s nodes, then create a persistent volume 
and persistent volume claim.
+
+Create a `pv.yaml` with the following content and run `kubectl apply -f 
pv.yaml`.
+
+```yaml
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: ray-pv
+spec:
+  storageClassName: manual
+  capacity:
+    storage: 10Gi
+  accessModes:
+    - ReadWriteOnce
+  hostPath:
+    path: "/mnt/bigdata"  # Adjust the path as needed
+```
+
+Create a `pvc.yaml` with the following content and run `kubectl apply -f 
pvc.yaml`.
+
+```yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: ray-pvc
+spec:
+  storageClassName: manual  # Should match the PV's storageClassName if static
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 10Gi
+```
+
+Create the Ray cluster using the custom image.
+
+Create a `ray-cluster.yaml` with the following content and run `kubectl apply 
-f ray-cluster.yaml`.
+
+```yaml
+apiVersion: ray.io/v1alpha1
+kind: RayCluster
+metadata:
+  name: datafusion-ray-cluster
+spec:
+  headGroupSpec:
+    rayStartParams:
+      num-cpus: "1"
+    template:
+      spec:
+        containers:
+          - name: ray-head
+            image: YOURREPO/datafusion-ray-tpch:latest
+            volumeMounts:
+              - mountPath: /mnt/bigdata  # Mount path inside the container
+                name: ray-storage
+        volumes:
+          - name: ray-storage
+            persistentVolumeClaim:
+              claimName: ray-pvc  # Reference the PVC name here
+  workerGroupSpecs:
+    - replicas: 2
+      groupName: "datafusion-ray"
+      rayStartParams:
+        num-cpus: "4"
+      template:
+        spec:
+          containers:
+            - name: ray-worker
+              image: YOURREPO/datafusion-ray-tpch:latest
+              volumeMounts:
+                - mountPath: /mnt/bigdata
+                  name: ray-storage
+          volumes:
+            - name: ray-storage
+              persistentVolumeClaim:
+                claimName: ray-pvc
+```
+
+Run the benchmarks
+
+```shell
+ray job submit --working-dir `pwd` -- python3 tpcbench.py --benchmark tpch 
--queries /home/ray/datafusion-benchmarks/tpch/queries/ --data 
/mnt/bigdata/tpch/sf100
 ```
\ No newline at end of file
diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py
index 1e65ee0..d549e65 100644
--- a/tpch/tpcbench.py
+++ b/tpch/tpcbench.py
@@ -17,6 +17,7 @@
 
 import argparse
 import ray
+from datafusion import SessionContext, SessionConfig, RuntimeConfig
 from datafusion_ray import DatafusionRayContext
 from datetime import datetime
 import json
@@ -41,22 +42,32 @@ def main(benchmark: str, data_path: str, query_path: str, 
concurrency: int):
     # use ray job submit
     ray.init()
 
-    ctx = DatafusionRayContext(concurrency)
+    runtime = (
+        RuntimeConfig()
+    )
+    config = (
+        SessionConfig()
+        .with_target_partitions(concurrency)
+        .set("datafusion.execution.parquet.pushdown_filters", "true")
+    )
+    df_ctx = SessionContext(config, runtime)
+
+    ray_ctx = DatafusionRayContext(df_ctx)
 
     for table in table_names:
         path = f"{data_path}/{table}.parquet"
         print(f"Registering table {table} using path {path}")
-        ctx.register_parquet(table, path)
+        df_ctx.register_parquet(table, path)
 
     results = {
         'engine': 'datafusion-python',
         'benchmark': benchmark,
         'data_path': data_path,
         'query_path': query_path,
-        'concurrency': concurrency,
     }
 
     for query in range(1, num_queries + 1):
+
         # read text file
         path = f"{query_path}/q{query}.sql"
         print(f"Reading query {query} using path {path}")
@@ -70,7 +81,7 @@ def main(benchmark: str, data_path: str, query_path: str, 
concurrency: int):
                 sql = sql.strip()
                 if len(sql) > 0:
                     print(f"Executing: {sql}")
-                    rows = ctx.sql(sql)
+                    rows = ray_ctx.sql(sql)
 
                     print(f"Query {query} returned {len(rows)} rows")
             end_time = time.time()
@@ -86,6 +97,9 @@ def main(benchmark: str, data_path: str, query_path: str, 
concurrency: int):
     with open(results_path, "w") as f:
         f.write(str)
 
+    # write results to stdout
+    print(str)
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="DataFusion benchmark derived 
from TPC-H / TPC-DS")
     parser.add_argument("--benchmark", required=True, help="Benchmark to run 
(tpch or tpcds)")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to