kaxil commented on a change in pull request #4314: [AIRFLOW-3398] Google Cloud
Spanner instance database query operator
URL: https://github.com/apache/incubator-airflow/pull/4314#discussion_r242005377
##########
File path: airflow/contrib/operators/gcp_spanner_operator.py
##########
@@ -130,3 +132,67 @@ def execute(self, context):
self.log.info("Instance '%s' does not exist in project '%s'. "
"Aborting delete.", self.instance_id,
self.project_id)
return True
+
+
+class CloudSpannerInstanceDatabaseQueryOperator(BaseOperator):
+ """
+ Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
+
+ :param project_id: The ID of the project which owns the instances, tables
and data.
+ :type project_id: str
+ :param instance_id: The ID of the instance.
+ :type instance_id: str
+ :param database_id: The ID of the database.
+ :type database_id: str
+ :param query: The query or list of queries to be executed. Can be a path
to a SQL file.
+ :type query: str or list
+ :param gcp_conn_id: The connection ID used to connect to Google Cloud
Platform.
+ :type gcp_conn_id: str
+ """
+ # [START gcp_spanner_query_template_fields]
+ template_fields = ('project_id', 'instance_id', 'database_id', 'query',
'gcp_conn_id')
+ template_ext = ('.sql',)
+ # [END gcp_spanner_query_template_fields]
+
@apply_defaults
def __init__(self,
             project_id,
             instance_id,
             database_id,
             query,
             gcp_conn_id='google_cloud_default',
             *args, **kwargs):
    """Record the query target, validate it and build the Spanner hook.

    :param project_id: ID of the GCP project that owns the Spanner instance.
    :type project_id: str
    :param instance_id: ID of the Cloud Spanner instance.
    :type instance_id: str
    :param database_id: ID of the database inside the instance.
    :type database_id: str
    :param query: a query, a list of queries, or a path to a ``.sql`` file.
    :type query: str or list
    :param gcp_conn_id: Airflow connection ID used to reach Google Cloud.
    :type gcp_conn_id: str
    :raises AirflowException: if any required parameter is empty.
    """
    # Coordinates of the database the DML will run against.
    self.project_id = project_id
    self.instance_id = instance_id
    self.database_id = database_id
    self.query = query
    self.gcp_conn_id = gcp_conn_id

    # Reject empty required arguments before the hook is constructed.
    self._validate_inputs()

    # NOTE(review): the hook is built here from the not-yet-rendered
    # gcp_conn_id, although 'gcp_conn_id' is listed in template_fields.
    # Presumably a templated connection ID therefore never reaches the hook.
    # Consider creating the hook inside execute() instead (confirm against
    # CloudSpannerHook).
    self._hook = CloudSpannerHook(gcp_conn_id=self.gcp_conn_id)

    super(CloudSpannerInstanceDatabaseQueryOperator, self).__init__(
        *args, **kwargs)
+
def _validate_inputs(self):
    """Ensure every required operator argument is non-empty.

    The checks run in the order ``project_id``, ``instance_id``,
    ``database_id`` and ``query``. Any falsy value (``None``, ``''``, an
    empty list) is rejected.

    :raises AirflowException: naming the first required parameter found empty.
    """
    # (attribute name, error message) pairs, checked in order and lazily so
    # that only the first empty parameter is reported.
    required_params = (
        ('project_id', "The required parameter 'project_id' is empty"),
        ('instance_id', "The required parameter 'instance_id' is empty"),
        ('database_id', "The required parameter 'database_id' is empty"),
        ('query', "The required parameter 'query' is empty"),
    )
    for attr_name, error_message in required_params:
        if not getattr(self, attr_name):
            raise AirflowException(error_message)
+
+ def execute(self, context):
+ queries = self.query
+ if isinstance(self.query, six.string_types):
+ queries = [x.strip() for x in self.query.split(';')]
+ self.sanitize_queries(queries)
+ self.log.info("Executing DML query(-ies) on
projects/{}/instances/{}/databases/{}"
+ .format(self.project_id, self.instance_id,
self.database_id))
Review comment:
> Avoid logger.info("string template {}".format(argument)) wherever possible
in favor of logger.info("string template %s", argument). With the %-style
arguments, string interpolation is performed only if the log record is actually
emitted. With .format(), the interpolation always happens eagerly, which wastes
cycles whenever the effective log level is above INFO. Here that would be:
self.log.info("Executing DML query(-ies) on projects/%s/instances/%s/databases/%s",
self.project_id, self.instance_id, self.database_id)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services