alex-astronomer commented on a change in pull request #22331:
URL: https://github.com/apache/airflow/pull/22331#discussion_r829197490
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
return str(result['object_id'])
return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: {str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: {str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': "/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',
+            'content': encodedStr,
+            'language': language,
+            'overwrite': str(overwrite).lower(),
Review comment:
If overwrite is false and the file already exists, the import endpoint will return a [RESOURCE_ALREADY_EXISTS error](https://docs.databricks.com/dev-tools/api/latest/workspace.html#import). It might be worth catching that and logging an INFO message letting the user know that their file was not uploaded.
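
Not verified against the hook's internals, but a minimal sketch of what that could look like, assuming `_do_api_call` surfaces the Databricks `error_code` in an `AirflowException` message, and using a hypothetical `WORKSPACE_IMPORT_ENDPOINT` constant for ``2.0/workspace/import``:

```python
from airflow.exceptions import AirflowException

try:
    # WORKSPACE_IMPORT_ENDPOINT is a hypothetical name for the 2.0/workspace/import endpoint constant
    self._do_api_call(WORKSPACE_IMPORT_ENDPOINT, json)
except AirflowException as e:
    if 'RESOURCE_ALREADY_EXISTS' in str(e):
        # overwrite=False and the notebook already exists: inform the user instead of failing
        self.log.info("Notebook %s already exists and overwrite is False; file was not uploaded", notebook_name)
    else:
        raise
```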
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
return str(result['object_id'])
return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: {str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: {str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': "/Shared/airflow"})
Review comment:
This will return an [error if the directory already exists](https://docs.databricks.com/dev-tools/api/latest/workspace.html#mkdirs). It might be worth catching that error and logging a DEBUG message saying that the directory already exists.
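
Again a rough sketch, under the same assumption about how `_do_api_call` reports Databricks API errors:

```python
from airflow.exceptions import AirflowException

try:
    self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': "/Shared/airflow"})
except AirflowException as e:
    if 'RESOURCE_ALREADY_EXISTS' in str(e):
        # directory is already there; nothing to do
        self.log.debug("Directory /Shared/airflow already exists")
    else:
        raise
```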
##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
return str(result['object_id'])
return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: {str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: {str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
Review comment:
Nitpick here, but snake_case might be more appropriate per the [PEP 8 style guide](https://peps.python.org/pep-0008/#function-and-variable-names), e.g.:
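
```python
# snake_case instead of camelCase for local variables
encoded_bytes = base64.b64encode(raw_code.encode("utf-8"))
encoded_str = str(encoded_bytes, "utf-8")
```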
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]