This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new ff342fc Added SalesforceHook missing method to return only dataframe
(#8565) (#8644)
ff342fc is described below
commit ff342fc230982dc5d88acfd5e5eab75187256b58
Author: Pranjal Mittal <[email protected]>
AuthorDate: Sun May 17 20:39:04 2020 +0530
Added SalesforceHook missing method to return only dataframe (#8565) (#8644)
* add feature for skipping writing to file
* add SalesforceHook missing method to return dataframe only
function write_object_to_file is divided into object_to_df, which returns the df,
and then write_object_to_file can use object_to_df as the first step
before exporting to file
* fixed exception message
* fix review comments - removed filename check for None
---
airflow/providers/salesforce/hooks/salesforce.py | 76 +++++++++++++++-------
.../providers/salesforce/hooks/test_salesforce.py | 45 +++++++++++++
2 files changed, 97 insertions(+), 24 deletions(-)
diff --git a/airflow/providers/salesforce/hooks/salesforce.py
b/airflow/providers/salesforce/hooks/salesforce.py
index 9f224c6..66aa0dd 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -236,6 +236,58 @@ class SalesforceHook(BaseHook):
if fmt not in ['csv', 'json', 'ndjson']:
raise ValueError("Format value is not recognized: {}".format(fmt))
+ df = self.object_to_df(query_results=query_results,
coerce_to_timestamp=coerce_to_timestamp,
+ record_time_added=record_time_added)
+
+ # write the CSV or JSON file depending on the option
+ # NOTE:
+ # datetimes here are an issue.
+ # There is no good way to manage the difference
+ # for to_json, the options are an epoch or a ISO string
+ # but for to_csv, it will be a string output by datetime
+ # For JSON we decided to output the epoch timestamp in seconds
+ # (as is fairly standard for JavaScript)
+ # And for csv, we do a string
+ if fmt == "csv":
+ # there are also a ton of newline objects that mess up our ability
to write to csv
+ # we remove these newlines so that the output is a valid CSV format
+ self.log.info("Cleaning data and writing to CSV")
+ possible_strings = df.columns[df.dtypes == "object"]
+ df[possible_strings] = df[possible_strings].astype(str).apply(
+ lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
+ )
+ # write the dataframe
+ df.to_csv(filename, index=False)
+ elif fmt == "json":
+ df.to_json(filename, "records", date_unit="s")
+ elif fmt == "ndjson":
+ df.to_json(filename, "records", lines=True, date_unit="s")
+
+ return df
+
+ def object_to_df(self, query_results, coerce_to_timestamp=False,
+ record_time_added=False):
+ """
+ Export query results to dataframe.
+
+ By default, this function will try and leave all values as they are
represented in Salesforce.
+ You use the `coerce_to_timestamp` flag to force all datetimes to
become Unix timestamps (UTC).
+ This is can be greatly beneficial as it will make all of your datetime
fields look the same,
+ and makes it easier to work with in other database environments
+
+ :param query_results: the results from a SQL query
+ :type query_results: list of dict
+ :param coerce_to_timestamp: True if you want all datetime fields to be
converted into Unix timestamps.
+ False if you want them to be left in the same format as they were
in Salesforce.
+ Leaving the value as False will result in datetimes being strings.
Default: False
+ :type coerce_to_timestamp: bool
+ :param record_time_added: True if you want to add a Unix timestamp
field
+ to the resulting data that marks when the data was fetched from
Salesforce. Default: False
+ :type record_time_added: bool
+ :return: the dataframe.
+ :rtype: pd.Dataframe
+ """
+
# this line right here will convert all integers to floats
# if there are any None/np.nan values in the column
# that's because None/np.nan cannot exist in an integer column
@@ -272,28 +324,4 @@ class SalesforceHook(BaseHook):
fetched_time = time.time()
df["time_fetched_from_salesforce"] = fetched_time
- # write the CSV or JSON file depending on the option
- # NOTE:
- # datetimes here are an issue.
- # There is no good way to manage the difference
- # for to_json, the options are an epoch or a ISO string
- # but for to_csv, it will be a string output by datetime
- # For JSON we decided to output the epoch timestamp in seconds
- # (as is fairly standard for JavaScript)
- # And for csv, we do a string
- if fmt == "csv":
- # there are also a ton of newline objects that mess up our ability
to write to csv
- # we remove these newlines so that the output is a valid CSV format
- self.log.info("Cleaning data and writing to CSV")
- possible_strings = df.columns[df.dtypes == "object"]
- df[possible_strings] = df[possible_strings].astype(str).apply(
- lambda x: x.str.replace("\r\n", "").str.replace("\n", "")
- )
- # write the dataframe
- df.to_csv(filename, index=False)
- elif fmt == "json":
- df.to_json(filename, "records", date_unit="s")
- elif fmt == "ndjson":
- df.to_json(filename, "records", lines=True, date_unit="s")
-
return df
diff --git a/tests/providers/salesforce/hooks/test_salesforce.py
b/tests/providers/salesforce/hooks/test_salesforce.py
index b6828ae..1c9dfdc 100644
--- a/tests/providers/salesforce/hooks/test_salesforce.py
+++ b/tests/providers/salesforce/hooks/test_salesforce.py
@@ -176,3 +176,48 @@ class TestSalesforceHook(unittest.TestCase):
}
),
)
+
+ @patch(
+
"airflow.providers.salesforce.hooks.salesforce.SalesforceHook.describe_object",
+ return_value={"fields": [{"name": "field_1", "type": "date"}]},
+ )
+ @patch(
+
"airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
+ return_value=pd.DataFrame({"test": [1, 2, 3], "field_1":
["2019-01-01", "2019-01-02", "2019-01-03"]}),
+ )
+ def test_obect_to_df_with_timestamp_conversion(self, mock_data_frame,
mock_describe_object):
+ obj_name = "obj_name"
+
+ data_frame = self.salesforce_hook.object_to_df(
+ query_results=[{"attributes": {"type": obj_name}}],
+ coerce_to_timestamp=True,
+ )
+
+ mock_describe_object.assert_called_once_with(obj_name)
+ pd.testing.assert_frame_equal(
+ data_frame, pd.DataFrame({"test": [1, 2, 3], "field_1":
[1.546301e09, 1.546387e09, 1.546474e09]})
+ )
+
+ @patch("airflow.providers.salesforce.hooks.salesforce.time.time",
return_value=1.23)
+ @patch(
+
"airflow.providers.salesforce.hooks.salesforce.pd.DataFrame.from_records",
+ return_value=pd.DataFrame({"test": [1, 2, 3]}),
+ )
+ def test_object_to_df_with_record_time(self, mock_data_frame, mock_time):
+ data_frame = self.salesforce_hook.object_to_df(
+ query_results=[], record_time_added=True
+ )
+
+ pd.testing.assert_frame_equal(
+ data_frame,
+ pd.DataFrame(
+ {
+ "test": [1, 2, 3],
+ "time_fetched_from_salesforce": [
+ mock_time.return_value,
+ mock_time.return_value,
+ mock_time.return_value,
+ ],
+ }
+ ),
+ )