tusharg1993 opened a new issue, #29684:
URL: https://github.com/apache/airflow/issues/29684
### Apache Airflow Provider(s)
apache-flink
### Versions of Apache Airflow Providers
1.0.0
### Apache Airflow version
2.4.3
### Operating System
Not sure
### Deployment
Microsoft ADF Managed Airflow
### Deployment details
_No response_
### What happened
Hello, I am trying to use the Flink Airflow operator to submit a Flink job. My
AKS cluster already has the Flink K8s controller installed, and submitting jobs
with kubectl works perfectly.
However, doing the same through Airflow results in the following
error:
```
[2023-02-22T05:46:11.473+0000] {flink_kubernetes.py:103} INFO - Creating
flinkApplication with Context: None and op_context: {'conf':
<airflow.configuration.AirflowConfigParser object at 0x7f41ac7003d0>, 'dag':
<DAG: tutorial>, 'dag_run': <DagRun tutorial @ 2023-02-22
05:41:08.519537+00:00: manual__2023-02-22T05:41:08.519537+00:00, state:running,
queued_at: 2023-02-22 05:41:08.526958+00:00. externally triggered: True>,
'data_interval_end': DateTime(2023, 2, 22, 5, 41, 8, 519537,
tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2023, 2, 21, 5, 41, 8,
519537, tzinfo=Timezone('UTC')), 'ds': '2023-02-22', 'ds_nodash': '20230222',
'execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537,
tzinfo=Timezone('UTC')), 'inlets': [], 'logical_date': DateTime(2023, 2, 22, 5,
41, 8, 519537, tzinfo=Timezone('UTC')), 'macros': <module 'airflow.macros' from
'/home/airflow/.local/lib/python3.8/site-packages/airflow/macros/__init__.py'>,
'next_ds': '2023-02-22', 'next_ds_nodash': '20230222',
'next_execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537,
tzinfo=Timezone('UTC')), 'outlets': [], 'params': {},
'prev_data_interval_start_success': DateTime(2023, 2, 21, 5, 32, 5, 541590,
tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2023, 2,
22, 5, 32, 5, 541590, tzinfo=Timezone('UTC')), 'prev_ds': '2023-02-22',
'prev_ds_nodash': '20230222', 'prev_execution_date': DateTime(2023, 2, 22, 5,
41, 8, 519537, tzinfo=Timezone('UTC')), 'prev_execution_date_success': None,
'prev_start_date_success': DateTime(2023, 2, 22, 5, 32, 6, 249219,
tzinfo=Timezone('UTC')), 'run_id': 'manual__2023-02-22T05:41:08.519537+00:00',
'task': <Task(FlinkKubernetesOperator): sample_flink_task>, 'task_instance':
<TaskInstance: tutorial.sample_flink_task
manual__2023-02-22T05:41:08.519537+00:00 [running]>, 'task_instance_key_str':
'tutorial__sample_flink_task__20230222', 'test_mode': False, 'ti':
<TaskInstance: tutorial.sample_flink_task
manual__2023-02-22T05:41:08.519537+00:00 [running]>,
'tomorrow_ds': '2023-02-23', 'tomorrow_ds_nodash': '20230223',
'triggering_dataset_events': <Proxy at 0x7f41973edd80 with factory <function
TaskInstance.get_template_context.<locals>.get_triggering_events at
0x7f419749caf0>>, 'ts': '2023-02-22T05:41:08.519537+00:00', 'ts_nodash':
'20230222T054108', 'ts_nodash_with_tz': '20230222T054108.519537+0000', 'var':
{'json': None, 'value': None}, 'conn': None, 'yesterday_ds': '2023-02-21',
'yesterday_ds_nodash': '20230221'}
[2023-02-22T05:46:11.474+0000] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/flink/operators/flink_kubernetes.py", line 107, in execute
    self.hook.custom_object_client.list_cluster_custom_object(
AttributeError: 'KubernetesHook' object has no attribute 'custom_object_client'
```
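The traceback shows the Flink operator reading `custom_object_client` from a `KubernetesHook` that does not define it. One possible cause, which this report does not confirm, is a mismatch between the installed Flink provider and the installed Kubernetes provider. The sketch below only lists the installed versions so they can be compared. The package list is an assumption about what is worth checking; only the Flink provider version (1.0.0) and the Airflow version (2.4.3) are stated above.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional

# Distribution names as published on PyPI. Including the cncf-kubernetes
# provider is an assumption: the report does not say which version is installed.
PROVIDER_PACKAGES = (
    "apache-airflow",
    "apache-airflow-providers-apache-flink",
    "apache-airflow-providers-cncf-kubernetes",
)


def installed_versions(
    packages: Iterable[str] = PROVIDER_PACKAGES,
) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not installed in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```

Running this inside the Airflow worker environment (for example from a `PythonOperator` task) would show which provider versions the failing task actually sees.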
### What you think should happen instead
_No response_
### How to reproduce
The error can be reproduced by configuring an AKS Kubernetes
connection in Airflow and then using the Flink operator to submit a
FlinkDeployment job:
```
from airflow.providers.apache.flink.operators.flink_kubernetes import (
    FlinkKubernetesOperator,
)

# FlinkDeployment manifest passed to the operator as a YAML string.
TEST_VALID_APPLICATION_JSON = """
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
"""

t6 = FlinkKubernetesOperator(
    application_file=TEST_VALID_APPLICATION_JSON,
    in_cluster=False,
    namespace="default",
    kubernetes_conn_id="tgoyal_aks",
    task_id="sample_flink_task",
)
```
### Anything else
I validated that the Kubernetes connection is configured correctly by running
the following operator successfully:
```
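# Module path used by cncf-kubernetes provider releases of this era.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

t5 = KubernetesPodOperator(
    kubernetes_conn_id="tgoyal_aks",
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    in_cluster=False,
    namespace="default",
)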
```
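Since `KubernetesPodOperator` succeeds with the same connection, the connection itself is probably fine and the failure comes down to the hook lacking the attribute named in the traceback. A minimal sketch of how that could be checked directly is below. `missing_attributes` and `StandInHook` are hypothetical names used only for illustration; the stand-in lets the snippet run without Airflow installed.

```python
from typing import Iterable, List


def missing_attributes(obj: object, names: Iterable[str]) -> List[str]:
    """Return the entries of `names` that `obj` does not expose."""
    return [name for name in names if not hasattr(obj, name)]


class StandInHook:
    """Hypothetical stand-in for a hook class without `custom_object_client`."""

    conn_id = "tgoyal_aks"


if __name__ == "__main__":
    # Against a real install, pass the hook class instead of the stand-in:
    #   from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    #   missing_attributes(KubernetesHook, ["custom_object_client"])
    print(missing_attributes(StandInHook, ["conn_id", "custom_object_client"]))
```

Passing the class rather than an instance avoids opening a cluster connection just to test whether the attribute is defined.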
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)