ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r705677631



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the 
validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all 
kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, 
format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = 
False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the 
validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, 
format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly 
string and values
+    are converted into Param's object if they are not already. This class is 
to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: 
bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while 
initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)

Review comment:
       ```suggestion
           super().__init__(params_dict)
   ```

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the 
validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all 
kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, 
format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = 
False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the 
validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, 
format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly 
string and values
+    are converted into Param's object if they are not already. This class is 
to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: 
bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while 
initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure 
that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could 
be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in 
dict already
+            # if the new value is of Param type, then just use it otherwise 
call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, 
suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)

Review comment:
       ```suggestion
           super().__setitem__(key, param)
   ```

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the 
validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all 
kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, 
format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = 
False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the 
validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, 
format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly 
string and values
+    are converted into Param's object if they are not already. This class is 
to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: 
bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while 
initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure 
that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could 
be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in 
dict already
+            # if the new value is of Param type, then just use it otherwise 
call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, 
suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)

Review comment:
       Slightly simpler version (that doesn't needlessly get the param if we 
don't need to check it etc.)
   
   ```suggestion
           if isinstance(value, Param):
               param = value
           elif key in self:
               param = dict.__getitem__(self, key)
               param.resolve(value=value, 
suppress_exception=self.suppress_exception)
           else:
               # if the key isn't there already and if the value isn't of Param 
type create a new Param object
               param = Param(value)
   ```

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the 
validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all 
kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, 
format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = 
False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the 
validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, 
format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly 
string and values
+    are converted into Param's object if they are not already. This class is 
to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: 
bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while 
initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure 
that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could 
be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in 
dict already
+            # if the new value is of Param type, then just use it otherwise 
call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, 
suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, 
it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)

Review comment:
       ```suggestion
           param = super().__getitem__(key)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to