jliu0812 commented on issue #38005:
URL: https://github.com/apache/airflow/issues/38005#issuecomment-1986631676
I will be working on this.
Full stack trace for reference:
[2024-03-09T00:45:58.280+0000] {logging_mixin.py:188} INFO -
[2024-03-09T00:45:58.280+0000] {dagbag.py:540} INFO - Filling up the DagBag from /files/dags/example_mapped_emr_serverless.py
[2024-03-09T00:45:58.298+0000] {processor.py:840} INFO - DAG(s) 'example_emr_serverless' retrieved from /files/dags/example_mapped_emr_serverless.py
[2024-03-09T00:45:58.315+0000] {logging_mixin.py:188} INFO -
[2024-03-09T00:45:58.310+0000] {dagbag.py:649} ERROR - Failed to write serialized DAG: /files/dags/example_mapped_emr_serverless.py
Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1354, in serialize_dag
    serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()]
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1354, in <listcomp>
    serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()]
                               ^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 462, in serialize
    return SerializedBaseOperator.serialize_mapped_operator(var)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 857, in serialize_mapped_operator
    serialized_op = cls._serialize_node(op, include_deps=op.deps != MappedOperator.deps_for(BaseOperator))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 899, in _serialize_node
    op.operator_extra_links.__get__(op)
  File "/opt/airflow/airflow/providers/amazon/aws/operators/emr.py", line 1273, in operator_extra_links
    if self.is_monitoring_in_job_override("s3MonitoringConfiguration", configuration_overrides):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/dagbag.py", line 637, in _serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/serialized_dag.py", line 166, in write_dag
    new_serialized_dag = cls(dag, processor_subdir)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 4, in __init__
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", line 481, in _initialize_instance
    with util.safe_reraise():
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/serialized_dag.py", line 113, in __init__
    dag_data = SerializedDAG.to_dict(dag)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1463, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
                                                             ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1378, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'example_emr_serverless': 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'
[2024-03-09T00:45:58.316+0000] {logging_mixin.py:188} INFO -
[2024-03-09T00:45:58.316+0000] {dag.py:3068} INFO - Sync 1 DAGs
[2024-03-09T00:45:58.326+0000] {logging_mixin.py:188} INFO -
[2024-03-09T00:45:58.326+0000] {dag.py:3912} INFO - Setting next_dagrun for example_emr_serverless to 2021-01-01 00:00:00+00:00, run_after=2021-01-01 00:00:00+00:00
[2024-03-09T00:45:58.345+0000] {processor.py:183} INFO - Processing /files/dags/example_mapped_emr_serverless.py took 0.073 seconds
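My reading of the trace, as a minimal sketch that does not import Airflow: the serializer calls `op.operator_extra_links.__get__(op)`, so the property getter defined on the operator class runs with the `MappedOperator` as `self`, and any helper method the getter calls on `self` is missing. `FakeOperator`, `FakeMappedOperator`, and `extra_links_for` are hypothetical stand-ins, and the way the mapped wrapper exposes the class-level property is an assumption inferred from the trace, not copied from the Airflow source.

```python
class FakeOperator:
    """Hypothetical stand-in for the real operator class (not Airflow code)."""

    def is_monitoring_in_job_override(self, key, overrides):
        return key in (overrides or {})

    @property
    def operator_extra_links(self):
        # The getter assumes `self` is a FakeOperator and calls a helper on it.
        if self.is_monitoring_in_job_override("s3MonitoringConfiguration", {}):
            return ["s3-logs-link"]
        return []


class FakeMappedOperator:
    """Hypothetical stand-in for MappedOperator: wraps the class, does not inherit."""

    def __init__(self, operator_class):
        self.operator_class = operator_class

    @property
    def operator_extra_links(self):
        # Attribute access on the class returns the property object itself.
        return self.operator_class.operator_extra_links


def extra_links_for(op):
    links = op.operator_extra_links
    if isinstance(links, property):
        # Same shape as the call in the trace: the getter runs with `op` as self.
        return links.__get__(op)
    return links


plain = FakeOperator()
mapped = FakeMappedOperator(FakeOperator)

plain_links = extra_links_for(plain)  # getter runs on a real instance, works
try:
    extra_links_for(mapped)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

If this reading is right, a fix would need the getter to avoid instance-only helpers when `self` is the mapped wrapper, or the helper to be reachable without an unmapped instance. Which of these suits the provider is still open.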