jliu0812 commented on issue #38005:
URL: https://github.com/apache/airflow/issues/38005#issuecomment-1986631676

   I will be working on this.
   
   Full stack trace for reference:
   
   [2024-03-09T00:45:58.280+0000] {logging_mixin.py:188} INFO - 
[2024-03-09T00:45:58.280+0000] {dagbag.py:540} INFO - Filling up the DagBag 
from /files/dags/example_mapped_emr_serverless.py
   [2024-03-09T00:45:58.298+0000] {processor.py:840} INFO - DAG(s) 
'example_emr_serverless' retrieved from 
/files/dags/example_mapped_emr_serverless.py
   [2024-03-09T00:45:58.315+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.310+0000] {dagbag.py:649} ERROR - Failed to write serialized DAG: /files/dags/example_mapped_emr_serverless.py
   Traceback (most recent call last):
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 
1354, in serialize_dag
       serialized_dag["tasks"] = [cls.serialize(task) for _, task in 
dag.task_dict.items()]
                                 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 
1354, in <listcomp>
       serialized_dag["tasks"] = [cls.serialize(task) for _, task in 
dag.task_dict.items()]
                                  ^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 462, 
in serialize
       return SerializedBaseOperator.serialize_mapped_operator(var)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 857, 
in serialize_mapped_operator
       serialized_op = cls._serialize_node(op, include_deps=op.deps != 
MappedOperator.deps_for(BaseOperator))
                       
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 899, 
in _serialize_node
       op.operator_extra_links.__get__(op)
     File "/opt/airflow/airflow/providers/amazon/aws/operators/emr.py", line 1273, in operator_extra_links
       if self.is_monitoring_in_job_override("s3MonitoringConfiguration", configuration_overrides):
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   AttributeError: 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/dagbag.py", line 637, in 
_serialize_dag_capturing_errors
       dag_was_updated = SerializedDagModel.write_dag(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/session.py", line 76, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/serialized_dag.py", line 166, in 
write_dag
       new_serialized_dag = cls(dag, processor_subdir)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "<string>", line 4, in __init__
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", 
line 481, in _initialize_instance
       with util.safe_reraise():
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 
70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", 
line 479, in _initialize_instance
       return manager.original_init(*mixed[1:], **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/serialized_dag.py", line 113, in __init__
       dag_data = SerializedDAG.to_dict(dag)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 
1463, in to_dict
       json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": 
cls.serialize_dag(var)}
                                                                
^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 
1378, in serialize_dag
       raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
   airflow.exceptions.SerializationError: Failed to serialize DAG 'example_emr_serverless': 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'
   [2024-03-09T00:45:58.316+0000] {logging_mixin.py:188} INFO - 
[2024-03-09T00:45:58.316+0000] {dag.py:3068} INFO - Sync 1 DAGs
   [2024-03-09T00:45:58.326+0000] {logging_mixin.py:188} INFO - 
[2024-03-09T00:45:58.326+0000] {dag.py:3912} INFO - Setting next_dagrun for 
example_emr_serverless to 2021-01-01 00:00:00+00:00, run_after=2021-01-01 
00:00:00+00:00
   [2024-03-09T00:45:58.345+0000] {processor.py:183} INFO - Processing 
/files/dags/example_mapped_emr_serverless.py took 0.073 seconds


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to