pohek321 commented on a change in pull request #22331:
URL: https://github.com/apache/airflow/pull/22331#discussion_r829272016



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})

Review comment:
       So, I thought this same thing based on Databricks docs, but I've run it 
and it doesn't fail. Should I still add the catch?

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})

Review comment:
       So, I thought this same thing based on Databricks docs, but I've run it 
on a pre-existing directory and it doesn't fail. Should I still add the catch?

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")

Review comment:
       Done.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',

Review comment:
       Added and tested. Works great!

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):

Review comment:
       Added and tested. Works great!

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")

Review comment:
       I simply removed the `raise ValueError` to see what the error would be 
in the event an unsupported language was sent to the API call, I just get back 
the following error: `"error_code":"INVALID_PARAMETER_VALUE"`. @eladkal, Is 
this sufficient? Or do we want to do further handling?

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',
+            'content': encodedStr,
+            'language': language,
+            'overwrite': str(overwrite).lower(),
+            'format': format
+        }
+        self._do_api_call(WORKSPACE_IMPORT_ENDPOINT, json)
+
+        return f'/Shared/airflow/{notebook_name}'

Review comment:
       The databricks api endpoints return nothing if the api call is 
successful. I'm assuming that we want to just return nothing at all instead of 
the DBFS path. Then, just handle the rest of the potential errors that could be 
raised from the initial api call. 

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})

Review comment:
       Despite what the docs say the `RESOURCE_ALREADY_EXISTS` error doesn't 
appear when I re-run the task. Nothing gets returned from the api endpoint.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
+
+        # encode notebook
+        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
+        encodedStr = str(encodedBytes, "utf-8")
+
+        # create parent directory if not exists
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+
+        # upload notebook
+        json = {
+            'path': f'/Shared/airflow/{notebook_name}',
+            'content': encodedStr,
+            'language': language,
+            'overwrite': str(overwrite).lower(),

Review comment:
       Added exception handling for this scenario.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")

Review comment:
       Added exception handling that refers users to Databricks docs for a list 
of acceptable values so that we don't have to maintain the list of acceptable 
values.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -352,3 +355,44 @@ def get_repo_by_path(self, path: str) -> Optional[str]:
             return str(result['object_id'])
 
         return None
+
+    def import_notebook(self, notebook_name: str, raw_code: str, language: 
str, overwrite: bool = True, format: str = 'SOURCE'):
+        """
+        Import a local notebook from Airflow into Databricks FS. Notebooks 
saved to /Shared/airflow dbfs
+
+        Utility function to call the ``2.0/workspace/import`` endpoint.
+
+        :param notebook_name: String name of notebook on Databricks FS
+        :param raw_code: String of non-encoded code
+        :param language: Use one of the following strings 'SCALA', 'PYTHON', 
'SQL', OR 'R'
+        :param overwrite: Boolean flag specifying whether to overwrite 
existing object. It is true by default
+        :return: full dbfs notebook path
+        """
+        #enforce language options
+        language_options = ['SCALA', 'PYTHON', 'SQL', 'R']
+        if language.upper() not in language_options:
+            raise ValueError(f"results: language must be one of the following: 
{str(language_options)}")
+
+        # enforce format options
+        format_options = ['SOURCE', 'HTML', 'JUPYTER', 'DBC']
+        if format.upper() not in format_options:
+            raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")

Review comment:
       After reading the docs, if a language is specified, then format should 
always be 'SOURCE'. Updated the method to follow this recommendation.

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -379,20 +379,27 @@ def import_notebook(self, notebook_name: str, raw_code: 
str, language: str, over
             raise ValueError(f"results: format must be one of the following: 
{str(format_options)}")
 
         # encode notebook
-        encodedBytes = base64.b64encode(raw_code.encode("utf-8"))
-        encodedStr = str(encodedBytes, "utf-8")
+        encoded_bytes = base64.b64encode(raw_code.encode("utf-8"))
+        encoded_str = str(encoded_bytes, "utf-8")
 
         # create parent directory if not exists
-        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': 
"/Shared/airflow"})
+        path_parts = dbfs_path.split('/')
+        path_parts.pop(0)
+        path_parts = path_parts[:-1]
+
+        path = ''
+        for part in path_parts:
+            path += f'/{part}'
+        self._do_api_call(WORKSPACE_MKDIR_ENDPOINT, {'path': path})

Review comment:
       I think we might have a disconnect here. This isn't using the os.path 
lib, it's just a string. That being said, would it still need to be be tested 
on different machines?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to