CAMEL-9925: Updated Salesforce component to use Jetty9 and cometd3
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ec90c0b4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ec90c0b4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ec90c0b4 Branch: refs/heads/master Commit: ec90c0b4b2c73f6323e1c320327cffa00d3e4b98 Parents: ff713bd Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Thu Apr 28 19:20:18 2016 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Thu Apr 28 19:47:05 2016 -0700 ---------------------------------------------------------------------- .../camel-salesforce-component/pom.xml | 34 ++- .../salesforce/SalesforceComponent.java | 165 ++++++++--- .../salesforce/SalesforceEndpointConfig.java | 12 +- .../salesforce/SalesforceHttpClient.java | 111 +++++++ .../component/salesforce/api/dto/Address.java | 10 + .../salesforce/api/dto/RestResources.java | 24 ++ .../salesforce/internal/SalesforceSession.java | 287 ++++++++----------- .../internal/client/AbstractClientBase.java | 195 +++++++------ .../client/DefaultAnalyticsApiClient.java | 95 +++--- .../internal/client/DefaultBulkApiClient.java | 93 +++--- .../internal/client/DefaultRestClient.java | 100 ++++--- .../internal/client/SalesforceExchange.java | 36 --- .../internal/client/SalesforceHttpRequest.java | 38 +++ .../client/SalesforceSecurityHandler.java | 262 +++++++++++++++++ .../client/SalesforceSecurityListener.java | 192 ------------- .../internal/client/XStreamUtils.java | 2 +- .../processor/AbstractRestProcessor.java | 3 +- .../processor/AbstractSalesforceProcessor.java | 4 +- .../processor/AnalyticsApiProcessor.java | 3 +- .../internal/processor/JsonRestProcessor.java | 2 +- .../internal/processor/XmlRestProcessor.java | 8 +- .../internal/streaming/SubscriptionHelper.java | 38 +-- .../salesforce/AbstractBulkApiTestBase.java | 4 +- .../salesforce/AbstractSalesforceTestBase.java | 9 + .../salesforce/BulkApiIntegrationTest.java | 24 +- .../salesforce/HttpProxyIntegrationTest.java | 44 ++- .../salesforce/RestApiIntegrationTest.java | 79 +++-- .../internal/SessionIntegrationTest.java | 12 +- .../camel-salesforce-maven-plugin/pom.xml | 25 +- .../apache/camel/maven/CamelSalesforceMojo.java | 128 +++++++-- .../maven/HttpProxyMojoIntegrationTest.java | 43 ++- components/camel-salesforce/pom.xml | 1 + parent/pom.xml | 2 +- 33 files changed, 1263 insertions(+), 822 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/pom.xml b/components/camel-salesforce/camel-salesforce-component/pom.xml index 79267a0..ac12d73 100644 --- a/components/camel-salesforce/camel-salesforce-component/pom.xml +++ b/components/camel-salesforce/camel-salesforce-component/pom.xml @@ -31,9 +31,6 @@ <description>Camel Salesforce support</description> <properties> - <!-- TODO: upgrade to jetty 9 --> - <jetty8-version>8.1.17.v20150415</jetty8-version> - <camel.osgi.import.before.defaults> org.joda.time.*;version="[1.6,3)" </camel.osgi.import.before.defaults> @@ -53,17 +50,22 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> - <version>${jetty8-version}</version> + <version>${jetty9-version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> - <version>${jetty8-version}</version> + <version>${jetty9-version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util-ajax</artifactId> + <version>${jetty9-version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-io</artifactId> - <version>${jetty8-version}</version> + <version>${jetty9-version}</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> @@ -81,11 +83,11 @@ <exclusions> <exclusion> <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-util</artifactId> + <artifactId>*</artifactId> </exclusion> <exclusion> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-io</artifactId> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> @@ -125,7 +127,19 @@ <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> - <version>${jetty8-version}</version> + <version>${jetty9-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>${jetty9-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-proxy</artifactId> + <version>${jetty9-version}</version> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index 5f93441..600dcbf 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -18,6 +18,7 @@ package org.apache.camel.component.salesforce; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -43,10 +44,13 @@ import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ReflectionHelper; import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.jsse.SSLContextParameters; -import org.eclipse.jetty.client.Address; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.RedirectListener; -import org.eclipse.jetty.client.security.ProxyAuthorization; +import org.eclipse.jetty.client.HttpProxy; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.ProxyConfiguration; +import org.eclipse.jetty.client.Socks4Proxy; +import org.eclipse.jetty.client.api.Authentication; +import org.eclipse.jetty.client.util.BasicAuthentication; +import org.eclipse.jetty.client.util.DigestAuthentication; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +63,6 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin private static final Logger LOG = LoggerFactory.getLogger(SalesforceComponent.class); private static final int CONNECTION_TIMEOUT = 60000; - private static final long RESPONSE_TIMEOUT = 60000; private static final Pattern SOBJECT_NAME_PATTERN = Pattern.compile("^.*[\\?&]sObjectName=([^&,]+).*$"); private static final String APEX_CALL_PREFIX = OperationName.APEX_CALL.value() + "/"; @@ -75,16 +78,23 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin // Proxy host and port private String httpProxyHost; private Integer httpProxyPort; + private boolean isHttpProxySocks4; + private boolean isHttpProxySecure = true; + private Set<String> httpProxyIncludedAddresses; + private Set<String> httpProxyExcludedAddresses; // Proxy basic authentication private String httpProxyUsername; private String httpProxyPassword; + private String httpProxyAuthUri; + private String httpProxyRealm; + private boolean httpProxyUseDigestAuth; // DTO packages to scan private String[] packages; // component state - private HttpClient httpClient; + private SalesforceHttpClient httpClient; private SalesforceSession session; private Map<String, Class<?>> classMap; @@ -179,20 +189,18 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin if (config != null && config.getHttpClient() != null) { httpClient = config.getHttpClient(); } else { - httpClient = new HttpClient(); - // default settings - httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + // set ssl context parameters if set + final SSLContextParameters contextParameters = sslContextParameters != null + ? sslContextParameters : new SSLContextParameters(); + final SslContextFactory sslContextFactory = new SslContextFactory(); + sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext())); + + httpClient = new SalesforceHttpClient(sslContextFactory); + // default settings, use httpClientProperties to set other properties httpClient.setConnectTimeout(CONNECTION_TIMEOUT); - httpClient.setTimeout(RESPONSE_TIMEOUT); } } - // set ssl context parameters - final SSLContextParameters contextParameters = sslContextParameters != null - ? sslContextParameters : new SSLContextParameters(); - final SslContextFactory sslContextFactory = httpClient.getSslContextFactory(); - sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext())); - // set HTTP client parameters if (httpClientProperties != null && !httpClientProperties.isEmpty()) { IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), @@ -201,29 +209,46 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin // set HTTP proxy settings if (this.httpProxyHost != null && httpProxyPort != null) { - httpClient.setProxy(new Address(this.httpProxyHost, this.httpProxyPort)); + Origin.Address proxyAddress = new Origin.Address(this.httpProxyHost, this.httpProxyPort); + ProxyConfiguration.Proxy proxy; + if (isHttpProxySocks4) { + proxy = new Socks4Proxy(proxyAddress, isHttpProxySecure); + } else { + proxy = new HttpProxy(proxyAddress, isHttpProxySecure); + } + if (httpProxyIncludedAddresses != null && !httpProxyIncludedAddresses.isEmpty()) { + proxy.getIncludedAddresses().addAll(httpProxyIncludedAddresses); + } + if (httpProxyExcludedAddresses != null && !httpProxyExcludedAddresses.isEmpty()) { + proxy.getExcludedAddresses().addAll(httpProxyExcludedAddresses); + } + httpClient.getProxyConfiguration().getProxies().add(proxy); } if (this.httpProxyUsername != null && httpProxyPassword != null) { - httpClient.setProxyAuthentication(new ProxyAuthorization(this.httpProxyUsername, this.httpProxyPassword)); - } - // add redirect listener to handle Salesforce redirects - // this is ok to do since the RedirectListener is in the same classloader as Jetty client - String listenerClass = RedirectListener.class.getName(); - if (httpClient.getRegisteredListeners() == null - || !httpClient.getRegisteredListeners().contains(listenerClass)) { - httpClient.registerListener(listenerClass); - } - // SalesforceSecurityListener can't be registered the same way - // since Jetty HttpClient's Class.forName() can't see it + ObjectHelper.notEmpty(httpProxyAuthUri, "httpProxyAuthUri"); + ObjectHelper.notEmpty(httpProxyRealm, "httpProxyRealm"); - // start the Jetty client to initialize thread pool, etc. - httpClient.start(); + final Authentication authentication; + if (httpProxyUseDigestAuth) { + authentication = new DigestAuthentication(new URI(httpProxyAuthUri), + httpProxyRealm, httpProxyUsername, httpProxyPassword); + } else { + authentication = new BasicAuthentication(new URI(httpProxyAuthUri), + httpProxyRealm, httpProxyUsername, httpProxyPassword); + } + httpClient.getAuthenticationStore().addAuthentication(authentication); + } // support restarts if (null == this.session) { - this.session = new SalesforceSession(httpClient, loginConfig); + this.session = new SalesforceSession(httpClient, httpClient.getTimeout(), loginConfig); } + // set session before calling start() + httpClient.setSession(this.session); + + // start the Jetty client to initialize thread pool, etc. + httpClient.start(); // login at startup if lazyLogin is disabled if (!loginConfig.isLazyLogin()) { @@ -441,6 +466,83 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin this.httpProxyPassword = httpProxyPassword; } + public boolean isHttpProxySocks4() { + return isHttpProxySocks4; + } + + /** + * Enable for Socks4 proxy, false by default + */ + public void setIsHttpProxySocks4(boolean isHttpProxySocks4) { + this.isHttpProxySocks4 = isHttpProxySocks4; + } + + public boolean isHttpProxySecure() { + return isHttpProxySecure; + } + + /** + * Enable for TLS connections, true by default + */ + public void setIsHttpProxySecure(boolean isHttpProxySecure) { + this.isHttpProxySecure = isHttpProxySecure; + } + + public Set<String> getHttpProxyIncludedAddresses() { + return httpProxyIncludedAddresses; + } + + /** + * HTTP proxy included addresses + */ + public void setHttpProxyIncludedAddresses(Set<String> httpProxyIncludedAddresses) { + this.httpProxyIncludedAddresses = httpProxyIncludedAddresses; + } + + public Set<String> getHttpProxyExcludedAddresses() { + return httpProxyExcludedAddresses; + } + + /** + * HTTP proxy excluded addresses + */ + public void setHttpProxyExcludedAddresses(Set<String> httpProxyExcludedAddresses) { + this.httpProxyExcludedAddresses = httpProxyExcludedAddresses; + } + + public String getHttpProxyAuthUri() { + return httpProxyAuthUri; + } + + /** + * HTTP proxy authentication URI + */ + public void setHttpProxyAuthUri(String httpProxyAuthUri) { + this.httpProxyAuthUri = httpProxyAuthUri; + } + + public String getHttpProxyRealm() { + return httpProxyRealm; + } + + /** + * HTTP proxy authentication realm + */ + public void setHttpProxyRealm(String httpProxyRealm) { + this.httpProxyRealm = httpProxyRealm; + } + + public boolean isHttpProxyUseDigestAuth() { + return httpProxyUseDigestAuth; + } + + /** + * Use HTTP proxy Digest authentication, false by default + */ + public void setHttpProxyUseDigestAuth(boolean httpProxyUseDigestAuth) { + this.httpProxyUseDigestAuth = httpProxyUseDigestAuth; + } + public String[] getPackages() { return packages; } @@ -469,5 +571,4 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin public Map<String, Class<?>> getClassMap() { return classMap; } - } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java index e25198e5c..2bb6306 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java @@ -28,7 +28,6 @@ import org.apache.camel.component.salesforce.internal.dto.NotifyForFieldsEnum; import org.apache.camel.component.salesforce.internal.dto.NotifyForOperationsEnum; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; -import org.eclipse.jetty.client.HttpClient; /** * Salesforce Endpoint configuration. @@ -71,6 +70,9 @@ public class SalesforceEndpointConfig implements Cloneable { public static final String REPORT_METADATA = "reportMetadata"; public static final String INSTANCE_ID = "instanceId"; + // default maximum authentication retries on failed authentication or expired session + public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4; + // general properties @UriParam private String apiVersion = DEFAULT_VERSION; @@ -139,9 +141,9 @@ public class SalesforceEndpointConfig implements Cloneable { @UriParam private String instanceId; - // Jetty HttpClient, set using reference + // Salesforce Jetty9 HttpClient, set using reference @UriParam - private HttpClient httpClient; + private SalesforceHttpClient httpClient; public SalesforceEndpointConfig copy() { try { @@ -475,11 +477,11 @@ public class SalesforceEndpointConfig implements Cloneable { /** * Custom Jetty Http Client to use to connect to Salesforce. */ - public void setHttpClient(HttpClient httpClient) { + public void setHttpClient(SalesforceHttpClient httpClient) { this.httpClient = httpClient; } - public HttpClient getHttpClient() { + public SalesforceHttpClient getHttpClient() { return httpClient; } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java new file mode 100644 index 0000000..6bca3f8 --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.component.salesforce.internal.SalesforceSession; +import org.apache.camel.component.salesforce.internal.client.SalesforceHttpRequest; +import org.apache.camel.component.salesforce.internal.client.SalesforceSecurityHandler; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.HttpConversation; +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +/** + * Custom Salesforce HTTP Client that creates {@link SalesforceHttpRequest} requests. + */ +public class SalesforceHttpClient extends HttpClient { + + // default total request timeout in msecs + static final long DEFAULT_TIMEOUT = 60000; + + private static final int DEFAULT_MAX_RETRIES = 3; + private static final int DEFAULT_MAX_CONTENT_LENGTH = 4 * 1024 * 1024; + + private SalesforceSession session; + private int maxRetries = DEFAULT_MAX_RETRIES; + private int maxContentLength = DEFAULT_MAX_CONTENT_LENGTH; + private long timeout = DEFAULT_TIMEOUT; + + public SalesforceHttpClient() { + } + + public SalesforceHttpClient(SslContextFactory sslContextFactory) { + super(sslContextFactory); + } + + public SalesforceHttpClient(HttpClientTransport transport, SslContextFactory sslContextFactory) { + super(transport, sslContextFactory); + } + + @Override + public HttpRequest newHttpRequest(HttpConversation conversation, URI uri) { + final SalesforceHttpRequest request = new SalesforceHttpRequest(this, conversation, uri); + request.timeout(timeout, TimeUnit.MILLISECONDS); + return request; + } + + @Override + public Request copyRequest(HttpRequest oldRequest, URI newURI) { + return super.copyRequest(oldRequest, newURI); + } + + @Override + protected void doStart() throws Exception { + if (getSession() == null) { + throw new IllegalStateException("Missing SalesforceSession in property session!"); + } + getProtocolHandlers().add(new SalesforceSecurityHandler(this)); + super.doStart(); + } + + public SalesforceSession getSession() { + return session; + } + + public void setSession(SalesforceSession session) { + this.session = session; + } + + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + public int getMaxContentLength() { + return maxContentLength; + } + + public void setMaxContentLength(int maxContentLength) { + this.maxContentLength = maxContentLength; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java index 2385f94..6096260 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java @@ -35,6 +35,8 @@ public class Address extends GeoLocation { private String street; + private String geocodeAccuracy; + public String getCity() { return city; } @@ -90,4 +92,12 @@ public class Address extends GeoLocation { public void setStreet(String street) { this.street = street; } + + public String getGeocodeAccuracy() { + return geocodeAccuracy; + } + + public void setGeocodeAccuracy(String geocodeAccuracy) { + this.geocodeAccuracy = geocodeAccuracy; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java index a07cb6a..46d6944 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java @@ -18,6 +18,8 @@ package org.apache.camel.component.salesforce.api.dto; import com.thoughtworks.xstream.annotations.XStreamAlias; +import org.codehaus.jackson.annotate.JsonProperty; + /** * DTO for Salesforce Resources. */ @@ -46,6 +48,12 @@ public class RestResources extends AbstractDTOBase { private String actions; private String tabs; private String wave; + @JsonProperty("async-queries") + @XStreamAlias("async-queries") + private String asyncQueries; + @JsonProperty("exchange-connect") + @XStreamAlias("exchange-connect") + private String exchangeConnect; public String getSobjects() { return sobjects; @@ -222,4 +230,20 @@ public class RestResources extends AbstractDTOBase { public void setWave(String wave) { this.wave = wave; } + + public String getAsyncQueries() { + return asyncQueries; + } + + public void setAsyncQueries(String asyncQueries) { + this.asyncQueries = asyncQueries; + } + + public String getExchangeConnect() { + return exchangeConnect; + } + + public void setExchangeConnect(String exchangeConnect) { + this.exchangeConnect = exchangeConnect; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java index bf3a395..0fbe8ec 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java @@ -17,12 +17,17 @@ package org.apache.camel.component.salesforce.internal; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.camel.Service; +import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.SalesforceLoginConfig; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.api.dto.RestError; @@ -30,15 +35,13 @@ import org.apache.camel.component.salesforce.internal.dto.LoginError; import org.apache.camel.component.salesforce.internal.dto.LoginToken; import org.apache.camel.util.ObjectHelper; import org.codehaus.jackson.map.ObjectMapper; -import org.eclipse.jetty.client.ContentExchange; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.client.HttpConversation; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.util.FormContentProvider; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.ByteArrayBuffer; -import org.eclipse.jetty.util.StringUtil; -import org.eclipse.jetty.util.UrlEncoded; +import org.eclipse.jetty.util.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +51,9 @@ public class SalesforceSession implements Service { private static final String OAUTH2_TOKEN_PATH = "/services/oauth2/token"; private static final Logger LOG = LoggerFactory.getLogger(SalesforceSession.class); - private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded;charset=utf-8"; - private final HttpClient httpClient; + private final SalesforceHttpClient httpClient; + private final long timeout; private final SalesforceLoginConfig config; @@ -60,7 +63,7 @@ public class SalesforceSession implements Service { private volatile String accessToken; private volatile String instanceUrl; - public SalesforceSession(HttpClient httpClient, SalesforceLoginConfig config) { + public SalesforceSession(SalesforceHttpClient httpClient, long timeout, SalesforceLoginConfig config) { // validate parameters ObjectHelper.notNull(httpClient, "httpClient"); ObjectHelper.notNull(config, "SalesforceLoginConfig"); @@ -71,6 +74,7 @@ public class SalesforceSession implements Service { ObjectHelper.notNull(config.getPassword(), "password"); this.httpClient = httpClient; + this.timeout = timeout; this.config = config; // strip trailing '/' @@ -100,144 +104,133 @@ public class SalesforceSession implements Service { } // login to Salesforce and get session id - final StatusExceptionExchange loginPost = new StatusExceptionExchange(true); - String url = config.getLoginUrl() + OAUTH2_TOKEN_PATH; - loginPost.setURL(url); - loginPost.setMethod(HttpMethods.POST); - loginPost.setRequestContentType(FORM_CONTENT_TYPE); - - final UrlEncoded nvps = new UrlEncoded(); - nvps.put("grant_type", "password"); - nvps.put("client_id", config.getClientId()); - nvps.put("client_secret", config.getClientSecret()); - nvps.put("username", config.getUserName()); - nvps.put("password", config.getPassword()); - nvps.put("format", "json"); - + final Request loginPost = getLoginRequest(null); try { - LOG.info("Login user {} at Salesforce url: {}", config.getUserName(), url); - - // set form content - loginPost.setRequestContent(new ByteArrayBuffer( - nvps.encode(StringUtil.__UTF8, true).getBytes(StringUtil.__UTF8))); - httpClient.send(loginPost); - - // wait for the login to finish - final int exchangeState = loginPost.waitForDone(); - - switch (exchangeState) { - case HttpExchange.STATUS_COMPLETED: - final byte[] responseContent = loginPost.getResponseContentBytes(); - final int responseStatus = loginPost.getResponseStatus(); - - switch (responseStatus) { - case HttpStatus.OK_200: - // parse the response to get token - LoginToken token = objectMapper.readValue(responseContent, LoginToken.class); - - // don't log token or instance URL for security reasons - LOG.info("Login successful"); - accessToken = token.getAccessToken(); - instanceUrl = token.getInstanceUrl(); - - // notify all listeners - for (SalesforceSessionListener listener : listeners) { - try { - listener.onLogin(accessToken, instanceUrl); - } catch (Throwable t) { - LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage()); - } - } - - break; - - case HttpStatus.BAD_REQUEST_400: - // parse the response to get error - final LoginError error = objectMapper.readValue(responseContent, LoginError.class); - final String msg = String.format("Login error code:[%s] description:[%s]", - error.getError(), error.getErrorDescription()); - final List<RestError> errors = new ArrayList<RestError>(); - errors.add(new RestError(msg, error.getErrorDescription())); - throw new SalesforceException(errors, HttpStatus.BAD_REQUEST_400); - - default: - throw new SalesforceException(String.format("Login error status:[%s] reason:[%s]", - responseStatus, loginPost.getReason()), responseStatus); - } - break; - - case HttpExchange.STATUS_EXCEPTED: - final Throwable ex = loginPost.getException(); - throw new SalesforceException( - String.format("Unexpected login exception: %s", ex.getMessage()), ex); + final ContentResponse loginResponse = loginPost.send(); + parseLoginResponse(loginResponse, loginResponse.getContentAsString()); - case HttpExchange.STATUS_CANCELLED: - throw new SalesforceException("Login request CANCELLED!", null); - - case HttpExchange.STATUS_EXPIRED: - throw new SalesforceException("Login request TIMEOUT!", null); - - default: - throw new SalesforceException("Unknow status: " + exchangeState, null); - } - } catch (IOException e) { - String msg = "Login error: unexpected exception " + e.getMessage(); - throw new SalesforceException(msg, e); } catch (InterruptedException e) { - String msg = "Login error: unexpected exception " + e.getMessage(); - throw new SalesforceException(msg, e); + throw new SalesforceException("Login error: " + e.getMessage(), e); + } catch (TimeoutException e) { + throw new SalesforceException("Login request timeout: " + e.getMessage(), e); + } catch (ExecutionException e) { + throw new SalesforceException("Unexpected login error: " + e.getCause().getMessage(), e.getCause()); } } return accessToken; } - public synchronized void logout() throws SalesforceException { - if (accessToken == null) { - return; + /** + * Creates login request, allows SalesforceSecurityHandler to create a login request for a failed authentication conversation + * @return login POST request. + */ + public Request getLoginRequest(HttpConversation conversation) { + final String loginUrl = (instanceUrl == null ? config.getLoginUrl() : instanceUrl) + OAUTH2_TOKEN_PATH; + LOG.info("Login user {} at Salesforce loginUrl: {}", config.getUserName(), loginUrl); + final Fields fields = new Fields(true); + + fields.put("grant_type", "password"); + fields.put("client_id", config.getClientId()); + fields.put("client_secret", config.getClientSecret()); + fields.put("username", config.getUserName()); + fields.put("password", config.getPassword()); + fields.put("format", "json"); + + final Request post; + if (conversation == null) { + post = httpClient.POST(loginUrl); + } else { + post = httpClient.newHttpRequest(conversation, URI.create(loginUrl)) + .method(HttpMethod.POST); } - StatusExceptionExchange logoutGet = new StatusExceptionExchange(true); - logoutGet.setURL(config.getLoginUrl() + OAUTH2_REVOKE_PATH + accessToken); - logoutGet.setMethod(HttpMethods.GET); + return post.content(new FormContentProvider(fields)) + .timeout(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Parses login response, allows SalesforceSecurityHandler to parse a login request for a failed authentication conversation. + * @param loginResponse + * @param responseContent + * @throws SalesforceException + */ + public synchronized void parseLoginResponse(ContentResponse loginResponse, String responseContent) throws SalesforceException { + final int responseStatus = loginResponse.getStatus(); try { - httpClient.send(logoutGet); - final int done = logoutGet.waitForDone(); - switch (done) { - case HttpExchange.STATUS_COMPLETED: - final int statusCode = logoutGet.getResponseStatus(); - final String reason = logoutGet.getReason(); - - if (statusCode == HttpStatus.OK_200) { - LOG.info("Logout successful"); - } else { - throw new SalesforceException( - String.format("Logout error, code: [%s] reason: [%s]", - statusCode, reason), - statusCode); + switch (responseStatus) { + case HttpStatus.OK_200: + // parse the response to get token + LoginToken token = objectMapper.readValue(responseContent, LoginToken.class); + + // don't log token or instance URL for security reasons + LOG.info("Login successful"); + accessToken = token.getAccessToken(); + instanceUrl = token.getInstanceUrl(); + + // notify all session listeners + for (SalesforceSessionListener listener : listeners) { + try { + listener.onLogin(accessToken, instanceUrl); + } catch (Throwable t) { + LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage()); + } } + break; - case HttpExchange.STATUS_EXCEPTED: - final Throwable ex = logoutGet.getException(); - throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex); + case HttpStatus.BAD_REQUEST_400: + // parse the response to get error + final LoginError error = objectMapper.readValue(responseContent, LoginError.class); + final String msg = String.format("Login error code:[%s] description:[%s]", + error.getError(), error.getErrorDescription()); + final List<RestError> errors = new ArrayList<RestError>(); + errors.add(new RestError(msg, error.getErrorDescription())); + throw new SalesforceException(errors, HttpStatus.BAD_REQUEST_400); - case HttpExchange.STATUS_CANCELLED: - throw new SalesforceException("Logout request CANCELLED!", null); + default: + throw new SalesforceException(String.format("Login error status:[%s] reason:[%s]", + responseStatus, loginResponse.getReason()), responseStatus); + } + } catch (IOException e) { + String msg = "Login error: response parse exception " + e.getMessage(); + throw new SalesforceException(msg, e); + } + } - case HttpExchange.STATUS_EXPIRED: - throw new SalesforceException("Logout request TIMEOUT!", null); + public synchronized void logout() throws SalesforceException { + if (accessToken == null) { + return; + } - default: - throw new SalesforceException("Unknown status: " + done, null); + try { + String logoutUrl = (instanceUrl == null ? config.getLoginUrl() : instanceUrl) + OAUTH2_REVOKE_PATH + accessToken; + final Request logoutGet = httpClient.newRequest(logoutUrl) + .timeout(timeout, TimeUnit.MILLISECONDS); + final ContentResponse logoutResponse = logoutGet.send(); + + final int statusCode = logoutResponse.getStatus(); + final String reason = logoutResponse.getReason(); + + if (statusCode == HttpStatus.OK_200) { + LOG.info("Logout successful"); + } else { + throw new SalesforceException( + String.format("Logout error, code: [%s] reason: [%s]", + statusCode, reason), + statusCode); } - } catch (SalesforceException e) { - throw e; - } catch (Exception e) { + + } catch (InterruptedException e) { String msg = "Logout error: " + e.getMessage(); throw new SalesforceException(msg, e); + } catch (ExecutionException e) { + final Throwable ex = e.getCause(); + throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex); + } catch (TimeoutException e) { + throw new SalesforceException("Logout request TIMEOUT!", null); } finally { // reset session accessToken = null; @@ -281,45 +274,8 @@ public class SalesforceSession implements Service { logout(); } - /** - * Records status line, and exception from exchange. - */ - private static class StatusExceptionExchange extends ContentExchange { - - private String reason; - private Throwable exception; - - public StatusExceptionExchange(boolean cacheFields) { - super(cacheFields); - } - - @Override - protected synchronized void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { - // remember reason - this.reason = reason.toString(StringUtil.__ISO_8859_1); - super.onResponseStatus(version, status, reason); - } - - @Override - protected void onConnectionFailed(Throwable x) { - this.exception = x; - super.onConnectionFailed(x); - } - - @Override - protected void onException(Throwable x) { - this.exception = x; - super.onException(x); - } - - public String getReason() { - return reason; - } - - public Throwable getException() { - return exception; - } - + public long getTimeout() { + return timeout; } public interface SalesforceSessionListener { @@ -327,5 +283,4 @@ public class SalesforceSession implements Service { void onLogout(); } - } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java index b61c161..757293e 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java @@ -16,21 +16,27 @@ */ package org.apache.camel.component.salesforce.internal.client; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.camel.Service; +import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.internal.SalesforceSession; -import org.eclipse.jetty.client.ContentExchange; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpEventListenerWrapper; -import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.http.HttpSchemes; +import org.eclipse.jetty.client.HttpContentResponse; +import org.eclipse.jetty.client.api.ContentProvider; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.ByteBufferContentProvider; +import org.eclipse.jetty.client.util.InputStreamContentProvider; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,15 +47,15 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce protected final Logger log = LoggerFactory.getLogger(getClass()); - protected final HttpClient httpClient; + protected final SalesforceHttpClient httpClient; protected final SalesforceSession session; protected final String version; protected String accessToken; protected String instanceUrl; - public AbstractClientBase(String version, - SalesforceSession session, HttpClient httpClient) throws SalesforceException { + public AbstractClientBase(String version, SalesforceSession session, + SalesforceHttpClient httpClient) throws SalesforceException { this.version = version; this.session = session; @@ -89,111 +95,102 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce // SalesforceSecurityListener will auto login! } - protected SalesforceExchange getContentExchange(String method, String url) { - SalesforceExchange get = new SalesforceExchange(); - get.setMethod(method); - get.setURL(url); - get.setClient(this); - return get; + protected Request getRequest(HttpMethod method, String url) { + return getRequest(method.asString(), url); + } + + protected Request getRequest(String method, String url) { + SalesforceHttpRequest request = (SalesforceHttpRequest) httpClient.newRequest(url) + .method(method) + .timeout(session.getTimeout(), TimeUnit.MILLISECONDS); + request.getConversation().setAttribute(SalesforceSecurityHandler.CLIENT_ATTRIBUTE, this); + return request; } protected interface ClientResponseCallback { void onResponse(InputStream response, SalesforceException ex); } - protected void doHttpRequest(final ContentExchange request, final ClientResponseCallback callback) { + protected void doHttpRequest(final Request request, final ClientResponseCallback callback) { + // Highly memory inefficient, + // but buffer the request content to allow it to be replayed for authentication retries + final ContentProvider content = request.getContent(); + if (content instanceof InputStreamContentProvider) { + final List<ByteBuffer> buffers = new ArrayList<>(); + for (ByteBuffer buffer : content) { + buffers.add(buffer); + } + request.content(new ByteBufferContentProvider(buffers.toArray(new ByteBuffer[buffers.size()]))); + buffers.clear(); + } - // use SalesforceSecurityListener for security login retries - final SalesforceSecurityListener securityListener; - try { - final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme())); - securityListener = new SalesforceSecurityListener( - httpClient.getDestination(request.getAddress(), isHttps), - request, session, accessToken) { + // execute the request + request.send(new BufferingResponseListener(httpClient.getMaxContentLength()) { + @Override + public void onComplete(Result result) { + Response response = result.getResponse(); + if (result.isFailed()) { + + // Failure!!! + // including Salesforce errors reported as exception from SalesforceSecurityHandler + Throwable failure = result.getFailure(); + if (failure instanceof SalesforceException) { + callback.onResponse(null, (SalesforceException) failure); + } else { + final String msg = String.format("Unexpected error {%s:%s} executing {%s:%s}", + response.getStatus(), response.getReason(), request.getMethod(), request.getURI()); + callback.onResponse(null, new SalesforceException(msg, response.getStatus(), failure)); + } + } else { - private String reason; + // HTTP error status + final int status = response.getStatus(); + SalesforceHttpRequest request = (SalesforceHttpRequest) ((SalesforceHttpRequest) result.getRequest()) + .getConversation() + .getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE); - @Override - public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { - super.onResponseStatus(version, status, reason); - // remember status reason - this.reason = reason.toString(StringUtil.__ISO_8859_1); - } + if (status == HttpStatus.BAD_REQUEST_400 && request != null) { + // parse login error + ContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding()); + try { - @Override - protected SalesforceException createExceptionResponse() { - final int responseStatus = request.getResponseStatus(); - if (responseStatus < HttpStatus.OK_200 || responseStatus >= HttpStatus.MULTIPLE_CHOICES_300) { - final String msg = String.format("Error {%s:%s} executing {%s:%s}", - responseStatus, reason, request.getMethod(), request.getRequestURI()); - return new SalesforceException(msg, responseStatus, createRestException(request, reason)); - } else { - return super.createExceptionResponse(); - } - } - }; - } catch (IOException e) { - // propagate exception - callback.onResponse(null, new SalesforceException( - String.format("Error registering security listener: %s", e.getMessage()), - e)); - return; - } + session.parseLoginResponse(contentResponse, getContentAsString()); + final String msg = String.format("Unexpected Error {%s:%s} executing {%s:%s}", + status, response.getReason(), request.getMethod(), request.getURI()); + callback.onResponse(null, new SalesforceException(msg, null)); - // use HttpEventListener for lifecycle events - request.setEventListener(new HttpEventListenerWrapper(request.getEventListener(), true) { + } catch (SalesforceException e) { - @Override - public void onConnectionFailed(Throwable ex) { - super.onConnectionFailed(ex); - callback.onResponse(null, - new SalesforceException("Connection error: " + ex.getMessage(), ex)); - } + final String msg = String.format("Error {%s:%s} executing {%s:%s}", + status, response.getReason(), request.getMethod(), request.getURI()); + callback.onResponse(null, new SalesforceException(msg, response.getStatus(), e)); - @Override - public void onException(Throwable ex) { - super.onException(ex); - callback.onResponse(null, - new SalesforceException("Unexpected exception: " + ex.getMessage(), ex)); - } + } + } else if (status < HttpStatus.OK_200 || status >= HttpStatus.MULTIPLE_CHOICES_300) { - @Override - public void onExpire() { - super.onExpire(); - callback.onResponse(null, - new SalesforceException("Request expired", null)); - } + // Salesforce HTTP failure! + request = (SalesforceHttpRequest) result.getRequest(); + final String msg = String.format("Error {%s:%s} executing {%s:%s}", + status, response.getReason(), request.getMethod(), request.getURI()); + final SalesforceException cause = createRestException(response, getContentAsInputStream()); + callback.onResponse(null, new SalesforceException(msg, response.getStatus(), cause)); - @Override - public void onResponseComplete() throws IOException { - super.onResponseComplete(); + } else { - SalesforceException e = securityListener.getExceptionResponse(); - if (e != null) { - callback.onResponse(null, e); - } else { - // TODO not memory efficient for large response messages, - // doesn't seem to be possible in Jetty 7 to directly stream to response parsers - final byte[] bytes = request.getResponseContentBytes(); - callback.onResponse(bytes != null ? new ByteArrayInputStream(bytes) : null, null); + // Success!!! + callback.onResponse(getContentAsInputStream(), null); + } } + } + @Override + public InputStream getContentAsInputStream() { + if (getContent().length == 0) { + return null; + } + return super.getContentAsInputStream(); } }); - - // wrap the above lifecycle event listener with SalesforceSecurityListener - securityListener.setEventListener(request.getEventListener()); - request.setEventListener(securityListener); - - // execute the request - try { - httpClient.send(request); - } catch (IOException e) { - String msg = "Unexpected Error: " + e.getMessage(); - // send error through callback - callback.onResponse(null, new SalesforceException(msg, e)); - } - } public void setAccessToken(String accessToken) { @@ -204,8 +201,8 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce this.instanceUrl = instanceUrl; } - protected abstract void setAccessToken(HttpExchange httpExchange); + protected abstract void setAccessToken(Request request); - protected abstract SalesforceException createRestException(ContentExchange httpExchange, String reason); + protected abstract SalesforceException createRestException(Response response, InputStream responseContent); } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java index 1041f7f..f6e72dc 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java @@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.List; +import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.api.dto.RestError; import org.apache.camel.component.salesforce.api.dto.analytics.reports.AsyncReportResults; @@ -33,12 +34,11 @@ import org.apache.camel.component.salesforce.api.dto.analytics.reports.SyncRepor import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; -import org.eclipse.jetty.client.ContentExchange; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.http.HttpHeaders; -import org.eclipse.jetty.http.HttpMethods; -import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.util.StringUtil; /** @@ -51,7 +51,8 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana private ObjectMapper objectMapper; - public DefaultAnalyticsApiClient(String version, SalesforceSession session, HttpClient httpClient) throws SalesforceException { + public DefaultAnalyticsApiClient(String version, SalesforceSession session, + SalesforceHttpClient httpClient) throws SalesforceException { super(version, session, httpClient); objectMapper = new ObjectMapper(); @@ -60,16 +61,16 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana @Override public void getRecentReports(final RecentReportsResponseCallback callback) { - final ContentExchange contentExchange = getContentExchange(HttpMethods.GET, reportsUrl()); + final Request Request = getRequest(HttpMethod.GET, reportsUrl()); - doHttpRequest(contentExchange, new ClientResponseCallback() { + doHttpRequest(Request, new ClientResponseCallback() { @Override @SuppressWarnings("unchecked") public void onResponse(InputStream response, SalesforceException ex) { List<RecentReport> recentReports = null; if (response != null) { try { - recentReports = unmarshalResponse(response, contentExchange, + recentReports = unmarshalResponse(response, Request, new TypeReference<List<RecentReport>>() { } ); @@ -85,14 +86,14 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana @Override public void getReportDescription(String reportId, final ReportDescriptionResponseCallback callback) { - final ContentExchange contentExchange = getContentExchange(HttpMethods.GET, reportsDescribeUrl(reportId)); + final Request Request = getRequest(HttpMethod.GET, reportsDescribeUrl(reportId)); - doHttpRequest(contentExchange, new ClientResponseCallback() { + doHttpRequest(Request, new ClientResponseCallback() { @Override public void onResponse(InputStream response, SalesforceException ex) { ReportDescription reportDescription = null; try { - reportDescription = unmarshalResponse(response, contentExchange, ReportDescription.class); + reportDescription = unmarshalResponse(response, Request, ReportDescription.class); } catch (SalesforceException e) { ex = e; } @@ -106,8 +107,8 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana final ReportResultsResponseCallback callback) { final boolean useGet = reportMetadata == null; - final ContentExchange contentExchange = getContentExchange( - useGet ? HttpMethods.GET : HttpMethods.POST, reportsUrl(reportId, includeDetails)); + final Request Request = getRequest( + useGet ? HttpMethod.GET : HttpMethod.POST, reportsUrl(reportId, includeDetails)); // set POST data if (!useGet) { @@ -115,19 +116,19 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana // wrap reportMetadata in a map final HashMap<String, Object> request = new HashMap<String, Object>(); request.put("reportMetadata", reportMetadata); - marshalRequest(request, contentExchange); + marshalRequest(request, Request); } catch (SalesforceException e) { callback.onResponse(null, e); return; } } - doHttpRequest(contentExchange, new ClientResponseCallback() { + doHttpRequest(Request, new ClientResponseCallback() { @Override public void onResponse(InputStream response, SalesforceException ex) { SyncReportResults reportResults = null; try { - reportResults = unmarshalResponse(response, contentExchange, SyncReportResults.class); + reportResults = unmarshalResponse(response, Request, SyncReportResults.class); } catch (SalesforceException e) { ex = e; } @@ -140,7 +141,7 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana public void executeAsyncReport(String reportId, Boolean includeDetails, ReportMetadata reportMetadata, final ReportInstanceResponseCallback callback) { - final ContentExchange contentExchange = getContentExchange(HttpMethods.POST, + final Request Request = getRequest(HttpMethod.POST, reportInstancesUrl(reportId, includeDetails)); // set POST data @@ -149,19 +150,19 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana // wrap reportMetadata in a map final HashMap<String, Object> request = new HashMap<String, Object>(); request.put("reportMetadata", reportMetadata); - marshalRequest(request, contentExchange); + marshalRequest(request, Request); } catch (SalesforceException e) { callback.onResponse(null, e); return; } } - doHttpRequest(contentExchange, new ClientResponseCallback() { + doHttpRequest(Request, new ClientResponseCallback() { @Override public void onResponse(InputStream response, SalesforceException ex) { ReportInstance reportInstance = null; try { - reportInstance = unmarshalResponse(response, contentExchange, ReportInstance.class); + reportInstance = unmarshalResponse(response, Request, ReportInstance.class); } catch (SalesforceException e) { ex = e; } @@ -173,16 +174,16 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana @Override public void getReportInstances(String reportId, final ReportInstanceListResponseCallback callback) { - final ContentExchange contentExchange = getContentExchange(HttpMethods.GET, reportInstancesUrl(reportId)); + final Request Request = getRequest(HttpMethod.GET, reportInstancesUrl(reportId)); - doHttpRequest(contentExchange, new ClientResponseCallback() { + doHttpRequest(Request, new ClientResponseCallback() { @Override @SuppressWarnings("unchecked") public void onResponse(InputStream response, SalesforceException ex) { List<ReportInstance> reportInstances = null; if (response != null) { try { - reportInstances = unmarshalResponse(response, contentExchange, + reportInstances = unmarshalResponse(response, Request, new TypeReference<List<ReportInstance>>() { } ); @@ -198,15 +199,15 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana @Override public void getReportResults(String reportId, String instanceId, final ReportResultsResponseCallback callback) { - final ContentExchange contentExchange = getContentExchange(HttpMethods.GET, + final Request Request = getRequest(HttpMethod.GET, reportInstancesUrl(reportId, instanceId)); - doHttpRequest(contentExchange, new ClientResponseCallback() { + doHttpRequest(Request, new ClientResponseCallback() { @Override public void onResponse(InputStream response, SalesforceException ex) { AsyncReportResults reportResults = null; try { - reportResults = unmarshalResponse(response, contentExchange, AsyncReportResults.class); + reportResults = unmarshalResponse(response, Request, AsyncReportResults.class); } catch (SalesforceException e) { ex = e; } @@ -247,16 +248,15 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana } @Override - protected void setAccessToken(HttpExchange httpExchange) { - httpExchange.setRequestHeader(HttpHeaders.AUTHORIZATION, TOKEN_PREFIX + accessToken); + protected void setAccessToken(Request request) { + // replace old token + request.getHeaders().put(HttpHeader.AUTHORIZATION, TOKEN_PREFIX + accessToken); } @Override - protected SalesforceException createRestException(ContentExchange httpExchange, String reason) { - final int statusCode = httpExchange.getResponseStatus(); - String responseContent = null; + protected SalesforceException createRestException(Response response, InputStream responseContent) { + final int statusCode = response.getStatus(); try { - responseContent = httpExchange.getResponseContent(); if (responseContent != null) { // unmarshal RestError final List<RestError> errors = objectMapper.readValue(responseContent, @@ -277,36 +277,37 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana } // just report HTTP status info - return new SalesforceException("Unexpected error: " + reason + ", with content: " + responseContent, - statusCode); + String message = String.format("Unexpected error: %s, with content: %s", + response.getReason(), responseContent); + return new SalesforceException(message, statusCode); } @Override - protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) { + protected void doHttpRequest(Request request, ClientResponseCallback callback) { // set access token for all requests setAccessToken(request); // set request and response content type and charset, which is always JSON for analytics API - request.setRequestHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON_UTF8); - request.setRequestHeader(HttpHeaders.ACCEPT, APPLICATION_JSON_UTF8); - request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8); + request.header(HttpHeader.CONTENT_TYPE, APPLICATION_JSON_UTF8); + request.header(HttpHeader.ACCEPT, APPLICATION_JSON_UTF8); + request.header(HttpHeader.ACCEPT_CHARSET, StringUtil.__UTF8); super.doHttpRequest(request, callback); } - private void marshalRequest(Object input, ContentExchange request) throws SalesforceException { + private void marshalRequest(Object input, Request request) throws SalesforceException { try { - request.setRequestContent(new ByteArrayBuffer(objectMapper.writeValueAsBytes(input))); + request.content(new BytesContentProvider(objectMapper.writeValueAsBytes(input))); } catch (IOException e) { throw new SalesforceException( String.format("Error marshaling request for {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } } - private <T> T unmarshalResponse(InputStream response, ContentExchange request, + private <T> T unmarshalResponse(InputStream response, Request request, TypeReference<T> responseTypeReference) throws SalesforceException { @@ -315,12 +316,12 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana } catch (IOException e) { throw new SalesforceException( String.format("Error unmarshaling response {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } } - private <T> T unmarshalResponse(InputStream response, ContentExchange request, Class<T> responseClass) + private <T> T unmarshalResponse(InputStream response, Request request, Class<T> responseClass) throws SalesforceException { if (response == null) { @@ -332,7 +333,7 @@ public class DefaultAnalyticsApiClient extends AbstractClientBase implements Ana } catch (IOException e) { throw new SalesforceException( String.format("Error unmarshaling response {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } } http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java index 3ab4227..1f024db 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java @@ -16,9 +16,9 @@ */ package org.apache.camel.component.salesforce.internal.client; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.Collections; import javax.xml.bind.JAXBContext; @@ -28,6 +28,7 @@ import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; import javax.xml.transform.stream.StreamSource; +import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.api.dto.RestError; import org.apache.camel.component.salesforce.api.dto.bulk.BatchInfo; @@ -39,12 +40,12 @@ import org.apache.camel.component.salesforce.api.dto.bulk.JobStateEnum; import org.apache.camel.component.salesforce.api.dto.bulk.ObjectFactory; import org.apache.camel.component.salesforce.api.dto.bulk.QueryResultList; import org.apache.camel.component.salesforce.internal.SalesforceSession; -import org.eclipse.jetty.client.ContentExchange; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.http.HttpHeaders; -import org.eclipse.jetty.http.HttpMethods; -import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.util.InputStreamContentProvider; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.util.StringUtil; public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiClient { @@ -55,7 +56,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC private JAXBContext context; private ObjectFactory objectFactory; - public DefaultBulkApiClient(String version, SalesforceSession session, HttpClient httpClient) + public DefaultBulkApiClient(String version, SalesforceSession session, SalesforceHttpClient httpClient) throws SalesforceException { super(version, session, httpClient); @@ -74,7 +75,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC // clear system fields if set sanitizeJobRequest(request); - final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(null)); + final Request post = getRequest(HttpMethod.POST, jobUrl(null)); try { marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8); } catch (SalesforceException e) { @@ -123,7 +124,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getJob(String jobId, final JobInfoResponseCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, jobUrl(jobId)); + final Request get = getRequest(HttpMethod.GET, jobUrl(jobId)); // make the call and parse the result doHttpRequest(get, new ClientResponseCallback() { @@ -145,7 +146,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC final JobInfo request = new JobInfo(); request.setState(JobStateEnum.CLOSED); - final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId)); + final Request post = getRequest(HttpMethod.POST, jobUrl(jobId)); try { marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8); } catch (SalesforceException e) { @@ -173,7 +174,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC final JobInfo request = new JobInfo(); request.setState(JobStateEnum.ABORTED); - final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId)); + final Request post = getRequest(HttpMethod.POST, jobUrl(jobId)); try { marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8); } catch (SalesforceException e) { @@ -199,9 +200,9 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum, final BatchInfoResponseCallback callback) { - final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null)); - post.setRequestContentSource(batchStream); - post.setRequestContentType(getContentType(contentTypeEnum) + ";charset=" + StringUtil.__UTF8); + final Request post = getRequest(HttpMethod.POST, batchUrl(jobId, null)); + post.content(new InputStreamContentProvider(batchStream)); + post.header(HttpHeader.CONTENT_TYPE, getContentType(contentTypeEnum) + ";charset=" + StringUtil.__UTF8); // make the call and parse the result doHttpRequest(post, new ClientResponseCallback() { @@ -220,7 +221,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getBatch(String jobId, String batchId, final BatchInfoResponseCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId)); + final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, batchId)); // make the call and parse the result doHttpRequest(get, new ClientResponseCallback() { @@ -239,7 +240,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getAllBatches(String jobId, final BatchInfoListResponseCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, null)); + final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, null)); // make the call and parse the result doHttpRequest(get, new ClientResponseCallback() { @@ -258,7 +259,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getRequest(String jobId, String batchId, final StreamResponseCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId)); + final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, batchId)); // make the call and parse the result doHttpRequest(get, new ClientResponseCallback() { @@ -271,7 +272,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getResults(String jobId, String batchId, final StreamResponseCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null)); + final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId, batchId, null)); // make the call and return the result doHttpRequest(get, new ClientResponseCallback() { @@ -285,10 +286,16 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType, final BatchInfoResponseCallback callback) { - final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null)); - byte[] queryBytes = soqlQuery.getBytes(StringUtil.__UTF8_CHARSET); - post.setRequestContent(new ByteArrayBuffer(queryBytes)); - post.setRequestContentType(getContentType(jobContentType) + ";charset=" + StringUtil.__UTF8); + final Request post = getRequest(HttpMethod.POST, batchUrl(jobId, null)); + final byte[] queryBytes; + try { + queryBytes = soqlQuery.getBytes(StringUtil.__UTF8); + } catch (UnsupportedEncodingException e) { + callback.onResponse(null, new SalesforceException("Unexpected exception: " + e.getMessage(), e)); + return; + } + post.content(new BytesContentProvider(queryBytes)); + post.header(HttpHeader.CONTENT_TYPE, getContentType(jobContentType) + ";charset=" + StringUtil.__UTF8); // make the call and parse the result doHttpRequest(post, new ClientResponseCallback() { @@ -307,7 +314,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getQueryResultIds(String jobId, String batchId, final QueryResultIdsCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null)); + final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId, batchId, null)); // make the call and parse the result doHttpRequest(get, new ClientResponseCallback() { @@ -326,7 +333,7 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC @Override public void getQueryResult(String jobId, String batchId, String resultId, final StreamResponseCallback callback) { - final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, resultId)); + final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId, batchId, resultId)); // make the call and parse the result doHttpRequest(get, new ClientResponseCallback() { @@ -338,23 +345,24 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC } @Override - protected void setAccessToken(HttpExchange httpExchange) { - httpExchange.setRequestHeader(TOKEN_HEADER, accessToken); + protected void setAccessToken(Request request) { + // replace old token + request.getHeaders().put(TOKEN_HEADER, accessToken); } @Override - protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) { + protected void doHttpRequest(Request request, ClientResponseCallback callback) { // set access token for all requests setAccessToken(request); // set default charset - request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8); + request.header(HttpHeader.ACCEPT_CHARSET, StringUtil.__UTF8); // TODO check if this is really needed or not, since SF response content type seems fixed // check if the default accept content type must be used - if (!request.getRequestFields().containsKey(HttpHeaders.ACCEPT)) { + if (!request.getHeaders().contains(HttpHeader.ACCEPT)) { final String contentType = getContentType(DEFAULT_ACCEPT_TYPE); - request.setRequestHeader(HttpHeaders.ACCEPT, contentType); + request.header(HttpHeader.ACCEPT, contentType); // request content type and charset is set by the request entity } @@ -386,24 +394,23 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC } @Override - protected SalesforceException createRestException(ContentExchange request, String reason) { + protected SalesforceException createRestException(Response response, InputStream responseContent) { // this must be of type Error try { - final Error error = unmarshalResponse(new ByteArrayInputStream(request.getResponseContentBytes()), - request, Error.class); + final Error error = unmarshalResponse(responseContent, response.getRequest(), Error.class); final RestError restError = new RestError(); restError.setErrorCode(error.getExceptionCode()); restError.setMessage(error.getExceptionMessage()); - return new SalesforceException(Arrays.asList(restError), request.getResponseStatus()); + return new SalesforceException(Arrays.asList(restError), response.getStatus()); } catch (SalesforceException e) { String msg = "Error un-marshaling Salesforce Error: " + e.getMessage(); return new SalesforceException(msg, e); } } - private <T> T unmarshalResponse(InputStream response, ContentExchange request, Class<T> resultClass) + private <T> T unmarshalResponse(InputStream response, Request request, Class<T> resultClass) throws SalesforceException { try { Unmarshaller unmarshaller = context.createUnmarshaller(); @@ -412,32 +419,32 @@ public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiC } catch (JAXBException e) { throw new SalesforceException( String.format("Error unmarshaling response {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } catch (IllegalArgumentException e) { throw new SalesforceException( String.format("Error unmarshaling response for {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } } - private void marshalRequest(Object input, ContentExchange request, String contentType) throws SalesforceException { + private void marshalRequest(Object input, Request request, String contentType) throws SalesforceException { try { Marshaller marshaller = context.createMarshaller(); ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); marshaller.marshal(input, byteStream); - request.setRequestContent(new ByteArrayBuffer(byteStream.toByteArray())); - request.setRequestContentType(contentType); + + request.content(new BytesContentProvider(contentType, byteStream.toByteArray())); } catch (JAXBException e) { throw new SalesforceException( String.format("Error marshaling request for {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } catch (IllegalArgumentException e) { throw new SalesforceException( String.format("Error marshaling request for {%s:%s} : %s", - request.getMethod(), request.getRequestURI(), e.getMessage()), + request.getMethod(), request.getURI(), e.getMessage()), e); } }