kaxil commented on a change in pull request #4314: [AIRFLOW-3398] Google Cloud 
Spanner instance database query operator
URL: https://github.com/apache/incubator-airflow/pull/4314#discussion_r242003606
 
 

 ##########
 File path: airflow/contrib/operators/gcp_spanner_operator.py
 ##########
 @@ -130,3 +132,67 @@ def execute(self, context):
             self.log.info("Instance '%s' does not exist in project '%s'. "
                           "Aborting delete.", self.instance_id, 
self.project_id)
             return True
+
+
+class CloudSpannerInstanceDatabaseQueryOperator(BaseOperator):
+    """
+    Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
+
+    :param project_id: The ID of the project which owns the instances, tables 
and data.
+    :type project_id: str
+    :param instance_id: The ID of the instance.
+    :type instance_id: str
+    :param database_id: The ID of the database.
+    :type database_id: str
+    :param query: The query or list of queries to be executed. Can be a path 
to a SQL file.
+    :type query: str or list
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    # [START gcp_spanner_query_template_fields]
+    template_fields = ('project_id', 'instance_id', 'database_id', 'query', 
'gcp_conn_id')
+    template_ext = ('.sql',)
+    # [END gcp_spanner_query_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 database_id,
+                 query,
+                 gcp_conn_id='google_cloud_default',
+                 *args, **kwargs):
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.database_id = database_id
+        self.query = query
+        self.gcp_conn_id = gcp_conn_id
+        self._validate_inputs()
+        self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id)
+        super(CloudSpannerInstanceDatabaseQueryOperator, self).__init__(*args, 
**kwargs)
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required parameter 'project_id' is 
empty")
+        if not self.instance_id:
+            raise AirflowException("The required parameter 'instance_id' is 
empty")
+        if not self.database_id:
+            raise AirflowException("The required parameter 'database_id' is 
empty")
+        if not self.query:
+            raise AirflowException("The required parameter 'query' is empty")
+
+    def execute(self, context):
+        queries = self.query
+        if isinstance(self.query, six.string_types):
+            queries = [x.strip() for x in self.query.split(';')]
+            self.sanitize_queries(queries)
+        self.log.info("Executing DML query(-ies) on 
projects/{}/instances/{}/databases/{}"
+                      .format(self.project_id, self.instance_id, 
self.database_id))
 
 Review comment:
   Can we change this to %s-style formatting for logging, passing the values 
as arguments to self.log.info instead of calling .format()? See 
https://docs.python.org/3/howto/logging-cookbook.html

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to