This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 195abf8f71 SparkSubmit: Adding propertyfiles option (#36164)
195abf8f71 is described below
commit 195abf8f7116c9e37fd3dc69bfee8cbf546c5a3f
Author: Ashish Patel <[email protected]>
AuthorDate: Mon Dec 11 22:02:32 2023 +0530
SparkSubmit: Adding propertyfiles option (#36164)
Introduce a new parameter 'properties_file' in SparkSubmitHook and
SparkSubmitOperator. This allows loading of extra properties from a given
file, defaulting to conf/spark-defaults.conf if none is specified. The goal
is to provide greater flexibility for Spark configuration. The changes have
been tested for the affected files.
---------
Co-authored-by: ghostp13409 <[email protected]>
---
airflow/providers/apache/spark/hooks/spark_submit.py | 6 ++++++
airflow/providers/apache/spark/operators/spark_submit.py | 6 ++++++
tests/providers/apache/spark/operators/test_spark_submit.py | 3 +++
3 files changed, 15 insertions(+)
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index be087dada9..b64f3a0c22 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -78,6 +78,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
:param verbose: Whether to pass the verbose flag to spark-submit process
for debugging
:param spark_binary: The command to use for spark submit.
Some distros may use spark2-submit or spark3-submit.
+    :param properties_file: Path to a file from which to load extra properties. If not
+        specified, this will look for conf/spark-defaults.conf.
:param use_krb5ccache: if True, configure spark to use ticket cache
instead of relying
on keytab for Kerberos login
"""
@@ -122,6 +124,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
env_vars: dict[str, Any] | None = None,
verbose: bool = False,
spark_binary: str | None = None,
+ properties_file: str | None = None,
*,
use_krb5ccache: bool = False,
) -> None:
@@ -155,6 +158,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
self._yarn_application_id: str | None = None
self._kubernetes_driver_pod: str | None = None
self.spark_binary = spark_binary
+ self._properties_file = properties_file
self._connection = self._resolve_connection()
self._is_yarn = "yarn" in self._connection["master"]
self._is_kubernetes = "k8s" in self._connection["master"]
@@ -292,6 +296,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
"--conf",
f"spark.kubernetes.namespace={self._connection['namespace']}",
]
+ if self._properties_file:
+ connection_cmd += ["--properties-file", self._properties_file]
if self._files:
connection_cmd += ["--files", self._files]
if self._py_files:
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index 903ff9b720..3f4c539536 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -69,6 +69,8 @@ class SparkSubmitOperator(BaseOperator):
:param verbose: Whether to pass the verbose flag to spark-submit process
for debugging
:param spark_binary: The command to use for spark submit.
Some distros may use spark2-submit or spark3-submit.
+    :param properties_file: Path to a file from which to load extra properties. If not
+        specified, this will look for conf/spark-defaults.conf.
:param use_krb5ccache: if True, configure spark to use ticket cache
instead of relying
on keytab for Kerberos login
"""
@@ -88,6 +90,7 @@ class SparkSubmitOperator(BaseOperator):
"_name",
"_application_args",
"_env_vars",
+ "_properties_file",
)
ui_color = WEB_COLORS["LIGHTORANGE"]
@@ -120,6 +123,7 @@ class SparkSubmitOperator(BaseOperator):
env_vars: dict[str, Any] | None = None,
verbose: bool = False,
spark_binary: str | None = None,
+ properties_file: str | None = None,
use_krb5ccache: bool = False,
**kwargs: Any,
) -> None:
@@ -149,6 +153,7 @@ class SparkSubmitOperator(BaseOperator):
self._env_vars = env_vars
self._verbose = verbose
self._spark_binary = spark_binary
+ self._properties_file = properties_file
self._hook: SparkSubmitHook | None = None
self._conn_id = conn_id
self._use_krb5ccache = use_krb5ccache
@@ -191,5 +196,6 @@ class SparkSubmitOperator(BaseOperator):
env_vars=self._env_vars,
verbose=self._verbose,
spark_binary=self._spark_binary,
+ properties_file=self._properties_file,
use_krb5ccache=self._use_krb5ccache,
)
diff --git a/tests/providers/apache/spark/operators/test_spark_submit.py b/tests/providers/apache/spark/operators/test_spark_submit.py
index 3c6aa78336..884036467f 100644
--- a/tests/providers/apache/spark/operators/test_spark_submit.py
+++ b/tests/providers/apache/spark/operators/test_spark_submit.py
@@ -53,6 +53,7 @@ class TestSparkSubmitOperator:
"application": "test_application.py",
"driver_memory": "3g",
"java_class": "com.foo.bar.AppMain",
+ "properties_file": "conf/spark-custom.conf",
"application_args": [
"-f",
"foo",
@@ -120,6 +121,7 @@ class TestSparkSubmitOperator:
],
"spark_binary": "sparky",
"use_krb5ccache": True,
+ "properties_file": "conf/spark-custom.conf",
}
assert conn_id == operator._conn_id
@@ -147,6 +149,7 @@ class TestSparkSubmitOperator:
assert expected_dict["driver_memory"] == operator._driver_memory
assert expected_dict["application_args"] == operator._application_args
assert expected_dict["spark_binary"] == operator._spark_binary
+ assert expected_dict["properties_file"] == operator._properties_file
assert expected_dict["use_krb5ccache"] == operator._use_krb5ccache
@pytest.mark.db_test