alex-astronomer commented on a change in pull request #22331:
URL: https://github.com/apache/airflow/pull/22331#discussion_r829194925



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',

Review comment:
       Maybe remove the hard-coded `/Shared/airflow` prefix here, so that users can 
define a path themselves if their directory conventions differ.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):

Review comment:
       Maybe instead of a notebook name we let the user pass a notebook path, so 
that they can define the directory structure on the FS.  See also the comment 
below in the json section.
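
A rough sketch of what a path-based variant could look like, assuming the caller passes the full workspace path. `build_import_payload` and `notebook_path` are hypothetical names, not part of the PR; the validation lists and the `overwrite` handling are copied from the diff above.

```python
import base64

LANGUAGE_OPTIONS = ("SCALA", "PYTHON", "SQL", "R")
FORMAT_OPTIONS = ("SOURCE", "HTML", "JUPYTER", "DBC")


def build_import_payload(notebook_path, raw_code, language, overwrite=True, format="SOURCE"):
    """Build the request body for ``2.0/workspace/import`` from a caller-chosen path.

    Hypothetical helper: the caller decides the directory layout, so no
    '/Shared/airflow' prefix is added here.
    """
    if language.upper() not in LANGUAGE_OPTIONS:
        raise ValueError(f"language must be one of the following: {LANGUAGE_OPTIONS}")
    if format.upper() not in FORMAT_OPTIONS:
        raise ValueError(f"format must be one of the following: {FORMAT_OPTIONS}")

    # The endpoint expects base64-encoded content, as in the PR.
    encoded_str = base64.b64encode(raw_code.encode("utf-8")).decode("utf-8")

    return {
        "path": notebook_path,
        "content": encoded_str,
        "language": language.upper(),
        "overwrite": str(overwrite).lower(),  # kept as in the PR
        "format": format.upper(),
    }
```

The hook method would then pass this dict to `_do_api_call` instead of building the path itself.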

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',
+            'content': encodedStr,
+            'language': language,
+            'overwrite': str(overwrite).lower(),

Review comment:
       If `overwrite` is false and the file exists, the import endpoint will 
return a [RESOURCE_ALREADY_EXISTS 
error](https://docs.databricks.com/dev-tools/api/latest/workspace.html#import). 
 It may be worth catching that and logging an INFO message letting the user know 
that their file was not uploaded.
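
Something along these lines, as a minimal sketch. How `_do_api_call` actually surfaces the error is not shown in this diff, so `ApiError` and the injected `do_api_call` callable are assumptions made only so the example runs on its own.

```python
import logging

log = logging.getLogger(__name__)


class ApiError(Exception):
    """Hypothetical error type carrying the Databricks ``error_code``."""

    def __init__(self, error_code, message=""):
        super().__init__(f"{error_code}: {message}")
        self.error_code = error_code


def import_if_absent(do_api_call, endpoint, payload):
    """Return True if the notebook was uploaded, False if it already existed."""
    try:
        do_api_call(endpoint, payload)
    except ApiError as err:
        # Only swallow the "already exists" case; re-raise everything else.
        if err.error_code != "RESOURCE_ALREADY_EXISTS":
            raise
        log.info("Notebook %s already exists and was not uploaded.", payload["path"])
        return False
    return True
```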

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})

Review comment:
       This will return an [error if the directory already 
exists](https://docs.databricks.com/dev-tools/api/latest/workspace.html#mkdirs).
  Might be worth catching that error and printing out a DEBUG log saying that 
the directory already exists.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")

Review comment:
       Nitpick here, but snake_case might be more appropriate according to the 
[PEP 8 style guide](https://peps.python.org/pep-0008/#function-and-variable-names).

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})

Review comment:
       This would be more broadly applicable if the user could specify the path to create.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',
+            'content': encodedStr,
+            'language': language,
+            'overwrite': str(overwrite).lower(),
+            'format': format
+        }
+        self._do_api_call(WORKSPACE_IMPORT_ENDPOINT, json)
+
+        return f'/Shared/airflow/{notebook_name}'

Review comment:
       I believe this function would become more testable and more useful if 
we returned the response of the API call.  If the API returns an error when the 
directory or file already exists, that would be useful to surface as the 
function's return value.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})

Review comment:
       Looks like `_do_api_call` returns `response.json`, so I would take that 
value and return it at the end (so either success or RESOURCE_ALREADY_EXISTS), 
rather than returning the path that the user already has because they passed it 
in via parameters.  That addresses @eladkal's return question at the same time.
   
   I think "catch" was the wrong word. Rather than catching the exception and 
running something else in an `except` block, what I mean is saving that 
output and somehow reporting it to the user.
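
Roughly what I have in mind, as a sketch only. `import_and_report` is a hypothetical name, and `do_api_call` is injected in place of the hook's `_do_api_call` so that the example runs without Airflow.

```python
def import_and_report(do_api_call, mkdir_endpoint, import_endpoint, parent_dir, payload):
    """Create the parent directory, upload, and hand the import response back."""
    # `do_api_call` stands in for the hook's `_do_api_call`.
    do_api_call(mkdir_endpoint, {"path": parent_dir})
    response = do_api_call(import_endpoint, payload)
    # Return what the API said instead of echoing the caller's own path.
    return response
```

A unit test can then assert on the returned value directly, with a stub in place of the real API call.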

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):

Review comment:
       Maybe instead of a notebook name we let the user pass a notebook path, so 
that they can define the directory structure on the FS.  See also the comment 
above in the json section.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -379,20 +379,27 @@ def import_notebook(self, notebook_name: str, raw_code: 
str, language: str, over
             raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
 
         # encode notebook
-        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
-        encodedStr = str(encodedBytes, "utf-8")
+        encoded_bytes = base64.b64encode(raw_code.encode("utf-8"))
+        encoded_str = str(encoded_bytes, "utf-8")
 
         # create parent directory if not exists
-        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+        path_parts = dbfs_path.split('/')
+        path_parts.pop(0)
+        path_parts = path_parts[:-1]
+
+        path = ''
+        for part in path_parts:
+            path += f'/{part}'
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': path})

Review comment:
       It would be worth testing how this section behaves on different machines.  
I know the os.path lib does a lot of work standardizing this, and I've had 
problems in the past with different file systems misbehaving when manipulating 
paths without os.path tools.
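
One possible option, sketched under the assumption that workspace paths always use `/` as the separator: `posixpath` from the standard library applies the `/` rules on every platform, whereas `os.path` follows the conventions of the machine running Airflow. `parent_directory` is a hypothetical helper name.

```python
import posixpath


def parent_directory(notebook_path):
    """Return the parent directory of a '/'-separated workspace path."""
    # normpath collapses repeated and trailing slashes before dirname runs.
    return posixpath.dirname(posixpath.normpath(notebook_path))
```

For `/Shared/airflow/etl/nb` this gives `/Shared/airflow/etl`, which matches what the split/pop/join loop in the diff produces for the same input.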




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
