potiuk commented on a change in pull request #4170: [AIRFLOW-3275] Implement
Google Cloud SQL Query operator
URL: https://github.com/apache/incubator-airflow/pull/4170#discussion_r232602991
##########
File path: airflow/contrib/hooks/gcp_sql_hook.py
##########
@@ -261,3 +280,559 @@ def _wait_for_operation_to_complete(self, project_id,
operation_name):
# No meaningful info to return from the response in case of
success
return True
time.sleep(TIME_TO_SLEEP_IN_SECONDS)
+
+
+CLOUD_SQL_PROXY_DOWNLOAD_URL =
"https://dl.google.com/cloudsql/cloud_sql_proxy.{}.{}"
+CLOUD_SQL_PROXY_VERSION_DOWNLOAD_URL = \
+ "https://storage.googleapis.com/cloudsql-proxy/{}/cloud_sql_proxy.{}.{}"
+
+GCP_CREDENTIALS_KEY_PATH = "extra__google_cloud_platform__key_path"
+GCP_CREDENTIALS_KEYFILE_DICT = "extra__google_cloud_platform__keyfile_dict"
+
+
+class CloudSqlProxyRunner(LoggingMixin):
+    """Downloads and runs cloud-sql-proxy as a subprocess of the Python process.
+
+ The cloud-sql-proxy needs to be downloaded and started before we can
connect
+ to the Google Cloud SQL instance via database connection. It establishes
+ secure tunnel connection to the database - it authorizes using the
+ GCP credentials that are passed by the configuration.
+
+ More details about the proxy can be found here:
+ https://cloud.google.com/sql/docs/mysql/sql-proxy
+
+ """
+ def __init__(self,
+ path_prefix,
+ instance_specification,
+ gcp_conn_id='google_cloud_default',
+ project_id=None,
+ sql_proxy_version=None,
+ sql_proxy_binary_path=None):
+ """
+ Creates the proxy runner class.
+
+ :param path_prefix: unique path prefix where proxy will be downloaded
and
+ directories created for unix sockets.
+ :type path_prefix: str
+ :param instance_specification: specification of the instance to
connect the
+ proxy to. It should be specified in the form that is described in
+
https://cloud.google.com/sql/docs/mysql/sql-proxy#multiple-instances in
+ -instances parameter (typically in the form of
<project>:<region>:<instance>
+ for UNIX socket connections and in the form of
+          <project>:<region>:<instance>=tcp:<port> for TCP connections).
+ :type instance_specification: str
+ :param gcp_conn_id: id of Google Cloud Platform connection to use for
+ authentication
+    :type gcp_conn_id: str
+ :param project_id: optional id of the GCP project to connect to - it
overwrites
+ default project id taken from the GCP connection
+ :type project_id: str
+ :param sql_proxy_version: specific version of SQL proxy to download
+ (for example 'v1.13'). By default latest version is downloaded.
+ :type sql_proxy_version: str
+ :param sql_proxy_binary_path: If specified, then proxy will be
+ used from the path specified rather than dynamically generated.
This means
+ that if the binary is not present in that path it will also be
downloaded.
+ :type sql_proxy_binary_path: str
+ """
+ super(CloudSqlProxyRunner, self).__init__()
+ self.path_prefix = path_prefix
+ if not self.path_prefix:
+ raise AirflowException("The path_prefix must not be empty!")
+ self.sql_proxy_was_downloaded = False
+ self.sql_proxy_version = sql_proxy_version
+ self.download_sql_proxy_dir = None
+ self.sql_proxy_process = None
+ self.instance_specification = instance_specification
+ self.project_id = project_id
+ self.gcp_conn_id = gcp_conn_id
+ self.command_line_parameters = []
+ self.cloud_sql_proxy_socket_directory = self.path_prefix
+ self.sql_proxy_path = sql_proxy_binary_path if sql_proxy_binary_path \
+ else self.path_prefix + "_cloud_sql_proxy"
+ self.credentials_path = self.path_prefix + "_credentials.json"
+ self._build_command_line_parameters()
+
+ def _build_command_line_parameters(self):
+ self.command_line_parameters.extend(
+ ['-dir', self.cloud_sql_proxy_socket_directory])
+ self.command_line_parameters.extend(
+ ['-instances', self.instance_specification])
+
+ @staticmethod
+ def _is_os_64bit():
+ return platform.machine().endswith('64')
+
+ def _download_sql_proxy_if_needed(self):
+ if os.path.isfile(self.sql_proxy_path):
+ self.log.info("cloud-sql-proxy is already present")
+ return
+ system = platform.system().lower()
+ processor = "amd64" if CloudSqlProxyRunner._is_os_64bit() else "386"
+ if not self.sql_proxy_version:
+ download_url = CLOUD_SQL_PROXY_DOWNLOAD_URL.format(system,
processor)
+ else:
+ download_url = CLOUD_SQL_PROXY_VERSION_DOWNLOAD_URL.format(
+ self.sql_proxy_version, system, processor)
+ proxy_path_tmp = self.sql_proxy_path + ".tmp"
+ self.log.info("Downloading cloud_sql_proxy from {} to {}".
+ format(download_url, proxy_path_tmp))
+ r = requests.get(download_url, allow_redirects=True)
+ # Downloading to .tmp file first to avoid case where partially
downloaded
+ # binary is used by parallel operator which uses the same fixed binary
path
+ with open(proxy_path_tmp, 'wb') as f:
+ f.write(r.content)
+ if r.status_code != 200:
+ raise AirflowException(
+ "The cloud-sql-proxy could not be downloaded. Status code =
{}. "
+ "Reason = {}".format(r.status_code, r.reason))
+ self.log.info("Moving sql_proxy binary from {} to {}".format(
+ proxy_path_tmp, self.sql_proxy_path
+ ))
+ shutil.move(proxy_path_tmp, self.sql_proxy_path)
+ os.chmod(self.sql_proxy_path, 0o744) # Set executable bit
+ self.sql_proxy_was_downloaded = True
+
+ @provide_session
+ def _get_credential_parameters(self, session):
+ connection = session.query(models.Connection). \
+ filter(models.Connection.conn_id == self.gcp_conn_id).first()
+ session.expunge_all()
+ if GCP_CREDENTIALS_KEY_PATH in connection.extra_dejson:
+ credential_params = [
+ '-credential_file',
+ connection.extra_dejson[GCP_CREDENTIALS_KEY_PATH]
+ ]
+ elif GCP_CREDENTIALS_KEYFILE_DICT in connection.extra_dejson:
+ credential_file_content = json.loads(
+ connection.extra_dejson[GCP_CREDENTIALS_KEYFILE_DICT])
+ self.log.info("Saving credentials to
{}".format(self.credentials_path))
+ with open(self.credentials_path, "w") as f:
+ json.dump(credential_file_content, f)
+ credential_params = [
+ '-credential_file',
+ self.credentials_path
+ ]
+ else:
+ self.log.info(
+                "The credentials are not supplied by either key_path or "
+ "keyfile_dict of the gcp connection {}. Falling back to "
+ "default activated account".format(self.gcp_conn_id))
+ credential_params = []
+
+ if not self.instance_specification:
+ project_id = connection.extra_dejson.get(
+ 'extra__google_cloud_platform__project')
+ if self.project_id:
+ project_id = self.project_id
+ if not project_id:
+ raise AirflowException("For forwarding all instances, the
project id "
+ "for GCP should be provided either "
+ "by project_id extra in the GCP
connection or by "
+ "project_id provided in the operator.")
+ credential_params.extend(['-projects', project_id])
+ return credential_params
+
+ def start_proxy(self):
+ """
+ Starts Cloud Sql Proxy. You have to remember to stop the proxy if you
started it!
+ """
+ self._download_sql_proxy_if_needed()
+ if self.sql_proxy_process:
+ raise AirflowException("The sql proxy is already running:
{}".format(
+ self.sql_proxy_process))
+ else:
+ command_to_run = [self.sql_proxy_path]
+ command_to_run.extend(self.command_line_parameters)
+ try:
+ self.log.info("Creating directory {}".format(
+ self.cloud_sql_proxy_socket_directory))
+ os.makedirs(self.cloud_sql_proxy_socket_directory)
+ except OSError:
+                # Needed for Python 2 compatibility (exist_ok missing)
+ pass
+ command_to_run.extend(self._get_credential_parameters())
+ self.log.info("Running the command: `{}`".format("
".join(command_to_run)))
+ self.sql_proxy_process = Popen(command_to_run,
+ stdin=PIPE, stdout=PIPE,
stderr=PIPE)
+ self.log.info("The pid of cloud_sql_proxy: {}".format(
+ self.sql_proxy_process.pid))
+ while True:
+ line = self.sql_proxy_process.stderr.readline()
+ return_code = self.sql_proxy_process.poll()
+ if line == '' and return_code is not None:
+ self.sql_proxy_process = None
+ raise AirflowException(
+ "The cloud_sql_proxy finished early with return code
{}!".format(
+ return_code))
+ if line != '':
+ self.log.info(line)
+ if "googleapi: Error" in line or "invalid instance name:" in
line:
+ self.stop_proxy()
+ raise AirflowException(
+ "Error when starting the cloud_sql_proxy {}!".format(
+ line))
+ if "Ready for new connections" in line:
+ return
+
+ def stop_proxy(self):
+ """
+ Stops running proxy.
+
+ You should stop the proxy after you stop using it.
+ """
+ if not self.sql_proxy_process:
+ raise AirflowException("The sql proxy is not started yet")
+ else:
+ self.log.info("Stopping the cloud_sql_proxy pid: {}".format(
+ self.sql_proxy_process.pid))
+ self.sql_proxy_process.kill()
+ self.sql_proxy_process = None
+ # Cleanup!
+ self.log.info("Removing the socket directory: {}".
+ format(self.cloud_sql_proxy_socket_directory))
+ shutil.rmtree(self.cloud_sql_proxy_socket_directory,
ignore_errors=True)
+ if self.sql_proxy_was_downloaded:
+ self.log.info("Removing downloaded proxy:
{}".format(self.sql_proxy_path))
+ # Silently ignore if the file has already been removed
(concurrency)
+ try:
+ os.remove(self.sql_proxy_path)
+ except OSError as e:
+ if not e.errno == errno.ENOENT:
+ raise
+ else:
+ self.log.info("Skipped removing proxy - it was not downloaded: {}".
+ format(self.sql_proxy_path))
+ if isfile(self.credentials_path):
+ self.log.info("Removing generated credentials file {}".
+ format(self.credentials_path))
+            # Here the file cannot be deleted by a concurrent task (each task has its
own copy)
+ os.remove(self.credentials_path)
+
+ def get_proxy_version(self):
+ """
+ Returns version of the Cloud Sql Proxy.
+ """
+ self._download_sql_proxy_if_needed()
+ command_to_run = [self.sql_proxy_path]
+ command_to_run.extend(['--version'])
+ command_to_run.extend(self._get_credential_parameters())
+ result = subprocess.check_output(command_to_run)
+ pattern = re.compile("^.*[V|v]ersion ([^;]*);.*$")
+ m = pattern.match(result)
+ if m:
+ return m.group(1)
+ else:
+ return None
+
+ def get_socket_path(self):
+ """
+ Retrieves UNIX socket path used by Cloud Sql Proxy.
+
+ :return: The dynamically generated path for the socket created by the
proxy.
+ :rtype: str
+ """
+ return self.cloud_sql_proxy_socket_directory + "/" +
self.instance_specification
+
+
+CONNECTION_URIS = {
+ "postgres": {
+ "proxy": {
+ "tcp":
+
"postgresql://{user}:{password}@127.0.0.1:{proxy_port}/{database}",
+ "socket":
+ "postgresql://{user}:{password}@{socket_path}/{database}"
+ },
+ "public": {
+ "ssl":
+
"postgresql://{user}:{password}@{public_ip}:{public_port}/{database}?"
+ "sslmode=verify-ca&"
+ "sslcert={client_cert_file}&"
+ "sslkey={client_key_file}&"
+ "sslrootcert={server_ca_file}",
+ "non-ssl":
+
"postgresql://{user}:{password}@{public_ip}:{public_port}/{database}"
+ }
+ },
+ "mysql": {
+ "proxy": {
+ "tcp":
+ "mysql://{user}:{password}@127.0.0.1:{proxy_port}/{database}",
+ "socket":
+ "mysql://{user}:{password}@localhost/{database}?"
+ "unix_socket={socket_path}"
+ },
+ "public": {
+ "ssl":
+
"mysql://{user}:{password}@{public_ip}:{public_port}/{database}?"
+ "ssl={ssl_spec}",
+ "non-ssl":
+
"mysql://{user}:{password}@{public_ip}:{public_port}/{database}"
+ }
+ }
+}
+
+CLOUD_SQL_VALID_DATABASE_TYPES = ['postgres', 'mysql']
+
+
+# noinspection PyAbstractClass
+class CloudSqlDatabaseHook(BaseHook):
+ """Serves DB connection configuration for CloudSQL (Connections
+ of *cloudsql://* type).
+
+ The hook is a "meta" one - it does not perform an actual connection,
+ it is there to retrieve all the parameters configured in cloudsql://
connection,
+ start/stop Cloud Sql Proxy if needed, dynamically generate Postgres or
MySQL
+ connection in the database and return an actual Postgres or MySQL hook.
+ The returned Postgres/MySQL hooks are using direct connection or Cloud Sql
+ Proxy socket/tcp as configured.
+
+ Main parameters of the hook are retrieved from the standard URI components:
+
+ * **user** - User name to authenticate to the database (from login of the
URI).
+ * **password** - Password to authenticate to the database (from password
of the URI)
+ * **public_ip** - IP to connect to for public connection (from host of the
URI)
+ * **public_port** - Port to connect to for public connection (from port of
the URI)
+ * **database** - Database to connect to (from schema of the URI)
+
+ Remaining parameters are retrieved from the extras (URI query parameters):
+
+ * **project_id** - Google Cloud Platform project where the Cloud SQL
instance exists.
+ * **instance** - Name of the instance of the Cloud SQL database instance.
+ * **location** - The location of the cloud sql instance (for example
europe-west1).
+ * **database_type** - The type of the database instance (mysql or
postgres).
+ * **use_proxy** - (default False) Whether SQL proxy should be used to
connect to Cloud
+ SQL DB.
+ * **use_ssl** - (default False) Whether SSL should be used to connect to
Cloud SQL DB.
+ You cannot use proxy and ssl together.
+ * **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to
connect via
+ proxy, otherwise UNIX sockets are used.
+ * **sql_proxy_binary_path** - Optional path to sql proxy binary. If the
binary is not
+ specified or the binary is not present, it is automatically downloaded.
+ * **sql_proxy_version** - Specific version of the proxy to download (for
example
+ v1.13). If not specified, the latest version is downloaded.
+ * **sslcert** - Path to client certificate to authenticate when SSL is
used.
+ * **sslkey** - Path to client private key to authenticate when SSL is used.
+ * **sslrootcert** - Path to server's certificate to authenticate when SSL
is used.
+ """
+ _conn = None
+
+ def __init__(self, cloudsql_conn_id='google_cloud_sql_default'):
Review comment:
Good point. Renamed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services