mik-laj commented on a change in pull request #6123: [AIRFLOW-5502][depends on AIRFLOW-5498, AIRFLOW-5499] Move GCP base hook to core
URL: https://github.com/apache/airflow/pull/6123#discussion_r325936746
 
 

 ##########
 File path: airflow/contrib/operators/google_api_to_s3_transfer.py
 ##########
 @@ -16,162 +16,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-"""
-This module allows you to transfer data from any Google API endpoint into a S3 Bucket.
-"""
-import json
-import sys
-
-from airflow.models import BaseOperator
-from airflow.models.xcom import MAX_XCOM_SIZE
-from airflow.utils.decorators import apply_defaults
-
-from airflow.contrib.hooks.google_discovery_api_hook import GoogleDiscoveryApiHook
-from airflow.hooks.S3_hook import S3Hook
-
-
-class GoogleApiToS3Transfer(BaseOperator):
-    """
-    Basic class for transferring data from a Google API endpoint into a S3 Bucket.
-
-    :param google_api_service_name: The specific API service that is being requested.
-    :type google_api_service_name: str
-    :param google_api_service_version: The version of the API that is being requested.
-    :type google_api_service_version: str
-    :param google_api_endpoint_path: The client libraries path to the api call's executing method.
-        For example: 'analyticsreporting.reports.batchGet'
-
-        .. note:: See https://developers.google.com/apis-explorer
-            for more information on which methods are available.
-
-    :type google_api_endpoint_path: str
-    :param google_api_endpoint_params: The params to control the corresponding endpoint result.
-    :type google_api_endpoint_params: dict
-    :param s3_destination_key: The url where to put the data retrieved from the endpoint in S3.
-    :type s3_destination_key: str
-    :param google_api_response_via_xcom: Can be set to expose the google api response to xcom.
-    :type google_api_response_via_xcom: str
-    :param google_api_endpoint_params_via_xcom: If set to a value this value will be used as a key
-        for pulling from xcom and updating the google api endpoint params.
-    :type google_api_endpoint_params_via_xcom: str
-    :param google_api_endpoint_params_via_xcom_task_ids: Task ids to filter xcom by.
-    :type google_api_endpoint_params_via_xcom_task_ids: str or list of str
-    :param google_api_pagination: If set to True Pagination will be enabled for this request
-        to retrieve all data.
-
-        .. note:: This means the response will be a list of responses.
-
-    :type google_api_pagination: bool
-    :param google_api_num_retries: Define the number of retries for the google api requests being made
-        if it fails.
-    :type google_api_num_retries: int
-    :param s3_overwrite: Specifies whether the s3 file will be overwritten if exists.
-    :type s3_overwrite: bool
-    :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :type gcp_conn_id: string
-    :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have
-        domain-wide delegation enabled.
-    :type delegate_to: string
-    :param aws_conn_id: The connection id specifying the authentication information for the S3 Bucket.
-    :type aws_conn_id: str
-    """
-
-    template_fields = (
-        'google_api_endpoint_params',
-        's3_destination_key',
-    )
-    template_ext = ()
-    ui_color = '#cc181e'
-
-    @apply_defaults
-    def __init__(
-        self,
-        google_api_service_name,
-        google_api_service_version,
-        google_api_endpoint_path,
-        google_api_endpoint_params,
-        s3_destination_key,
-        *args,
-        google_api_response_via_xcom=None,
-        google_api_endpoint_params_via_xcom=None,
-        google_api_endpoint_params_via_xcom_task_ids=None,
-        google_api_pagination=False,
-        google_api_num_retries=0,
-        s3_overwrite=False,
-        gcp_conn_id='google_cloud_default',
-        delegate_to=None,
-        aws_conn_id='aws_default',
-        **kwargs
-    ):
-        super(GoogleApiToS3Transfer, self).__init__(*args, **kwargs)
-        self.google_api_service_name = google_api_service_name
-        self.google_api_service_version = google_api_service_version
-        self.google_api_endpoint_path = google_api_endpoint_path
-        self.google_api_endpoint_params = google_api_endpoint_params
-        self.s3_destination_key = s3_destination_key
-        self.google_api_response_via_xcom = google_api_response_via_xcom
-        self.google_api_endpoint_params_via_xcom = google_api_endpoint_params_via_xcom
-        self.google_api_endpoint_params_via_xcom_task_ids = google_api_endpoint_params_via_xcom_task_ids
-        self.google_api_pagination = google_api_pagination
-        self.google_api_num_retries = google_api_num_retries
-        self.s3_overwrite = s3_overwrite
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.aws_conn_id = aws_conn_id
-
-    def execute(self, context):
-        """
-        Transfers Google APIs json data to S3.
-
-        :param context: The context that is being provided when executing.
-        :type context: dict
-        """
-        self.log.info('Transferring data from %s to s3', self.google_api_service_name)
-
-        if self.google_api_endpoint_params_via_xcom:
-            self._update_google_api_endpoint_params_via_xcom(context['task_instance'])
-
-        data = self._retrieve_data_from_google_api()
-
-        self._load_data_to_s3(data)
-
-        if self.google_api_response_via_xcom:
-            self._expose_google_api_response_via_xcom(context['task_instance'], data)
-
-    def _retrieve_data_from_google_api(self):
-        google_discovery_api_hook = GoogleDiscoveryApiHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            api_service_name=self.google_api_service_name,
-            api_version=self.google_api_service_version
-        )
-        google_api_response = google_discovery_api_hook.query(
-            endpoint=self.google_api_endpoint_path,
-            data=self.google_api_endpoint_params,
-            paginate=self.google_api_pagination,
-            num_retries=self.google_api_num_retries
-        )
-        return google_api_response
+"""This module is deprecated. Please use `airflow.operators.google_api_to_s3_transfer`."""
 
-    def _load_data_to_s3(self, data):
-        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
-        s3_hook.load_string(
-            string_data=json.dumps(data),
-            key=self.s3_destination_key,
-            replace=self.s3_overwrite
-        )
+import warnings
 
-    def _update_google_api_endpoint_params_via_xcom(self, task_instance):
-        google_api_endpoint_params = task_instance.xcom_pull(
-            task_ids=self.google_api_endpoint_params_via_xcom_task_ids,
-            key=self.google_api_endpoint_params_via_xcom
-        )
-        self.google_api_endpoint_params.update(google_api_endpoint_params)
+# pylint: disable=unused-import
+from airflow.operators.google_api_to_s3_transfer import GoogleApiToS3Transfer  # noqa
 
-    def _expose_google_api_response_via_xcom(self, task_instance, data):
-        if sys.getsizeof(data) < MAX_XCOM_SIZE:
-            task_instance.xcom_push(key=self.google_api_response_via_xcom, value=data)
-        else:
-            raise RuntimeError('The size of the downloaded data is too large to push to XCom!')
+warnings.warn(
+    "This module is deprecated. Please use `airflow.operators.google_api_to_s3_transfer`.",
+    DeprecationWarning,
 
 Review comment:
   Can you add stacklevel here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to