Repository: incubator-gobblin Updated Branches: refs/heads/master 85ba58d51 -> 1c5fb6ec4
[GOBBLIN-434] support Salesforce refresh token Closes #2309 from haojiwu/salesforce_refresh_token Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c5fb6ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c5fb6ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c5fb6ec Branch: refs/heads/master Commit: 1c5fb6ec48baf00839aeb3e8427c2c9ca3783e0b Parents: 85ba58d Author: Haoji Wu <[email protected]> Authored: Wed Mar 21 11:50:17 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Mar 21 11:50:17 2018 -0700 ---------------------------------------------------------------------- .../gobblin/salesforce/SalesforceConnector.java | 57 +++++++++++++++++--- .../gobblin/salesforce/SalesforceExtractor.java | 32 ++++++++--- 2 files changed, 74 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c5fb6ec/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java index f95e7f7..6ba7965 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConnector.java @@ -25,6 +25,7 @@ import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.message.BasicNameValuePair; import com.google.common.collect.Lists; @@ -46,9 +47,15 @@ public class SalesforceConnector extends RestApiConnector { private static final String DEFAULT_SERVICES_DATA_PATH = "/services/data"; private static final String DEFAULT_AUTH_TOKEN_PATH = "/services/oauth2/token"; + protected String refreshToken; public SalesforceConnector(State state) { super(state); + if (isPasswordGrant(state)) { + this.refreshToken = null; + } else { + this.refreshToken = state.getProp(ConfigurationKeys.SOURCE_CONN_REFRESH_TOKEN); + } } @Getter @@ -59,18 +66,26 @@ public class SalesforceConnector extends RestApiConnector { log.debug("Authenticating salesforce"); String clientId = this.state.getProp(ConfigurationKeys.SOURCE_CONN_CLIENT_ID); String clientSecret = this.state.getProp(ConfigurationKeys.SOURCE_CONN_CLIENT_SECRET); - String userName = this.state.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME); - String password = PasswordManager.getInstance(this.state) - .readPassword(this.state.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD)); - String securityToken = this.state.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN); String host = this.state.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME); List<NameValuePair> formParams = Lists.newArrayList(); - formParams.add(new BasicNameValuePair("grant_type", "password")); formParams.add(new BasicNameValuePair("client_id", clientId)); formParams.add(new BasicNameValuePair("client_secret", clientSecret)); - formParams.add(new BasicNameValuePair("username", userName)); - formParams.add(new BasicNameValuePair("password", password + securityToken)); + + if (refreshToken == null) { + log.info("Authenticating salesforce with username/password"); + String userName = this.state.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME); + String password = PasswordManager.getInstance(this.state) + .readPassword(this.state.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD)); + String securityToken = this.state.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN); + formParams.add(new BasicNameValuePair("grant_type", "password")); + formParams.add(new BasicNameValuePair("username", userName)); + formParams.add(new BasicNameValuePair("password", password + securityToken)); + } else { + log.info("Authenticating salesforce with refresh_token"); + formParams.add(new BasicNameValuePair("grant_type", "refresh_token")); + formParams.add(new BasicNameValuePair("refresh_token", refreshToken)); + } try { HttpPost post = new HttpPost(host + DEFAULT_AUTH_TOKEN_PATH); post.setEntity(new UrlEncodedFormEntity(formParams)); @@ -80,11 +95,29 @@ public class SalesforceConnector extends RestApiConnector { return httpEntity; } catch (Exception e) { - throw new RestApiConnectionException("Failed to authenticate salesforce using user:" + userName + " and host:" + throw new RestApiConnectionException("Failed to authenticate salesforce host:" + host + "; error-" + e.getMessage(), e); } } + @Override + protected void addHeaders(HttpRequestBase httpRequest) { + if (refreshToken == null) { + super.addHeaders(httpRequest); + } else { + if (this.accessToken != null) { + httpRequest.addHeader("Authorization", "Bearer " + this.accessToken); + } + httpRequest.addHeader("Content-Type", "application/json"); + } + } + + static boolean isPasswordGrant(State state) { + String userName = state.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME); + String securityToken = state.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN); + return (userName != null && securityToken != null); + } + private String getServiceBaseUrl() { String dataEnvPath = DEFAULT_SERVICES_DATA_PATH + "/v" + this.state.getProp(ConfigurationKeys.SOURCE_CONN_VERSION); this.servicesDataEnvPath = dataEnvPath; @@ -94,4 +127,12 @@ public class SalesforceConnector extends RestApiConnector { public String getFullUri(String resourcePath) { return StringUtils.removeEnd(getServiceBaseUrl(), "/") + StringUtils.removeEnd(resourcePath, "/"); } + + String getAccessToken() { + return accessToken; + } + + String getInstanceUrl() { + return instanceUrl; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c5fb6ec/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java index 0c16051..4442214 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; +import org.apache.gobblin.salesforce.SalesforceConfigurationKeys; import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.methods.HttpGet; @@ -98,7 +99,7 @@ public class SalesforceExtractor extends RestApiExtractor { public static final String SALESFORCE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'.000Z'"; private static final String SALESFORCE_DATE_FORMAT = "yyyy-MM-dd"; private static final String SALESFORCE_HOUR_FORMAT = "HH"; - private static final String SALESFORCE_SOAP_AUTH_SERVICE = "/services/Soap/u"; + private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u"; private static final Gson GSON = new Gson(); private static final int MAX_PK_CHUNKING_SIZE = 250000; private static final int MIN_PK_CHUNKING_SIZE = 100000; @@ -641,7 +642,7 @@ public class SalesforceExtractor extends RestApiExtractor { apiVersion = "29.0"; } - String soapAuthEndPoint = hostName + SALESFORCE_SOAP_AUTH_SERVICE + "/" + apiVersion; + String soapAuthEndPoint = hostName + SALESFORCE_SOAP_SERVICE + "/" + apiVersion; try { ConnectorConfig partnerConfig = new ConnectorConfig(); if (super.workUnitState.contains(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL) @@ -650,12 +651,29 @@ public class SalesforceExtractor extends RestApiExtractor { super.workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_CONN_USE_PROXY_PORT)); } - String securityToken = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN); - String password = PasswordManager.getInstance(this.workUnitState) - .readPassword(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD)); - partnerConfig.setUsername(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME)); - partnerConfig.setPassword(password + securityToken); + String accessToken = sfConnector.getAccessToken(); + + if (accessToken == null) { + boolean isConnectSuccess = sfConnector.connect(); + if (isConnectSuccess) { + accessToken = sfConnector.getAccessToken(); + } + } + + if (accessToken != null) { + String serviceEndpoint = sfConnector.getInstanceUrl() + SALESFORCE_SOAP_SERVICE + "/" + apiVersion; + partnerConfig.setSessionId(accessToken); + partnerConfig.setServiceEndpoint(serviceEndpoint); + } else { + String securityToken = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_SECURITY_TOKEN); + String password = PasswordManager.getInstance(this.workUnitState) + .readPassword(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PASSWORD)); + partnerConfig.setUsername(this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_USERNAME)); + partnerConfig.setPassword(password + securityToken); + } + partnerConfig.setAuthEndpoint(soapAuthEndPoint); + new PartnerConnection(partnerConfig); String soapEndpoint = partnerConfig.getServiceEndpoint(); String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/")) + "async/" + apiVersion;
