This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bigtop.git


The following commit(s) were added to refs/heads/master by this push:
     new 76c48c5af BIGTOP-4370. Integrate the bigpetstore-spark workflow with Airflow. (#1355)
76c48c5af is described below

commit 76c48c5af06b63007da176ff2ffcc5dd851e9213
Author: Kengo Seki <sek...@apache.org>
AuthorDate: Wed Jul 23 14:38:42 2025 +0900

    BIGTOP-4370. Integrate the bigpetstore-spark workflow with Airflow. (#1355)
---
 bigtop-bigpetstore/bigpetstore-spark/README.md     |  45 +++++++++++++
 bigtop-bigpetstore/bigpetstore-spark/build.gradle  |   2 +-
 .../bigpetstore-spark/images/dag_list.png          | Bin 0 -> 245248 bytes
 .../bigpetstore-spark/images/running_dag.png       | Bin 0 -> 330673 bytes
 .../bigpetstore-spark/images/trigger_dag.png       | Bin 0 -> 182612 bytes
 bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml |   1 +
 .../puppet/modules/airflow/manifests/init.pp       |  25 ++++++-
 .../airflow/templates/example_bigpetstore.py       |  72 +++++++++++++++++++++
 bigtop-packages/src/common/airflow/airflow.default |   1 +
 9 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index e550e48f3..9cd1bdaf3 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -172,3 +172,48 @@ spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analy
 ```
 
 The resulting json file will contain lists of customers, products, and products recommended to each customer.
+
+Airflow Integration
+--------------------------------------------
+
+The steps described above are consolidated into [a single Airflow DAG](../../bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py).
+You can try it as follows. The example here was tested on Debian 12 and assumes that [Puppet](../../bigtop_toolchain/bin/puppetize.sh) and the [Bigtop toolchain](../../bigtop_toolchain/README.md) are already installed, and that BPS Spark has been built as a shaded JAR in accordance with [the guide above](#building-and-running-with-spark).
+
+1. (Optional) Build Airflow and Spark, including their dependencies.
+   You can skip this step if you use Bigtop's binary distribution packages from its public repository.
+
+```
+$ ./gradlew allclean airflow-pkg spark-pkg repo -Dbuildwithdeps=true
+```
+
+2. Deploy the packages above through Bigtop's Puppet manifests with appropriate parameters:
+
+```
+$ cat bigtop-deploy/puppet/hieradata/site.yaml
+bigtop::bigtop_repo_gpg_check: false
+bigtop::bigtop_repo_uri: [...]
+bigtop::hadoop_head_node: ...
+hadoop::hadoop_storage_dirs: [/data]
+hadoop_cluster_node::cluster_components: [bigtop-utils, hdfs, yarn, spark, airflow]
+airflow::server::install_bigpetstore_example: true  # Enable the BigPetStore DAG
+airflow::server::load_examples: false  # Disable Airflow's default examples for simplicity
+$ sudo cp -r bigtop-deploy/puppet/hiera* /etc/puppet
+$ sudo puppet apply --hiera_config=/etc/puppet/hiera.yaml --modulepath=/vagrant_data/bigtop/bigtop-deploy/puppet/modules:/etc/puppet/code/modules:/usr/share/puppet/modules /vagrant_data/bigtop/bigtop-deploy/puppet/manifests
+```
+
+3. Create output directories on HDFS owned by the airflow user:
+
+```
+$ sudo -u hdfs hdfs dfs -mkdir /user/airflow
+$ sudo -u hdfs hdfs dfs -chown airflow:airflow /user/airflow
+```
+
+4. Log in to Airflow's web UI with admin/admin and wait a while until the DAG is picked up.
+   Once it appears, you can run it via the triangle button on the right (a command-line alternative is sketched after these steps).
+   ![dag_list](images/dag_list.png)
+
+5. Trigger the DAG with the path to the BPS Spark JAR.
+   ![trigger_dag](images/trigger_dag.png)
+
+6. If the settings are correct, the BPS DAG should run successfully as follows:
+   ![running_dag](images/running_dag.png)
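+
+As an alternative to the web UI, the DAG can also be listed and triggered from the command line. The following is only a rough sketch, assuming AIRFLOW_HOME is /var/lib/airflow, the Airflow CLI is installed under /usr/lib/airflow/bin, and the JAR path is adjusted to your environment:
+
+```
+$ sudo -u airflow env AIRFLOW_HOME=/var/lib/airflow /usr/lib/airflow/bin/airflow dags list
+$ sudo -u airflow env AIRFLOW_HOME=/var/lib/airflow /usr/lib/airflow/bin/airflow dags trigger \
+    bigpetstore_dag --conf '{"bigpetstore_jar_path": "/path/to/bigpetstore-spark-3.6.0-all.jar"}'
+```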
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 970065b48..eae173eac 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -91,7 +91,7 @@ dependencies {
     compile("org.apache.spark:spark-core_${scalaVersion}:${sparkVersion}")
     compile("org.apache.spark:spark-mllib_${scalaVersion}:${sparkVersion}")
     compile("org.apache.spark:spark-sql_${scalaVersion}:${sparkVersion}")
-    compile "org.apache.bigtop:bigpetstore-data-generator:3.5.0-SNAPSHOT"
+    compile "org.apache.bigtop:bigpetstore-data-generator:3.6.0-SNAPSHOT"
     compile "org.json4s:json4s-jackson_${scalaVersion}:3.6.12"
 
     testCompile "junit:junit:4.13.2"
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png b/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png
new file mode 100644
index 000000000..3e18b441a
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png differ
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png b/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png
new file mode 100644
index 000000000..b8dbde8a2
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png differ
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png b/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png
new file mode 100644
index 000000000..9c18cb33c
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png differ
diff --git a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
index c27cc26ad..630ba9166 100644
--- a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
+++ b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
@@ -218,3 +218,4 @@ ranger::admin::admin_password: "Admin01234"
 airflow::server::executor: "SequentialExecutor"
 airflow::server::load_examples: "True"
 airflow::server::sql_alchemy_conn: "sqlite:////var/lib/airflow/airflow.db"
+airflow::server::install_bigpetstore_example: "False"
diff --git a/bigtop-deploy/puppet/modules/airflow/manifests/init.pp b/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
index 1c26b3964..0dd9cb15d 100644
--- a/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
+++ b/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
@@ -20,7 +20,7 @@ class airflow {
     }
   }
 
-  class server($executor, $load_examples, $sql_alchemy_conn) {
+  class server($executor, $load_examples, $sql_alchemy_conn, $install_bigpetstore_example=False) {
     package { 'airflow':
       ensure => latest,
     }
@@ -55,5 +55,28 @@ class airflow {
       ensure  => running,
       require => Exec['airflow-db-init'],
     }
+
+    if $install_bigpetstore_example {
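+      # Install the Airflow Spark provider into Airflow's Python environment under
+      # /usr/lib/airflow so that SparkSubmitOperator is available; pyspark is pinned
+      # below 4 to stay on the same major version as the cluster's Spark 3 packages.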
+      exec { 'install-spark-provider':
+        command     => "/usr/lib/airflow/bin/python3 -m pip install apache-airflow-providers-apache-spark 'pyspark<4'",
+        environment => ['AIRFLOW_HOME=/var/lib/airflow'],
+        user        => 'root',
+        require     => Package['airflow'],
+      }
+
+      file { '/var/lib/airflow/dags':
+        ensure  => 'directory',
+        owner   => 'airflow',
+        group   => 'airflow',
+        require => Package['airflow'],
+      }
+
+      file { '/var/lib/airflow/dags/example_bigpetstore.py':
+        content => template('airflow/example_bigpetstore.py'),
+        owner   => 'airflow',
+        group   => 'airflow',
+        require => File['/var/lib/airflow/dags'],
+      }
+    }
   }
 }
diff --git a/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py b/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py
new file mode 100644
index 000000000..1b8cb7f2a
--- /dev/null
+++ b/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pendulum
+
+from airflow.models import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+
+with DAG(
+    "bigpetstore_dag",
+    params={"bigpetstore_jar_path": Param("bigpetstore-spark-3.6.0-all.jar")},
+    schedule=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+) as dag:
+    clean_hdfs_task = BashOperator(
+        bash_command="hdfs dfs -rm -f -r generated_data transformed_data",
+        task_id="clean_hdfs_task",
+    )
+
+    clean_fs_task = BashOperator(
+        bash_command="rm -f /tmp/PetStoreStats.json /tmp/recommendations.json",
+        task_id="clean_fs_task",
+    )
+
+    generate_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["generated_data", "10", "1000", "365", "345"],
+        java_class="org.apache.bigtop.bigpetstore.spark.generator.SparkDriver",
+        task_id="generate_task"
+    )
+
+    transform_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["generated_data", "transformed_data"],
+        java_class="org.apache.bigtop.bigpetstore.spark.etl.SparkETL",
+        task_id="transform_task"
+    )
+
+    analyze_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["transformed_data", "/tmp/PetStoreStats.json"],
+        java_class="org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics",
+        task_id="analyze_task"
+    )
+
+    recommend_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["transformed_data", "/tmp/recommendations.json"],
+        java_class="org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts",
+        task_id="recommend_task"
+    )
+
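+    # Run both cleanup tasks first, then generate and transform the data, and
+    # finally fan out into the two independent analytics tasks.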
+    [clean_hdfs_task, clean_fs_task] >> generate_task >> transform_task >> [analyze_task, recommend_task]
+
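+# Running this file directly (e.g. python example_bigpetstore.py) executes the DAG
+# in-process via dag.test(), which is convenient for local debugging.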
+if __name__ == "__main__":
+    dag.test()
diff --git a/bigtop-packages/src/common/airflow/airflow.default b/bigtop-packages/src/common/airflow/airflow.default
index 3c73385b5..7fb32a7ea 100644
--- a/bigtop-packages/src/common/airflow/airflow.default
+++ b/bigtop-packages/src/common/airflow/airflow.default
@@ -21,4 +21,5 @@
 #
 # AIRFLOW_CONFIG=
 AIRFLOW_HOME=/var/lib/airflow
+HADOOP_CONF_DIR=/etc/hadoop/conf
 PATH=/usr/lib/airflow/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
