This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 763919d  Adding custom Salesforce connection type + 
SalesforceToS3Operator updates (#17162)
763919d is described below

commit 763919d4152ffa13433e2489fec85ed286b7b196
Author: josh-fell <[email protected]>
AuthorDate: Sun Jul 25 06:05:59 2021 -0400

    Adding custom Salesforce connection type + SalesforceToS3Operator updates 
(#17162)
---
 .../aws/example_dags/example_salesforce_to_s3.py   |  7 ++-
 .../amazon/aws/transfers/salesforce_to_s3.py       |  6 +--
 .../google/cloud/transfers/salesforce_to_gcs.py    |  2 +-
 airflow/providers/salesforce/hooks/salesforce.py   | 58 ++++++++++++++++------
 airflow/providers/salesforce/provider.yaml         |  1 +
 .../connections/salesforce.rst                     | 49 +++++-------------
 .../amazon/aws/transfers/test_salesforce_to_s3.py  |  8 +--
 .../providers/salesforce/hooks/test_salesforce.py  | 11 ++--
 8 files changed, 71 insertions(+), 71 deletions(-)

diff --git 
a/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py 
b/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py
index ab8edcb..f8fd0db 100644
--- a/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py
+++ b/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py
@@ -54,19 +54,18 @@ with DAG(
 
     store_to_s3_data_lake = S3CopyObjectOperator(
         task_id="store_to_s3_data_lake",
-        
source_bucket_key=upload_salesforce_data_to_s3_landing.output["s3_uri"],
+        source_bucket_key=upload_salesforce_data_to_s3_landing.output,
         dest_bucket_name="data_lake",
         dest_bucket_key=f"{BASE_PATH}/{date_prefixes}/{FILE_NAME}",
     )
 
     delete_data_from_s3_landing = S3DeleteObjectsOperator(
         task_id="delete_data_from_s3_landing",
-        bucket=upload_salesforce_data_to_s3_landing.output["s3_bucket_name"],
-        keys=upload_salesforce_data_to_s3_landing.output["s3_key"],
+        bucket=upload_salesforce_data_to_s3_landing.s3_bucket_name,
+        keys=upload_salesforce_data_to_s3_landing.s3_key,
     )
 
     store_to_s3_data_lake >> delete_data_from_s3_landing
 
     # Task dependencies created via `XComArgs`:
     #   upload_salesforce_data_to_s3_landing >> store_to_s3_data_lake
-    #   upload_salesforce_data_to_s3_landing >> delete_data_from_s3_landing
diff --git a/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py 
b/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
index 25df2fb..09b49c1 100644
--- a/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
@@ -106,8 +106,8 @@ class SalesforceToS3Operator(BaseOperator):
         self.gzip = gzip
         self.acl_policy = acl_policy
 
-    def execute(self, context: Dict) -> Dict:
-        salesforce_hook = SalesforceHook(conn_id=self.salesforce_conn_id)
+    def execute(self, context: Dict) -> str:
+        salesforce_hook = 
SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
         response = salesforce_hook.make_query(
             query=self.salesforce_query,
             include_deleted=self.include_deleted,
@@ -138,4 +138,4 @@ class SalesforceToS3Operator(BaseOperator):
             s3_uri = f"s3://{self.s3_bucket_name}/{self.s3_key}"
             self.log.info(f"Salesforce data uploaded to S3 at {s3_uri}.")
 
-            return {"s3_uri": s3_uri, "s3_bucket_name": self.s3_bucket_name, 
"s3_key": self.s3_key}
+            return s3_uri
diff --git a/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py 
b/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
index d8179e7..5564b41 100644
--- a/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
@@ -97,7 +97,7 @@ class SalesforceToGcsOperator(BaseOperator):
         self.query_params = query_params
 
     def execute(self, context: Dict):
-        salesforce = SalesforceHook(conn_id=self.salesforce_conn_id)
+        salesforce = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
         response = salesforce.make_query(
             query=self.query, include_deleted=self.include_deleted, 
query_params=self.query_params
         )
diff --git a/airflow/providers/salesforce/hooks/salesforce.py 
b/airflow/providers/salesforce/hooks/salesforce.py
index d76baac..1b32cc9 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -25,7 +25,7 @@ retrieve data from it, and write that data to a file for 
other uses.
 """
 import logging
 import time
-from typing import Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
 import pandas as pd
 from simple_salesforce import Salesforce, api
@@ -37,29 +37,58 @@ log = logging.getLogger(__name__)
 
 class SalesforceHook(BaseHook):
     """
-    Create new connection to Salesforce and allows you to pull data out of 
SFDC and save it to a file.
+    Creates new connection to Salesforce and allows you to pull data out of 
SFDC and save it to a file.
 
     You can then use that file with other Airflow operators to move the data 
into another data source.
 
-    :param conn_id: the name of the connection that has the parameters we need 
to connect to Salesforce.
-        The connection should be type `http` and include a user's security 
token in the `Extras` field.
+    :param conn_id: The name of the connection that has the parameters needed 
to connect to Salesforce.
+        The connection should be of type `Salesforce`.
     :type conn_id: str
 
     .. note::
-        For the HTTP connection type, you can include a
-        JSON structure in the `Extras` field.
-        We need a user's security token to connect to Salesforce.
-        So we define it in the `Extras` field as 
`{"security_token":"YOUR_SECURITY_TOKEN"}`
-
-        For sandbox mode, add `{"domain":"test"}` in the `Extras` field
+        To connect to Salesforce make sure the connection includes a Username, 
Password, and Security Token.
+        If in sandbox, enter a Domain value of 'test'.  Login methods such as 
IP filtering and JWT are not
+        supported currently.
 
     """
 
-    def __init__(self, conn_id: str) -> None:
+    conn_name_attr = "salesforce_conn_id"
+    default_conn_name = "salesforce_default"
+    conn_type = "salesforce"
+    hook_name = "Salesforce"
+
+    def __init__(self, salesforce_conn_id: str = default_conn_name) -> None:
         super().__init__()
-        self.conn_id = conn_id
+        self.conn_id = salesforce_conn_id
         self.conn = None
 
+    @staticmethod
+    def get_connection_form_widgets() -> Dict[str, Any]:
+        """Returns connection widgets to add to connection form"""
+        from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, 
BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import PasswordField, StringField
+
+        return {
+            "extra__salesforce__security_token": PasswordField(
+                lazy_gettext("Security Token"), widget=BS3PasswordFieldWidget()
+            ),
+            "extra__salesforce__domain": StringField(lazy_gettext("Domain"), 
widget=BS3TextFieldWidget()),
+        }
+
+    @staticmethod
+    def get_ui_field_behaviour() -> Dict:
+        """Returns custom field behaviour"""
+        return {
+            "hidden_fields": ["schema", "port", "extra", "host"],
+            "relabeling": {
+                "login": "Username",
+            },
+            "placeholders": {
+                "extra__salesforce__domain": "(Optional)  Set to 'test' if 
working in sandbox mode.",
+            },
+        }
+
     def get_conn(self) -> api.Salesforce:
         """Sign into Salesforce, only if we are not already signed in."""
         if not self.conn:
@@ -68,9 +97,8 @@ class SalesforceHook(BaseHook):
             self.conn = Salesforce(
                 username=connection.login,
                 password=connection.password,
-                security_token=extras['security_token'],
-                instance_url=connection.host,
-                domain=extras.get('domain'),
+                security_token=extras["extra__salesforce__security_token"],
+                domain=extras["extra__salesforce__domain"] or "login",
             )
         return self.conn
 
diff --git a/airflow/providers/salesforce/provider.yaml 
b/airflow/providers/salesforce/provider.yaml
index edfc925..9eadab7 100644
--- a/airflow/providers/salesforce/provider.yaml
+++ b/airflow/providers/salesforce/provider.yaml
@@ -55,4 +55,5 @@ hooks:
       - airflow.providers.salesforce.hooks.salesforce
 
 hook-class-names:
+  - airflow.providers.salesforce.hooks.salesforce.SalesforceHook
   - airflow.providers.salesforce.hooks.tableau.TableauHook
diff --git 
a/docs/apache-airflow-providers-salesforce/connections/salesforce.rst 
b/docs/apache-airflow-providers-salesforce/connections/salesforce.rst
index 0508e5c..baab53f 100644
--- a/docs/apache-airflow-providers-salesforce/connections/salesforce.rst
+++ b/docs/apache-airflow-providers-salesforce/connections/salesforce.rst
@@ -19,55 +19,32 @@
 
 Salesforce Connection
 =====================
-The HTTP connection type provides connection to Salesforce.
+The Salesforce connection type provides connection to Salesforce.
 
 Configuring the Connection
 --------------------------
-Host (required)
-    specify the host address to connect: 
``https://your_host.lightning.force.com``
-
-Login (required)
+Username (required)
     Specify the email address used to login to your account.
 
 Password (required)
     Specify the password associated with the account.
 
-Extra (required)
-    Specify the extra parameters (as json dictionary) that can be used in 
Salesforce
-    connection.
-    The following parameter is required:
-
-    * ``security_token``: Salesforce token.
-
-    The following parameter is optional:
-
-    * ``domain``: set to ``test`` if working in sandbox mode.
-
-    For security reason we suggest you to use one of the secrets Backend to 
create this
-    connection (Using ENVIRONMENT VARIABLE or Hashicorp Vault, GCP Secrets 
Manager etc).
-
-
-    When specifying the connection as URI (in :envvar:`AIRFLOW_CONN_{CONN_ID}` 
variable) you should specify it
-    following the standard syntax of DB connections - where extras are passed 
as parameters
-    of the URI.
-
-    For example:
-
-    .. code-block:: bash
+Security Token (required)
+    Specify the Salesforce security token for the username.
 
-       export 
AIRFLOW_CONN_SALESFORCE_DEFAULT='http://your_username:your_password@https%3A%2F%2Fyour_host.lightning.force.com?security_token=your_token'
+Domain (optional)
+    The domain to using for connecting to Salesforce. Use common domains, such 
as 'login'
+    or 'test', or Salesforce My domain. If not used, will default to 'login'.
 
+For security reason we suggest you to use one of the secrets Backend to create 
this
+connection (Using ENVIRONMENT VARIABLE or Hashicorp Vault, GCP Secrets Manager 
etc).
 
-Examples for the **Extra** field
---------------------------------
-Setting up sandbox mode:
+When specifying the connection as URI (in :envvar:`AIRFLOW_CONN_{CONN_ID}` 
variable) you should specify it
+following the standard syntax of DB connections - where extras are passed as 
parameters of the URI. For example:
 
-.. code-block:: json
+  .. code-block:: bash
 
-    {
-      "security_token": "your_token",
-      "domain":"test"
-    }
+    export 
AIRFLOW_CONN_SALESFORCE_DEFAULT='http://your_username:your_password@https%3A%2F%2Fyour_host.lightning.force.com?security_token=your_token'
 
 .. note::
   Airflow currently does not support other login methods such as IP filtering 
and JWT.
diff --git a/tests/providers/amazon/aws/transfers/test_salesforce_to_s3.py 
b/tests/providers/amazon/aws/transfers/test_salesforce_to_s3.py
index 9c9fe82..8a0d15a 100644
--- a/tests/providers/amazon/aws/transfers/test_salesforce_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_salesforce_to_s3.py
@@ -95,13 +95,7 @@ class TestSalesforceToGcsOperator(unittest.TestCase):
         assert operator.gzip == GZIP
         assert operator.acl_policy == ACL_POLICY
 
-        expected_op_output = {
-            "s3_uri": f"s3://{S3_BUCKET}/{S3_KEY}",
-            "s3_bucket_name": S3_BUCKET,
-            "s3_key": S3_KEY,
-        }
-
-        assert expected_op_output == operator.execute({})
+        assert f"s3://{S3_BUCKET}/{S3_KEY}" == operator.execute({})
 
         mock_make_query.assert_called_once_with(
             query=QUERY, include_deleted=INCLUDE_DELETED, 
query_params=QUERY_PARAMS
diff --git a/tests/providers/salesforce/hooks/test_salesforce.py 
b/tests/providers/salesforce/hooks/test_salesforce.py
index bf6041b..6eb9591 100644
--- a/tests/providers/salesforce/hooks/test_salesforce.py
+++ b/tests/providers/salesforce/hooks/test_salesforce.py
@@ -31,7 +31,7 @@ from airflow.providers.salesforce.hooks.salesforce import 
SalesforceHook
 
 class TestSalesforceHook(unittest.TestCase):
     def setUp(self):
-        self.salesforce_hook = SalesforceHook(conn_id="conn_id")
+        self.salesforce_hook = SalesforceHook(salesforce_conn_id="conn_id")
 
     def test_get_conn_exists(self):
         self.salesforce_hook.conn = Mock(spec=Salesforce)
@@ -43,7 +43,9 @@ class TestSalesforceHook(unittest.TestCase):
     @patch(
         
"airflow.providers.salesforce.hooks.salesforce.SalesforceHook.get_connection",
         return_value=Connection(
-            login="username", password="password", extra='{"security_token": 
"token", "domain": "test"}'
+            login="username",
+            password="password",
+            extra='{"extra__salesforce__security_token": "token", 
"extra__salesforce__domain": "login"}',
         ),
     )
     @patch("airflow.providers.salesforce.hooks.salesforce.Salesforce")
@@ -54,9 +56,8 @@ class TestSalesforceHook(unittest.TestCase):
         mock_salesforce.assert_called_once_with(
             username=mock_get_connection.return_value.login,
             password=mock_get_connection.return_value.password,
-            
security_token=mock_get_connection.return_value.extra_dejson["security_token"],
-            instance_url=mock_get_connection.return_value.host,
-            domain=mock_get_connection.return_value.extra_dejson.get("domain"),
+            
security_token=mock_get_connection.return_value.extra_dejson["extra__salesforce__security_token"],
+            
domain=mock_get_connection.return_value.extra_dejson.get("extra__salesforce__domain"),
         )
 
     @patch("airflow.providers.salesforce.hooks.salesforce.Salesforce")

Reply via email to