Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0fd6c2c39 -> b93e6519c


[AIRFLOW-628] Adding SalesforceHook to contrib/hooks

Also added a salesforce option to setup.py

Closes #1881 from Jalepeno112/feature/salesforce-hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b93e6519
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b93e6519
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b93e6519

Branch: refs/heads/master
Commit: b93e6519cc6f23e9a229b44dc8b721d568c28cbb
Parents: 0fd6c2c
Author: Giovanni Briggs <[email protected]>
Authored: Fri Nov 18 11:10:32 2016 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Fri Nov 18 11:10:32 2016 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/salesforce_hook.py | 295 ++++++++++++++++++++++++++
 setup.py                                 |   2 +
 2 files changed, 297 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b93e6519/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
new file mode 100644
index 0000000..efb75d0
--- /dev/null
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -0,0 +1,295 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+This module contains a Salesforce Hook
+which allows you to connect to your Salesforce instance,
+retrieve data from it, and write that data to a file
+for other uses.
+
+NOTE:   this hook also relies on the simple_salesforce package:
+        https://github.com/simple-salesforce/simple-salesforce
+"""
+from simple_salesforce import Salesforce
+from airflow.hooks.base_hook import BaseHook
+
+import logging
+import json
+
+import pandas as pd
+import time
+
+
+class SalesforceHook(BaseHook):
+    def __init__(
+            self,
+            conn_id,
+            *args,
+            **kwargs
+    ):
+        """
+        Create a new connection to Salesforce,
+        which allows you to pull data out of SFDC and save it to a file.
+
+        You can then use that file with other
+        Airflow operators to move the data into another data source.
+
+        :param conn_id:     the name of the connection that has the parameters
+                            we need to connect to Salesforce.
+                            The connection should be of type `http` and include a
+                            user's security token in the `Extras` field.
+        .. note::
+            For the HTTP connection type, you can include a
+            JSON structure in the `Extras` field.
+            We need a user's security token to connect to Salesforce.
+            So we define it in the `Extras` field as:
+                `{"security_token":"YOUR_SECURITY_TOKEN"}`
+        """
+        self.conn_id = conn_id
+        self._args = args
+        self._kwargs = kwargs
+
+        # get the connection parameters
+        self.connection = self.get_connection(conn_id)
+        self.extras = self.connection.extra_dejson
+
+    def sign_in(self):
+        """
+        Sign into Salesforce.
+
+        If we have already signed in, this will just return the original object.
+        """
+        if hasattr(self, 'sf'):
+            return self.sf
+
+        # connect to Salesforce
+        sf = Salesforce(
+            username=self.connection.login,
+            password=self.connection.password,
+            security_token=self.extras['security_token'],
+            instance_url=self.connection.host
+        )
+        self.sf = sf
+        return sf
+
+    def make_query(self, query):
+        """
+        Make a query to Salesforce.  Returns the result as a dictionary.
+
+        :param query:    The query to make to Salesforce
+        """
+        self.sign_in()
+
+        logging.info("Querying for all objects")
+        query = self.sf.query_all(query)
+
+        logging.info(
+            "Received results: Total size: {0}; Done: {1}".format(
+                query['totalSize'], query['done']
+            )
+        )
+
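+        # round-trip through JSON to turn the OrderedDicts returned by
+        # simple_salesforce into plain dicts and lists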
+        query = json.loads(json.dumps(query))
+        return query
+
+    def describe_object(self, obj):
+        """
+        Get the description of an object from Salesforce.
+
+        This description is the object's schema
+        and some extra metadata that Salesforce stores for each object.
+
+        :param obj:     Name of the Salesforce object
+                        that we are getting a description of.
+        """
+        self.sign_in()
+
+        return json.loads(json.dumps(self.sf.__getattr__(obj).describe()))
+
+    def get_available_fields(self, obj):
+        """
+        Get a list of all available fields for an object.
+
+        This only returns the names of the fields.
+        """
+        self.sign_in()
+
+        desc = self.describe_object(obj)
+
+        return [f['name'] for f in desc['fields']]
+
+    def _build_field_list(self, fields):
+        # join all of the fields in a comma-separated list
+        return ",".join(fields)
+
+    def get_object_from_salesforce(self, obj, fields):
+        """
+        Get all instances of the `object` from Salesforce.
+        For each model, only get the fields specified in fields.
+
+        All we really do under the hood is run:
+            SELECT <fields> FROM <obj>;
+        """
+        field_string = self._build_field_list(fields)
+
+        query = "SELECT {0} FROM {1}".format(field_string, obj)
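+        # log the query, truncating it if it is long
+        # (keep only the first and last 15 characters)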
+        logging.info(
+            "Making query to salesforce: {0}".format(
+                query if len(query) < 30
+                else " ... ".join([query[:15], query[-15:]])
+            )
+        )
+        return self.make_query(query)
+
+    @classmethod
+    def _to_timestamp(cls, col):
+        """
+        Convert a column of a dataframe to UNIX timestamps if applicable
+
+        :param col:     A Series object representing a column of a dataframe.
+        """
+        # try and convert the column to datetimes
+        # the column MUST have a four digit year somewhere in the string
+        # there should be a better way to do this,
+        # but just letting pandas try and convert every column without a format
+        # caused it to convert floats as well
+        # For example, a column of integers
+        # between 0 and 10 would be turned into timestamps
+        # if the column cannot be converted,
+        # just return the original column untouched
+        try:
+            col = pd.to_datetime(col)
+        except ValueError:
+            return col
+
+        # now convert the newly created datetimes into timestamps
+        # we have to be careful here
+        # because NaT cannot be converted to a timestamp
+        # so we have to return NaN
+        converted = []
+        for i in col:
+            try:
+                converted.append(i.timestamp())
+            except ValueError:
+                converted.append(pd.np.NaN)
+            except AttributeError:
+                converted.append(pd.np.NaN)
+
+        # return a new series that maintains the same index as the original
+        return pd.Series(converted, index=col.index)
+
+    def write_object_to_file(
+        self,
+        query_results,
+        filename,
+        fmt="csv",
+        coerce_to_timestamp=False,
+        record_time_added=False
+    ):
+        """
+        Write query results to file.
+
+        Acceptable formats are:
+            - csv:
+                comma-separated values file.  This is the default format.
+            - json:
+                JSON array.  Each element in the array is a different row.
+            - ndjson:
+                like `json`, but each record is newline-delimited
+                instead of being an element of a comma-delimited array
+
+        This requires a significant amount of cleanup.
+        Pandas doesn't handle output to CSV and JSON in a uniform way.
+        This is especially painful for datetime types.
+        Pandas wants to write them as strings in CSV,
+        but as millisecond Unix timestamps in JSON.
+
+        By default, this function will try to leave all values as
+        they are represented in Salesforce.
+        You can use the `coerce_to_timestamp` flag to force all datetimes
+        to become Unix timestamps (UTC).
+        This can be greatly beneficial as it will make all of your
+        datetime fields look the same,
+        and makes them easier to work with in other database environments.
+
+        :param query_results:       the results from a SQL query
+        :param filename:            the name of the file where the data
+                                    should be dumped to
+        :param fmt:                 the format you want the output in.
+                                    *Default:* csv.
+        :param coerce_to_timestamp: True if you want all datetime fields to be
+                                    converted into Unix timestamps.
+                                    False if you want them to be left in the
+                                    same format as they were in Salesforce.
+                                    *Defaults to False*
+        :param record_time_added:   *(optional)* True if you want to add a
+                                    Unix timestamp field to the resulting data
+                                    that marks when the data
+                                    was fetched from Salesforce.
+                                    *Default: False*.
+        """
+        fmt = fmt.lower()
+        if fmt not in ['csv', 'json', 'ndjson']:
+            raise ValueError("Format value is not recognized: {0}".format(fmt))
+
+        # this line right here will convert all integers to floats if there are
+        # any None/np.nan values in the column
+        # that's because None/np.nan cannot exist in an integer column
+        # we should write all of our timestamps as FLOATS in our final schema
+        df = pd.DataFrame.from_records(query_results, exclude=["attributes"])
+
+        df.columns = [c.lower() for c in df.columns]
+
+        # convert columns with datetime strings to datetimes
+        # not all strings will be datetimes, so we ignore any errors that occur
+        if coerce_to_timestamp:
+            possible_timestamp_cols = df.columns[df.dtypes == "object"]
+            df[possible_timestamp_cols] = df[possible_timestamp_cols].apply(
+                lambda x: self._to_timestamp(x)
+            )
+
+        if record_time_added:
+            fetched_time = time.time()
+            df["time_fetched_from_salesforce"] = fetched_time
+
+        # write the CSV or JSON file depending on the option
+        # NOTE:
+        #   datetimes here are an issue.
+        #   There is no good way to manage the difference
+        #   for to_json, the options are an epoch or an ISO string
+        #   but for to_csv, it will be a string output by datetime
+        #   For JSON we decided to output the epoch timestamp in seconds
+        #   (as is fairly standard for JavaScript)
+        #   And for csv, we do a string
+        if fmt == "csv":
+            # there are also a ton of newline objects
+            # that mess up our ability to write to csv
+            # we remove these newlines so that the output is a valid CSV format
+            logging.info("Cleaning data and writing to CSV")
+            possible_strings = df.columns[df.dtypes == "object"]
+            df[possible_strings] = df[possible_strings].apply(
+                lambda x: x.str.replace("\r\n", "")
+            )
+            df[possible_strings] = df[possible_strings].apply(
+                lambda x: x.str.replace("\n", "")
+            )
+
+            # write the dataframe
+            df.to_csv(filename, index=False)
+        elif fmt == "json":
+            df.to_json(filename, "records", date_unit="s")
+        elif fmt == "ndjson":
+            df.to_json(filename, "records", lines=True, date_unit="s")
+
+        return df

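For illustration, a minimal sketch of how the new hook could be used (here `salesforce_default` is a hypothetical connection id configured as described in the `__init__` docstring, and `Contact` is just an example object):

    from airflow.contrib.hooks.salesforce_hook import SalesforceHook

    # "salesforce_default" is a hypothetical connection id; it should point at
    # an `http` connection whose Extras field contains {"security_token": "..."}
    hook = SalesforceHook(conn_id="salesforce_default")

    # pull every Contact record, restricted to the fields Salesforce reports
    fields = hook.get_available_fields("Contact")
    results = hook.get_object_from_salesforce("Contact", fields)

    # write the records to disk, coercing datetime fields to Unix timestamps
    hook.write_object_to_file(
        query_results=results["records"],
        filename="/tmp/salesforce_contacts.csv",
        fmt="csv",
        coerce_to_timestamp=True,
    )
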
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b93e6519/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index baa496a..8364d92 100644
--- a/setup.py
+++ b/setup.py
@@ -139,6 +139,7 @@ mysql = ['mysqlclient>=1.3.6']
 rabbitmq = ['librabbitmq>=1.6.1']
 oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2>=2.6']
+salesforce = ['simple-salesforce>=0.72']
 s3 = [
     'boto>=2.36.0',
     'filechunkio>=1.6',
@@ -236,6 +237,7 @@ def do_setup():
             'rabbitmq': rabbitmq,
             's3': s3,
             'emr': emr,
+            'salesforce': salesforce,
             'samba': samba,
             'slack': slack,
             'statsd': statsd,

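With the new `salesforce` extra in place, the simple-salesforce dependency can be pulled in at install time via the usual extras syntax, e.g. `pip install 'airflow[salesforce]'` (assuming the package name in use at the time).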