kaxil commented on a change in pull request #5743: [AIRFLOW-5088][AIP-24] Persisting serialized DAG in DB for webserver scalability
URL: https://github.com/apache/airflow/pull/5743#discussion_r337769592
##########
File path: airflow/dag/serialization/serialization.py
##########
@@ -53,81 +50,81 @@ class Serialization:
_primitive_types = (int, bool, float, str)
# Time types.
- _datetime_types = (datetime.datetime, datetime.date, datetime.time)
+ # datetime.date and datetime.time are converted to strings.
+ _datetime_types = (datetime.datetime,)
# Object types that are always excluded in serialization.
# FIXME: not needed if _included_fields of DAG and operator are customized.
_excluded_types = (logging.Logger, Connection, type)
- _json_schema = None # type: Optional[Dict]
+ _json_schema = None # type: Optional[Validator]
+
+ _CONSTRUCTOR_PARAMS = {} # type: Dict[str, Parameter]
+
+ SERIALIZER_VERSION = 1
@classmethod
def to_json(cls, var: Union[DAG, BaseOperator, dict, list, set, tuple]) -> str:
"""Stringifies DAGs and operators contained by var and returns a JSON
string of var.
"""
- return json.dumps(cls._serialize(var, {}), ensure_ascii=True)
+ return json.dumps(cls.to_dict(var), ensure_ascii=True)
@classmethod
- def from_json(cls, json_str: str) -> Union[
+ def to_dict(cls, var: Union[DAG, BaseOperator, dict, list, set, tuple]) -> dict:
+ """Stringifies DAGs and operators contained by var and returns a dict
of var.
+ """
+ # Don't call on this class directly - only SerializedDAG or
+ # SerializedBaseOperator should be used as the "entrypoint"
+ raise NotImplementedError()
+
+ @classmethod
+ def from_json(cls, serialized_obj: str) -> Union[
'SerializedDAG', 'SerializedBaseOperator', dict, list, set, tuple]:
"""Deserializes json_str and reconstructs all DAGs and operators it
contains."""
- return cls._deserialize(json.loads(json_str), {})
+ return cls.from_dict(json.loads(serialized_obj))
@classmethod
- def validate_json(cls, json_str: str):
- """Validate json_str satisfies JSON schema."""
+ def from_dict(cls, serialized_obj: dict) -> Union[
+ 'SerializedDAG', 'SerializedBaseOperator', dict, list, set, tuple]:
+ """Deserializes a python dict stored with type decorators and
+ reconstructs all DAGs and operators it contains."""
+ return cls._deserialize(serialized_obj)
+
+ @classmethod
+ def validate_schema(cls, serialized_obj: Union[str, dict]):
+ """Validate serialized_obj satisfies JSON schema."""
if cls._json_schema is None:
raise AirflowException('JSON schema of {:s} is not set.'.format(cls.__name__))
- jsonschema.validate(json.loads(json_str), cls._json_schema)
+
+ if isinstance(serialized_obj, dict):
+ cls._json_schema.validate(serialized_obj)
+ elif isinstance(serialized_obj, str):
+ cls._json_schema.validate(json.loads(serialized_obj))
Review comment:
Updated
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services