This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new ff342fc  Added SalesforceHook missing method to return only dataframe (#8565) (#8644)
ff342fc is described below

commit ff342fc230982dc5d88acfd5e5eab75187256b58
Author: Pranjal Mittal <[email protected]>
AuthorDate: Sun May 17 20:39:04 2020 +0530

    Added SalesforceHook missing method to return only dataframe (#8565) (#8644)
    
    * add feature for skipping writing to file
    
    * add SalesforceHook missing method to return dataframe only
    
    The function write_object_to_file is split into object_to_df, which returns the dataframe; write_object_to_file now uses object_to_df as its first step before exporting to a file.
    
    * fixed exception message
    
    * fix review comments - removed filename check for None
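    
    For context, a minimal usage sketch of the new method (assuming a
    configured Salesforce connection named "salesforce_default" and the hook's
    existing make_query helper; names outside this diff are illustrative):
    
        from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
    
        hook = SalesforceHook(conn_id="salesforce_default")
        results = hook.make_query("SELECT Id, Name FROM Account")
    
        # new in this commit: get a DataFrame directly, without writing a file
        df = hook.object_to_df(
            query_results=results["records"],
            coerce_to_timestamp=True,  # datetimes become Unix timestamps (UTC)
        )
    
        # existing behavior, now implemented on top of object_to_df
        hook.write_object_to_file(results["records"], filename="accounts.csv", fmt="csv")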
---
 airflow/providers/salesforce/hooks/salesforce.py   | 76 +++++++++++++++-------
 .../providers/salesforce/hooks/test_salesforce.py  | 45 +++++++++++++
 2 files changed, 97 insertions(+), 24 deletions(-)

diff --git a/airflow/providers/salesforce/hooks/salesforce.py b/airflow/providers/salesforce/hooks/salesforce.py
index 9f224c6..66aa0dd 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -236,6 +236,58 @@ class SalesforceHook(BaseHook):
         if fmt not in ['csv', 'json', 'ndjson']:
             raise ValueError("Format value is not recognized: {}".format(fmt))
 
+        df = self.object_to_df(query_results=query_results, coerce_to_timestamp=coerce_to_timestamp,
+                               record_time_added=record_time_added)
+
+        # write the CSV or JSON file depending on the option
+        # NOTE:
+        #   datetimes here are an issue.
+        #   There is no good way to manage the difference
+        #   for to_json, the options are an epoch or an ISO string
+        #   but for to_csv, it will be a string output by datetime
+        #   For JSON we decided to output the epoch timestamp in seconds
+        #   (as is fairly standard for JavaScript)
+        #   And for csv, we do a string
+        if fmt == "csv":
+            # there are also a ton of newline objects that mess up our ability to write to csv
+            # we remove these newlines so that the output is a valid CSV format
+            self.log.info("Cleaning data and writing to CSV")
+            possible_strings = df.columns[df.dtypes == "object"]
+            df[possible_strings] = df[possible_strings].astype(str).apply(
+                lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
+            )
+            # write the dataframe
+            df.to_csv(filename, index=False)
+        elif fmt == "json":
+            df.to_json(filename, "records", date_unit="s")
+        elif fmt == "ndjson":
+            df.to_json(filename, "records", lines=True, date_unit="s")
+
+        return df
+
+    def object_to_df(self, query_results, coerce_to_timestamp=False,
+                     record_time_added=False):
+        """
+        Export query results to dataframe.
+
+        By default, this function will try and leave all values as they are represented in Salesforce.
+        You can use the `coerce_to_timestamp` flag to force all datetimes to become Unix timestamps (UTC).
+        This can be greatly beneficial as it will make all of your datetime fields look the same,
+        and makes it easier to work with in other database environments.
+
+        :param query_results: the results from a SQL query
+        :type query_results: list of dict
+        :param coerce_to_timestamp: True if you want all datetime fields to be converted into Unix timestamps.
+            False if you want them to be left in the same format as they were in Salesforce.
+            Leaving the value as False will result in datetimes being strings. Default: False
+        :type coerce_to_timestamp: bool
+        :param record_time_added: True if you want to add a Unix timestamp field
+            to the resulting data that marks when the data was fetched from Salesforce. Default: False
+        :type record_time_added: bool
+        :return: the dataframe.
+        :rtype: pd.DataFrame
+        """
+
         # this line right here will convert all integers to floats
         # if there are any None/np.nan values in the column
         # that's because None/np.nan cannot exist in an integer column
@@ -272,28 +324,4 @@ class SalesforceHook(BaseHook):
             fetched_time = time.time()
             df["time_fetched_from_salesforce"] = fetched_time
 
-        # write the CSV or JSON file depending on the option
-        # NOTE:
-        #   datetimes here are an issue.
-        #   There is no good way to manage the difference
-        #   for to_json, the options are an epoch or an ISO string
-        #   but for to_csv, it will be a string output by datetime
-        #   For JSON we decided to output the epoch timestamp in seconds
-        #   (as is fairly standard for JavaScript)
-        #   And for csv, we do a string
-        if fmt == "csv":
-            # there are also a ton of newline objects that mess up our ability to write to csv
-            # we remove these newlines so that the output is a valid CSV format
-            self.log.info("Cleaning data and writing to CSV")
-            possible_strings = df.columns[df.dtypes == "object"]
-            df[possible_strings] = df[possible_strings].astype(str).apply(
-                lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
-            )
-            # write the dataframe
-            df.to_csv(filename, index=False)
-        elif fmt == "json":
-            df.to_json(filename, "records", date_unit="s")
-        elif fmt == "ndjson":
-            df.to_json(filename, "records", lines=True, date_unit="s")
-
         return df
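
A quick standalone illustration of the datetime note in the moved block above
(a hypothetical snippet, not part of the commit): for datetime columns, to_json
emits epoch timestamps by default, while to_csv renders them as strings.

    import pandas as pd

    df = pd.DataFrame({"created": pd.to_datetime(["2019-01-01"])})

    # JSON: epoch seconds, matching date_unit="s" as used in the hook
    print(df.to_json(orient="records", date_unit="s"))  # [{"created":1546300800}]

    # CSV: the datetime is written as a string
    print(df.to_csv(index=False))  # created\n2019-01-01\n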
diff --git a/tests/providers/salesforce/hooks/test_salesforce.py b/tests/providers/salesforce/hooks/test_salesforce.py
index b6828ae..1c9dfdc 100644
--- a/tests/providers/salesforce/hooks/test_salesforce.py
+++ b/tests/providers/salesforce/hooks/test_salesforce.py
@@ -176,3 +176,48 @@ class TestSalesforceHook(unittest.TestCase):
                 }
             ),
         )
+
+    @patch(
+        "airflow.providers.salesforce.hooks.salesforce.SalesforceHook.describe_object",
+        return_value={"fields": [{"name": "field_1", "type": "date"}]},
+    )
+    @patch(
+        "airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
+        return_value=pd.DataFrame({"test": [1, 2, 3], "field_1": ["2019-01-01", "2019-01-02", "2019-01-03"]}),
+    )
+    def test_object_to_df_with_timestamp_conversion(self, mock_data_frame, mock_describe_object):
+        obj_name = "obj_name"
+
+        data_frame = self.salesforce_hook.object_to_df(
+            query_results=[{"attributes": {"type": obj_name}}],
+            coerce_to_timestamp=True,
+        )
+
+        mock_describe_object.assert_called_once_with(obj_name)
+        pd.testing.assert_frame_equal(
+            data_frame, pd.DataFrame({"test": [1, 2, 3], "field_1": [1.546301e09, 1.546387e09, 1.546474e09]})
+        )
+
+    @patch("airflow.providers.salesforce.hooks.salesforce.time.time", 
return_value=1.23)
+    @patch(
+        "airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
+        return_value=pd.DataFrame({"test": [1, 2, 3]}),
+    )
+    def test_object_to_df_with_record_time(self, mock_data_frame, mock_time):
+        data_frame = self.salesforce_hook.object_to_df(
+            query_results=[], record_time_added=True
+        )
+
+        pd.testing.assert_frame_equal(
+            data_frame,
+            pd.DataFrame(
+                {
+                    "test": [1, 2, 3],
+                    "time_fetched_from_salesforce": [
+                        mock_time.return_value,
+                        mock_time.return_value,
+                        mock_time.return_value,
+                    ],
+                }
+            ),
+        )
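
A side note on the frame comparisons above (general pandas behavior, not
specific to this commit): pd.testing.assert_frame_equal compares floats
approximately by default, which is why the rounded 1.546301e09 literals match
the exact epoch values. A hypothetical standalone check:

    import pandas as pd

    exact = pd.DataFrame({"field_1": [1546300800.0]})   # 2019-01-01 00:00:00 UTC
    rounded = pd.DataFrame({"field_1": [1.546301e09]})  # off by ~200 seconds
    pd.testing.assert_frame_equal(exact, rounded)       # passes within default tolerance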
