CAMEL-9925: Updated Salesforce component to use Jetty9 and cometd3
Conflicts:
components/camel-salesforce/camel-salesforce-component/pom.xml
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
components/camel-salesforce/camel-salesforce-maven-plugin/pom.xml
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8dfd66bd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8dfd66bd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8dfd66bd
Branch: refs/heads/camel-2.17.x
Commit: 8dfd66bd731c6cea0885481f06e817dac0284b41
Parents: 7aa181a
Author: Dhiraj Bokde <[email protected]>
Authored: Thu Apr 28 19:20:18 2016 -0700
Committer: Dhiraj Bokde <[email protected]>
Committed: Thu Apr 28 22:13:34 2016 -0700
----------------------------------------------------------------------
.../camel-salesforce-component/pom.xml | 31 +-
.../salesforce/SalesforceComponent.java | 165 ++++++++---
.../salesforce/SalesforceEndpointConfig.java | 12 +-
.../salesforce/SalesforceHttpClient.java | 111 +++++++
.../component/salesforce/api/dto/Address.java | 10 +
.../salesforce/api/dto/RestResources.java | 24 ++
.../salesforce/internal/SalesforceSession.java | 287 ++++++++-----------
.../internal/client/AbstractClientBase.java | 195 +++++++------
.../client/DefaultAnalyticsApiClient.java | 95 +++---
.../internal/client/DefaultBulkApiClient.java | 93 +++---
.../internal/client/DefaultRestClient.java | 100 ++++---
.../internal/client/SalesforceExchange.java | 36 ---
.../internal/client/SalesforceHttpRequest.java | 38 +++
.../client/SalesforceSecurityHandler.java | 262 +++++++++++++++++
.../client/SalesforceSecurityListener.java | 192 -------------
.../internal/client/XStreamUtils.java | 2 +-
.../processor/AbstractRestProcessor.java | 3 +-
.../processor/AbstractSalesforceProcessor.java | 4 +-
.../processor/AnalyticsApiProcessor.java | 3 +-
.../internal/processor/JsonRestProcessor.java | 2 +-
.../internal/processor/XmlRestProcessor.java | 8 +-
.../internal/streaming/SubscriptionHelper.java | 38 +--
.../salesforce/AbstractBulkApiTestBase.java | 4 +-
.../salesforce/AbstractSalesforceTestBase.java | 9 +
.../salesforce/BulkApiIntegrationTest.java | 24 +-
.../salesforce/HttpProxyIntegrationTest.java | 44 ++-
.../salesforce/RestApiIntegrationTest.java | 79 +++--
.../internal/SessionIntegrationTest.java | 12 +-
.../camel-salesforce-maven-plugin/pom.xml | 20 +-
.../apache/camel/maven/CamelSalesforceMojo.java | 128 +++++++--
.../maven/HttpProxyMojoIntegrationTest.java | 43 ++-
components/camel-salesforce/pom.xml | 1 +
parent/pom.xml | 2 +-
33 files changed, 1263 insertions(+), 814 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/pom.xml
b/components/camel-salesforce/camel-salesforce-component/pom.xml
index 83c5a0a..b15fea8 100644
--- a/components/camel-salesforce/camel-salesforce-component/pom.xml
+++ b/components/camel-salesforce/camel-salesforce-component/pom.xml
@@ -50,17 +50,22 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
- <version>${jetty-version}</version>
+ <version>${jetty9-version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>${jetty-version}</version>
+ <version>${jetty9-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util-ajax</artifactId>
+ <version>${jetty9-version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>${jetty-version}</version>
+ <version>${jetty9-version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
@@ -78,11 +83,11 @@
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
+ <artifactId>*</artifactId>
</exclusion>
<exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-io</artifactId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -122,7 +127,19 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>${jetty-version}</version>
+ <version>${jetty9-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty9-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-proxy</artifactId>
+ <version>${jetty9-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 84ad4b0..80f45c6 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.salesforce;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -43,10 +44,13 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.jsse.SSLContextParameters;
-import org.eclipse.jetty.client.Address;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.RedirectListener;
-import org.eclipse.jetty.client.security.ProxyAuthorization;
+import org.eclipse.jetty.client.HttpProxy;
+import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.ProxyConfiguration;
+import org.eclipse.jetty.client.Socks4Proxy;
+import org.eclipse.jetty.client.api.Authentication;
+import org.eclipse.jetty.client.util.BasicAuthentication;
+import org.eclipse.jetty.client.util.DigestAuthentication;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +63,6 @@ public class SalesforceComponent extends UriEndpointComponent
implements Endpoin
private static final Logger LOG =
LoggerFactory.getLogger(SalesforceComponent.class);
private static final int CONNECTION_TIMEOUT = 60000;
- private static final long RESPONSE_TIMEOUT = 60000;
private static final Pattern SOBJECT_NAME_PATTERN =
Pattern.compile("^.*[\\?&]sObjectName=([^&,]+).*$");
private static final String APEX_CALL_PREFIX =
OperationName.APEX_CALL.value() + "/";
@@ -75,16 +78,23 @@ public class SalesforceComponent extends
UriEndpointComponent implements Endpoin
// Proxy host and port
private String httpProxyHost;
private Integer httpProxyPort;
+ private boolean isHttpProxySocks4;
+ private boolean isHttpProxySecure = true;
+ private Set<String> httpProxyIncludedAddresses;
+ private Set<String> httpProxyExcludedAddresses;
// Proxy basic authentication
private String httpProxyUsername;
private String httpProxyPassword;
+ private String httpProxyAuthUri;
+ private String httpProxyRealm;
+ private boolean httpProxyUseDigestAuth;
// DTO packages to scan
private String[] packages;
// component state
- private HttpClient httpClient;
+ private SalesforceHttpClient httpClient;
private SalesforceSession session;
private Map<String, Class<?>> classMap;
@@ -179,20 +189,18 @@ public class SalesforceComponent extends
UriEndpointComponent implements Endpoin
if (config != null && config.getHttpClient() != null) {
httpClient = config.getHttpClient();
} else {
- httpClient = new HttpClient();
- // default settings
-
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+ // set ssl context parameters if set
+ final SSLContextParameters contextParameters =
sslContextParameters != null
+ ? sslContextParameters : new SSLContextParameters();
+ final SslContextFactory sslContextFactory = new
SslContextFactory();
+
sslContextFactory.setSslContext(contextParameters.createSSLContext());
+
+ httpClient = new SalesforceHttpClient(sslContextFactory);
+ // default settings, use httpClientProperties to set other
properties
httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
- httpClient.setTimeout(RESPONSE_TIMEOUT);
}
}
- // set ssl context parameters
- final SSLContextParameters contextParameters = sslContextParameters !=
null
- ? sslContextParameters : new SSLContextParameters();
- final SslContextFactory sslContextFactory =
httpClient.getSslContextFactory();
- sslContextFactory.setSslContext(contextParameters.createSSLContext());
-
// set HTTP client parameters
if (httpClientProperties != null && !httpClientProperties.isEmpty()) {
IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(),
@@ -201,29 +209,46 @@ public class SalesforceComponent extends
UriEndpointComponent implements Endpoin
// set HTTP proxy settings
if (this.httpProxyHost != null && httpProxyPort != null) {
- httpClient.setProxy(new Address(this.httpProxyHost,
this.httpProxyPort));
+ Origin.Address proxyAddress = new
Origin.Address(this.httpProxyHost, this.httpProxyPort);
+ ProxyConfiguration.Proxy proxy;
+ if (isHttpProxySocks4) {
+ proxy = new Socks4Proxy(proxyAddress, isHttpProxySecure);
+ } else {
+ proxy = new HttpProxy(proxyAddress, isHttpProxySecure);
+ }
+ if (httpProxyIncludedAddresses != null &&
!httpProxyIncludedAddresses.isEmpty()) {
+
proxy.getIncludedAddresses().addAll(httpProxyIncludedAddresses);
+ }
+ if (httpProxyExcludedAddresses != null &&
!httpProxyExcludedAddresses.isEmpty()) {
+
proxy.getExcludedAddresses().addAll(httpProxyExcludedAddresses);
+ }
+ httpClient.getProxyConfiguration().getProxies().add(proxy);
}
if (this.httpProxyUsername != null && httpProxyPassword != null) {
- httpClient.setProxyAuthentication(new
ProxyAuthorization(this.httpProxyUsername, this.httpProxyPassword));
- }
- // add redirect listener to handle Salesforce redirects
- // this is ok to do since the RedirectListener is in the same
classloader as Jetty client
- String listenerClass = RedirectListener.class.getName();
- if (httpClient.getRegisteredListeners() == null
- ||
!httpClient.getRegisteredListeners().contains(listenerClass)) {
- httpClient.registerListener(listenerClass);
- }
- // SalesforceSecurityListener can't be registered the same way
- // since Jetty HttpClient's Class.forName() can't see it
+ ObjectHelper.notEmpty(httpProxyAuthUri, "httpProxyAuthUri");
+ ObjectHelper.notEmpty(httpProxyRealm, "httpProxyRealm");
- // start the Jetty client to initialize thread pool, etc.
- httpClient.start();
+ final Authentication authentication;
+ if (httpProxyUseDigestAuth) {
+ authentication = new DigestAuthentication(new
URI(httpProxyAuthUri),
+ httpProxyRealm, httpProxyUsername, httpProxyPassword);
+ } else {
+ authentication = new BasicAuthentication(new
URI(httpProxyAuthUri),
+ httpProxyRealm, httpProxyUsername, httpProxyPassword);
+ }
+
httpClient.getAuthenticationStore().addAuthentication(authentication);
+ }
// support restarts
if (null == this.session) {
- this.session = new SalesforceSession(httpClient, loginConfig);
+ this.session = new SalesforceSession(httpClient,
httpClient.getTimeout(), loginConfig);
}
+ // set session before calling start()
+ httpClient.setSession(this.session);
+
+ // start the Jetty client to initialize thread pool, etc.
+ httpClient.start();
// login at startup if lazyLogin is disabled
if (!loginConfig.isLazyLogin()) {
@@ -441,6 +466,83 @@ public class SalesforceComponent extends
UriEndpointComponent implements Endpoin
this.httpProxyPassword = httpProxyPassword;
}
+ public boolean isHttpProxySocks4() {
+ return isHttpProxySocks4;
+ }
+
+ /**
+ * Use a SOCKS4 proxy instead of an HTTP proxy, false by default
+ */
+ public void setIsHttpProxySocks4(boolean isHttpProxySocks4) {
+ this.isHttpProxySocks4 = isHttpProxySocks4;
+ }
+
+ public boolean isHttpProxySecure() {
+ return isHttpProxySecure;
+ }
+
+ /**
+ * Whether the connection to the proxy is secured with TLS (passed as the proxy's secure flag), true by default
+ */
+ public void setIsHttpProxySecure(boolean isHttpProxySecure) {
+ this.isHttpProxySecure = isHttpProxySecure;
+ }
+
+ public Set<String> getHttpProxyIncludedAddresses() {
+ return httpProxyIncludedAddresses;
+ }
+
+ /**
+ * Addresses to route through the HTTP proxy (added to the proxy's included addresses)
+ */
+ public void setHttpProxyIncludedAddresses(Set<String>
httpProxyIncludedAddresses) {
+ this.httpProxyIncludedAddresses = httpProxyIncludedAddresses;
+ }
+
+ public Set<String> getHttpProxyExcludedAddresses() {
+ return httpProxyExcludedAddresses;
+ }
+
+ /**
+ * Addresses to exclude from the HTTP proxy (added to the proxy's excluded addresses)
+ */
+ public void setHttpProxyExcludedAddresses(Set<String>
httpProxyExcludedAddresses) {
+ this.httpProxyExcludedAddresses = httpProxyExcludedAddresses;
+ }
+
+ public String getHttpProxyAuthUri() {
+ return httpProxyAuthUri;
+ }
+
+ /**
+ * URI used for HTTP proxy authentication, required when httpProxyUsername and httpProxyPassword are set
+ */
+ public void setHttpProxyAuthUri(String httpProxyAuthUri) {
+ this.httpProxyAuthUri = httpProxyAuthUri;
+ }
+
+ public String getHttpProxyRealm() {
+ return httpProxyRealm;
+ }
+
+ /**
+ * Realm used for HTTP proxy authentication, required when httpProxyUsername and httpProxyPassword are set
+ */
+ public void setHttpProxyRealm(String httpProxyRealm) {
+ this.httpProxyRealm = httpProxyRealm;
+ }
+
+ public boolean isHttpProxyUseDigestAuth() {
+ return httpProxyUseDigestAuth;
+ }
+
+ /**
+ * Use Digest instead of Basic authentication for the HTTP proxy, false by default
+ */
+ public void setHttpProxyUseDigestAuth(boolean httpProxyUseDigestAuth) {
+ this.httpProxyUseDigestAuth = httpProxyUseDigestAuth;
+ }
+
public String[] getPackages() {
return packages;
}
@@ -469,5 +571,4 @@ public class SalesforceComponent extends
UriEndpointComponent implements Endpoin
public Map<String, Class<?>> getClassMap() {
return classMap;
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index e25198e5c..2bb6306 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -28,7 +28,6 @@ import
org.apache.camel.component.salesforce.internal.dto.NotifyForFieldsEnum;
import
org.apache.camel.component.salesforce.internal.dto.NotifyForOperationsEnum;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
-import org.eclipse.jetty.client.HttpClient;
/**
* Salesforce Endpoint configuration.
@@ -71,6 +70,9 @@ public class SalesforceEndpointConfig implements Cloneable {
public static final String REPORT_METADATA = "reportMetadata";
public static final String INSTANCE_ID = "instanceId";
+ // default maximum authentication retries on failed authentication or
expired session
+ public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4;
+
// general properties
@UriParam
private String apiVersion = DEFAULT_VERSION;
@@ -139,9 +141,9 @@ public class SalesforceEndpointConfig implements Cloneable {
@UriParam
private String instanceId;
- // Jetty HttpClient, set using reference
+ // Salesforce Jetty9 HttpClient, set using reference
@UriParam
- private HttpClient httpClient;
+ private SalesforceHttpClient httpClient;
public SalesforceEndpointConfig copy() {
try {
@@ -475,11 +477,11 @@ public class SalesforceEndpointConfig implements
Cloneable {
/**
* Custom Jetty Http Client to use to connect to Salesforce.
*/
- public void setHttpClient(HttpClient httpClient) {
+ public void setHttpClient(SalesforceHttpClient httpClient) {
this.httpClient = httpClient;
}
- public HttpClient getHttpClient() {
+ public SalesforceHttpClient getHttpClient() {
return httpClient;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
new file mode 100644
index 0000000..6bca3f8
--- /dev/null
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import
org.apache.camel.component.salesforce.internal.client.SalesforceHttpRequest;
+import
org.apache.camel.component.salesforce.internal.client.SalesforceSecurityHandler;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.HttpRequest;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * Custom Salesforce HTTP Client that creates {@link SalesforceHttpRequest}
requests.
+ */
+public class SalesforceHttpClient extends HttpClient {
+
+ // default total request timeout in msecs
+ static final long DEFAULT_TIMEOUT = 60000;
+
+ private static final int DEFAULT_MAX_RETRIES = 3;
+ private static final int DEFAULT_MAX_CONTENT_LENGTH = 4 * 1024 * 1024;
+
+ private SalesforceSession session;
+ private int maxRetries = DEFAULT_MAX_RETRIES;
+ private int maxContentLength = DEFAULT_MAX_CONTENT_LENGTH;
+ private long timeout = DEFAULT_TIMEOUT;
+
+ public SalesforceHttpClient() {
+ }
+
+ public SalesforceHttpClient(SslContextFactory sslContextFactory) {
+ super(sslContextFactory);
+ }
+
+ public SalesforceHttpClient(HttpClientTransport transport,
SslContextFactory sslContextFactory) {
+ super(transport, sslContextFactory);
+ }
+
+ @Override
+ public HttpRequest newHttpRequest(HttpConversation conversation, URI uri) {
+ final SalesforceHttpRequest request = new SalesforceHttpRequest(this,
conversation, uri);
+ request.timeout(timeout, TimeUnit.MILLISECONDS);
+ return request;
+ }
+
+ @Override
+ public Request copyRequest(HttpRequest oldRequest, URI newURI) {
+ return super.copyRequest(oldRequest, newURI);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (getSession() == null) {
+ throw new IllegalStateException("Missing SalesforceSession in
property session!");
+ }
+ getProtocolHandlers().add(new SalesforceSecurityHandler(this));
+ super.doStart();
+ }
+
+ public SalesforceSession getSession() {
+ return session;
+ }
+
+ public void setSession(SalesforceSession session) {
+ this.session = session;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public void setMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ public int getMaxContentLength() {
+ return maxContentLength;
+ }
+
+ public void setMaxContentLength(int maxContentLength) {
+ this.maxContentLength = maxContentLength;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
index 2385f94..6096260 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
@@ -35,6 +35,8 @@ public class Address extends GeoLocation {
private String street;
+ private String geocodeAccuracy;
+
public String getCity() {
return city;
}
@@ -90,4 +92,12 @@ public class Address extends GeoLocation {
public void setStreet(String street) {
this.street = street;
}
+
+ public String getGeocodeAccuracy() {
+ return geocodeAccuracy;
+ }
+
+ public void setGeocodeAccuracy(String geocodeAccuracy) {
+ this.geocodeAccuracy = geocodeAccuracy;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
index a07cb6a..46d6944 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.salesforce.api.dto;
import com.thoughtworks.xstream.annotations.XStreamAlias;
+import org.codehaus.jackson.annotate.JsonProperty;
+
/**
* DTO for Salesforce Resources.
*/
@@ -46,6 +48,12 @@ public class RestResources extends AbstractDTOBase {
private String actions;
private String tabs;
private String wave;
+ @JsonProperty("async-queries")
+ @XStreamAlias("async-queries")
+ private String asyncQueries;
+ @JsonProperty("exchange-connect")
+ @XStreamAlias("exchange-connect")
+ private String exchangeConnect;
public String getSobjects() {
return sobjects;
@@ -222,4 +230,20 @@ public class RestResources extends AbstractDTOBase {
public void setWave(String wave) {
this.wave = wave;
}
+
+ public String getAsyncQueries() {
+ return asyncQueries;
+ }
+
+ public void setAsyncQueries(String asyncQueries) {
+ this.asyncQueries = asyncQueries;
+ }
+
+ public String getExchangeConnect() {
+ return exchangeConnect;
+ }
+
+ public void setExchangeConnect(String exchangeConnect) {
+ this.exchangeConnect = exchangeConnect;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
index bf3a395..0fbe8ec 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -17,12 +17,17 @@
package org.apache.camel.component.salesforce.internal;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.Service;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.SalesforceLoginConfig;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.RestError;
@@ -30,15 +35,13 @@ import
org.apache.camel.component.salesforce.internal.dto.LoginError;
import org.apache.camel.component.salesforce.internal.dto.LoginToken;
import org.apache.camel.util.ObjectHelper;
import org.codehaus.jackson.map.ObjectMapper;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.util.FormContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.io.Buffer;
-import org.eclipse.jetty.io.ByteArrayBuffer;
-import org.eclipse.jetty.util.StringUtil;
-import org.eclipse.jetty.util.UrlEncoded;
+import org.eclipse.jetty.util.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,9 +51,9 @@ public class SalesforceSession implements Service {
private static final String OAUTH2_TOKEN_PATH = "/services/oauth2/token";
private static final Logger LOG =
LoggerFactory.getLogger(SalesforceSession.class);
- private static final String FORM_CONTENT_TYPE =
"application/x-www-form-urlencoded;charset=utf-8";
- private final HttpClient httpClient;
+ private final SalesforceHttpClient httpClient;
+ private final long timeout;
private final SalesforceLoginConfig config;
@@ -60,7 +63,7 @@ public class SalesforceSession implements Service {
private volatile String accessToken;
private volatile String instanceUrl;
- public SalesforceSession(HttpClient httpClient, SalesforceLoginConfig
config) {
+ public SalesforceSession(SalesforceHttpClient httpClient, long timeout,
SalesforceLoginConfig config) {
// validate parameters
ObjectHelper.notNull(httpClient, "httpClient");
ObjectHelper.notNull(config, "SalesforceLoginConfig");
@@ -71,6 +74,7 @@ public class SalesforceSession implements Service {
ObjectHelper.notNull(config.getPassword(), "password");
this.httpClient = httpClient;
+ this.timeout = timeout;
this.config = config;
// strip trailing '/'
@@ -100,144 +104,133 @@ public class SalesforceSession implements Service {
}
// login to Salesforce and get session id
- final StatusExceptionExchange loginPost = new
StatusExceptionExchange(true);
- String url = config.getLoginUrl() + OAUTH2_TOKEN_PATH;
- loginPost.setURL(url);
- loginPost.setMethod(HttpMethods.POST);
- loginPost.setRequestContentType(FORM_CONTENT_TYPE);
-
- final UrlEncoded nvps = new UrlEncoded();
- nvps.put("grant_type", "password");
- nvps.put("client_id", config.getClientId());
- nvps.put("client_secret", config.getClientSecret());
- nvps.put("username", config.getUserName());
- nvps.put("password", config.getPassword());
- nvps.put("format", "json");
-
+ final Request loginPost = getLoginRequest(null);
try {
- LOG.info("Login user {} at Salesforce url: {}",
config.getUserName(), url);
-
- // set form content
- loginPost.setRequestContent(new ByteArrayBuffer(
- nvps.encode(StringUtil.__UTF8,
true).getBytes(StringUtil.__UTF8)));
- httpClient.send(loginPost);
-
- // wait for the login to finish
- final int exchangeState = loginPost.waitForDone();
-
- switch (exchangeState) {
- case HttpExchange.STATUS_COMPLETED:
- final byte[] responseContent =
loginPost.getResponseContentBytes();
- final int responseStatus = loginPost.getResponseStatus();
-
- switch (responseStatus) {
- case HttpStatus.OK_200:
- // parse the response to get token
- LoginToken token =
objectMapper.readValue(responseContent, LoginToken.class);
-
- // don't log token or instance URL for security reasons
- LOG.info("Login successful");
- accessToken = token.getAccessToken();
- instanceUrl = token.getInstanceUrl();
-
- // notify all listeners
- for (SalesforceSessionListener listener : listeners) {
- try {
- listener.onLogin(accessToken, instanceUrl);
- } catch (Throwable t) {
- LOG.warn("Unexpected error from listener {}:
{}", listener, t.getMessage());
- }
- }
-
- break;
-
- case HttpStatus.BAD_REQUEST_400:
- // parse the response to get error
- final LoginError error =
objectMapper.readValue(responseContent, LoginError.class);
- final String msg = String.format("Login error
code:[%s] description:[%s]",
- error.getError(), error.getErrorDescription());
- final List<RestError> errors = new
ArrayList<RestError>();
- errors.add(new RestError(msg,
error.getErrorDescription()));
- throw new SalesforceException(errors,
HttpStatus.BAD_REQUEST_400);
-
- default:
- throw new SalesforceException(String.format("Login
error status:[%s] reason:[%s]",
- responseStatus, loginPost.getReason()),
responseStatus);
- }
- break;
-
- case HttpExchange.STATUS_EXCEPTED:
- final Throwable ex = loginPost.getException();
- throw new SalesforceException(
- String.format("Unexpected login exception: %s",
ex.getMessage()), ex);
+ final ContentResponse loginResponse = loginPost.send();
+ parseLoginResponse(loginResponse,
loginResponse.getContentAsString());
- case HttpExchange.STATUS_CANCELLED:
- throw new SalesforceException("Login request CANCELLED!",
null);
-
- case HttpExchange.STATUS_EXPIRED:
- throw new SalesforceException("Login request TIMEOUT!",
null);
-
- default:
- throw new SalesforceException("Unknow status: " +
exchangeState, null);
- }
- } catch (IOException e) {
- String msg = "Login error: unexpected exception " +
e.getMessage();
- throw new SalesforceException(msg, e);
} catch (InterruptedException e) {
- String msg = "Login error: unexpected exception " +
e.getMessage();
- throw new SalesforceException(msg, e);
+ throw new SalesforceException("Login error: " +
e.getMessage(), e);
+ } catch (TimeoutException e) {
+ throw new SalesforceException("Login request timeout: " +
e.getMessage(), e);
+ } catch (ExecutionException e) {
+ throw new SalesforceException("Unexpected login error: " +
e.getCause().getMessage(), e.getCause());
}
}
return accessToken;
}
- public synchronized void logout() throws SalesforceException {
- if (accessToken == null) {
- return;
+ /**
+ * Creates login request, allows SalesforceSecurityHandler to create a
login request for a failed authentication conversation
+ * @return login POST request.
+ */
+ public Request getLoginRequest(HttpConversation conversation) {
+ final String loginUrl = (instanceUrl == null ? config.getLoginUrl() :
instanceUrl) + OAUTH2_TOKEN_PATH;
+ LOG.info("Login user {} at Salesforce loginUrl: {}",
config.getUserName(), loginUrl);
+ final Fields fields = new Fields(true);
+
+ fields.put("grant_type", "password");
+ fields.put("client_id", config.getClientId());
+ fields.put("client_secret", config.getClientSecret());
+ fields.put("username", config.getUserName());
+ fields.put("password", config.getPassword());
+ fields.put("format", "json");
+
+ final Request post;
+ if (conversation == null) {
+ post = httpClient.POST(loginUrl);
+ } else {
+ post = httpClient.newHttpRequest(conversation,
URI.create(loginUrl))
+ .method(HttpMethod.POST);
}
- StatusExceptionExchange logoutGet = new StatusExceptionExchange(true);
- logoutGet.setURL(config.getLoginUrl() + OAUTH2_REVOKE_PATH +
accessToken);
- logoutGet.setMethod(HttpMethods.GET);
+ return post.content(new FormContentProvider(fields))
+ .timeout(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Parses login response, allows SalesforceSecurityHandler to parse a
login response for a failed authentication conversation.
+ * @param loginResponse
+ * @param responseContent
+ * @throws SalesforceException
+ */
+ public synchronized void parseLoginResponse(ContentResponse loginResponse,
String responseContent) throws SalesforceException {
+ final int responseStatus = loginResponse.getStatus();
try {
- httpClient.send(logoutGet);
- final int done = logoutGet.waitForDone();
- switch (done) {
- case HttpExchange.STATUS_COMPLETED:
- final int statusCode = logoutGet.getResponseStatus();
- final String reason = logoutGet.getReason();
-
- if (statusCode == HttpStatus.OK_200) {
- LOG.info("Logout successful");
- } else {
- throw new SalesforceException(
- String.format("Logout error, code: [%s] reason:
[%s]",
- statusCode, reason),
- statusCode);
+ switch (responseStatus) {
+ case HttpStatus.OK_200:
+ // parse the response to get token
+ LoginToken token = objectMapper.readValue(responseContent,
LoginToken.class);
+
+ // don't log token or instance URL for security reasons
+ LOG.info("Login successful");
+ accessToken = token.getAccessToken();
+ instanceUrl = token.getInstanceUrl();
+
+ // notify all session listeners
+ for (SalesforceSessionListener listener : listeners) {
+ try {
+ listener.onLogin(accessToken, instanceUrl);
+ } catch (Throwable t) {
+ LOG.warn("Unexpected error from listener {}: {}",
listener, t.getMessage());
+ }
}
+
break;
- case HttpExchange.STATUS_EXCEPTED:
- final Throwable ex = logoutGet.getException();
- throw new SalesforceException("Unexpected logout exception: "
+ ex.getMessage(), ex);
+ case HttpStatus.BAD_REQUEST_400:
+ // parse the response to get error
+ final LoginError error =
objectMapper.readValue(responseContent, LoginError.class);
+ final String msg = String.format("Login error code:[%s]
description:[%s]",
+ error.getError(), error.getErrorDescription());
+ final List<RestError> errors = new ArrayList<RestError>();
+ errors.add(new RestError(msg, error.getErrorDescription()));
+ throw new SalesforceException(errors,
HttpStatus.BAD_REQUEST_400);
- case HttpExchange.STATUS_CANCELLED:
- throw new SalesforceException("Logout request CANCELLED!",
null);
+ default:
+ throw new SalesforceException(String.format("Login error
status:[%s] reason:[%s]",
+ responseStatus, loginResponse.getReason()),
responseStatus);
+ }
+ } catch (IOException e) {
+ String msg = "Login error: response parse exception " +
e.getMessage();
+ throw new SalesforceException(msg, e);
+ }
+ }
- case HttpExchange.STATUS_EXPIRED:
- throw new SalesforceException("Logout request TIMEOUT!", null);
+ public synchronized void logout() throws SalesforceException {
+ if (accessToken == null) {
+ return;
+ }
- default:
- throw new SalesforceException("Unknown status: " + done, null);
+ try {
+ String logoutUrl = (instanceUrl == null ? config.getLoginUrl() :
instanceUrl) + OAUTH2_REVOKE_PATH + accessToken;
+ final Request logoutGet = httpClient.newRequest(logoutUrl)
+ .timeout(timeout, TimeUnit.MILLISECONDS);
+ final ContentResponse logoutResponse = logoutGet.send();
+
+ final int statusCode = logoutResponse.getStatus();
+ final String reason = logoutResponse.getReason();
+
+ if (statusCode == HttpStatus.OK_200) {
+ LOG.info("Logout successful");
+ } else {
+ throw new SalesforceException(
+ String.format("Logout error, code: [%s] reason: [%s]",
+ statusCode, reason),
+ statusCode);
}
- } catch (SalesforceException e) {
- throw e;
- } catch (Exception e) {
+
+ } catch (InterruptedException e) {
String msg = "Logout error: " + e.getMessage();
throw new SalesforceException(msg, e);
+ } catch (ExecutionException e) {
+ final Throwable ex = e.getCause();
+ throw new SalesforceException("Unexpected logout exception: " +
ex.getMessage(), ex);
+ } catch (TimeoutException e) {
+ throw new SalesforceException("Logout request TIMEOUT!", null);
} finally {
// reset session
accessToken = null;
@@ -281,45 +274,8 @@ public class SalesforceSession implements Service {
logout();
}
- /**
- * Records status line, and exception from exchange.
- */
- private static class StatusExceptionExchange extends ContentExchange {
-
- private String reason;
- private Throwable exception;
-
- public StatusExceptionExchange(boolean cacheFields) {
- super(cacheFields);
- }
-
- @Override
- protected synchronized void onResponseStatus(Buffer version, int
status, Buffer reason) throws IOException {
- // remember reason
- this.reason = reason.toString(StringUtil.__ISO_8859_1);
- super.onResponseStatus(version, status, reason);
- }
-
- @Override
- protected void onConnectionFailed(Throwable x) {
- this.exception = x;
- super.onConnectionFailed(x);
- }
-
- @Override
- protected void onException(Throwable x) {
- this.exception = x;
- super.onException(x);
- }
-
- public String getReason() {
- return reason;
- }
-
- public Throwable getException() {
- return exception;
- }
-
+ public long getTimeout() {
+ return timeout;
}
public interface SalesforceSessionListener {
@@ -327,5 +283,4 @@ public class SalesforceSession implements Service {
void onLogout();
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index b61c161..757293e 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -16,21 +16,27 @@
*/
package org.apache.camel.component.salesforce.internal.client;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Service;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpEventListenerWrapper;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpSchemes;
+import org.eclipse.jetty.client.HttpContentResponse;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.ByteBufferContentProvider;
+import org.eclipse.jetty.client.util.InputStreamContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.io.Buffer;
-import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,15 +47,15 @@ public abstract class AbstractClientBase implements
SalesforceSession.Salesforce
protected final Logger log = LoggerFactory.getLogger(getClass());
- protected final HttpClient httpClient;
+ protected final SalesforceHttpClient httpClient;
protected final SalesforceSession session;
protected final String version;
protected String accessToken;
protected String instanceUrl;
- public AbstractClientBase(String version,
- SalesforceSession session, HttpClient
httpClient) throws SalesforceException {
+ public AbstractClientBase(String version, SalesforceSession session,
+ SalesforceHttpClient httpClient) throws
SalesforceException {
this.version = version;
this.session = session;
@@ -89,111 +95,102 @@ public abstract class AbstractClientBase implements
SalesforceSession.Salesforce
// SalesforceSecurityListener will auto login!
}
- protected SalesforceExchange getContentExchange(String method, String url)
{
- SalesforceExchange get = new SalesforceExchange();
- get.setMethod(method);
- get.setURL(url);
- get.setClient(this);
- return get;
+ protected Request getRequest(HttpMethod method, String url) {
+ return getRequest(method.asString(), url);
+ }
+
+ protected Request getRequest(String method, String url) {
+ SalesforceHttpRequest request = (SalesforceHttpRequest)
httpClient.newRequest(url)
+ .method(method)
+ .timeout(session.getTimeout(), TimeUnit.MILLISECONDS);
+
request.getConversation().setAttribute(SalesforceSecurityHandler.CLIENT_ATTRIBUTE,
this);
+ return request;
}
protected interface ClientResponseCallback {
void onResponse(InputStream response, SalesforceException ex);
}
- protected void doHttpRequest(final ContentExchange request, final
ClientResponseCallback callback) {
+ protected void doHttpRequest(final Request request, final
ClientResponseCallback callback) {
+ // Highly memory inefficient,
+ // but buffer the request content to allow it to be replayed for
authentication retries
+ final ContentProvider content = request.getContent();
+ if (content instanceof InputStreamContentProvider) {
+ final List<ByteBuffer> buffers = new ArrayList<>();
+ for (ByteBuffer buffer : content) {
+ buffers.add(buffer);
+ }
+ request.content(new ByteBufferContentProvider(buffers.toArray(new
ByteBuffer[buffers.size()])));
+ buffers.clear();
+ }
- // use SalesforceSecurityListener for security login retries
- final SalesforceSecurityListener securityListener;
- try {
- final boolean isHttps =
HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme()));
- securityListener = new SalesforceSecurityListener(
- httpClient.getDestination(request.getAddress(), isHttps),
- request, session, accessToken) {
+ // execute the request
+ request.send(new
BufferingResponseListener(httpClient.getMaxContentLength()) {
+ @Override
+ public void onComplete(Result result) {
+ Response response = result.getResponse();
+ if (result.isFailed()) {
+
+ // Failure!!!
+ // including Salesforce errors reported as exception from
SalesforceSecurityHandler
+ Throwable failure = result.getFailure();
+ if (failure instanceof SalesforceException) {
+ callback.onResponse(null, (SalesforceException)
failure);
+ } else {
+ final String msg = String.format("Unexpected error
{%s:%s} executing {%s:%s}",
+ response.getStatus(), response.getReason(),
request.getMethod(), request.getURI());
+ callback.onResponse(null, new SalesforceException(msg,
response.getStatus(), failure));
+ }
+ } else {
- private String reason;
+ // HTTP error status
+ final int status = response.getStatus();
+ SalesforceHttpRequest request = (SalesforceHttpRequest)
((SalesforceHttpRequest) result.getRequest())
+ .getConversation()
+
.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE);
- @Override
- public void onResponseStatus(Buffer version, int status,
Buffer reason) throws IOException {
- super.onResponseStatus(version, status, reason);
- // remember status reason
- this.reason = reason.toString(StringUtil.__ISO_8859_1);
- }
+ if (status == HttpStatus.BAD_REQUEST_400 && request !=
null) {
+ // parse login error
+ ContentResponse contentResponse = new
HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
+ try {
- @Override
- protected SalesforceException createExceptionResponse() {
- final int responseStatus = request.getResponseStatus();
- if (responseStatus < HttpStatus.OK_200 || responseStatus
>= HttpStatus.MULTIPLE_CHOICES_300) {
- final String msg = String.format("Error {%s:%s}
executing {%s:%s}",
- responseStatus, reason, request.getMethod(),
request.getRequestURI());
- return new SalesforceException(msg, responseStatus,
createRestException(request, reason));
- } else {
- return super.createExceptionResponse();
- }
- }
- };
- } catch (IOException e) {
- // propagate exception
- callback.onResponse(null, new SalesforceException(
- String.format("Error registering security listener: %s",
e.getMessage()),
- e));
- return;
- }
+ session.parseLoginResponse(contentResponse,
getContentAsString());
+ final String msg = String.format("Unexpected Error
{%s:%s} executing {%s:%s}",
+ status, response.getReason(),
request.getMethod(), request.getURI());
+ callback.onResponse(null, new
SalesforceException(msg, null));
- // use HttpEventListener for lifecycle events
- request.setEventListener(new
HttpEventListenerWrapper(request.getEventListener(), true) {
+ } catch (SalesforceException e) {
- @Override
- public void onConnectionFailed(Throwable ex) {
- super.onConnectionFailed(ex);
- callback.onResponse(null,
- new SalesforceException("Connection error: " +
ex.getMessage(), ex));
- }
+ final String msg = String.format("Error {%s:%s}
executing {%s:%s}",
+ status, response.getReason(),
request.getMethod(), request.getURI());
+ callback.onResponse(null, new
SalesforceException(msg, response.getStatus(), e));
- @Override
- public void onException(Throwable ex) {
- super.onException(ex);
- callback.onResponse(null,
- new SalesforceException("Unexpected exception: " +
ex.getMessage(), ex));
- }
+ }
+ } else if (status < HttpStatus.OK_200 || status >=
HttpStatus.MULTIPLE_CHOICES_300) {
- @Override
- public void onExpire() {
- super.onExpire();
- callback.onResponse(null,
- new SalesforceException("Request expired", null));
- }
+ // Salesforce HTTP failure!
+ request = (SalesforceHttpRequest) result.getRequest();
+ final String msg = String.format("Error {%s:%s}
executing {%s:%s}",
+ status, response.getReason(), request.getMethod(),
request.getURI());
+ final SalesforceException cause =
createRestException(response, getContentAsInputStream());
+ callback.onResponse(null, new SalesforceException(msg,
response.getStatus(), cause));
- @Override
- public void onResponseComplete() throws IOException {
- super.onResponseComplete();
+ } else {
- SalesforceException e =
securityListener.getExceptionResponse();
- if (e != null) {
- callback.onResponse(null, e);
- } else {
- // TODO not memory efficient for large response messages,
- // doesn't seem to be possible in Jetty 7 to directly
stream to response parsers
- final byte[] bytes = request.getResponseContentBytes();
- callback.onResponse(bytes != null ? new
ByteArrayInputStream(bytes) : null, null);
+ // Success!!!
+ callback.onResponse(getContentAsInputStream(), null);
+ }
}
+ }
+ @Override
+ public InputStream getContentAsInputStream() {
+ if (getContent().length == 0) {
+ return null;
+ }
+ return super.getContentAsInputStream();
}
});
-
- // wrap the above lifecycle event listener with
SalesforceSecurityListener
- securityListener.setEventListener(request.getEventListener());
- request.setEventListener(securityListener);
-
- // execute the request
- try {
- httpClient.send(request);
- } catch (IOException e) {
- String msg = "Unexpected Error: " + e.getMessage();
- // send error through callback
- callback.onResponse(null, new SalesforceException(msg, e));
- }
-
}
public void setAccessToken(String accessToken) {
@@ -204,8 +201,8 @@ public abstract class AbstractClientBase implements
SalesforceSession.Salesforce
this.instanceUrl = instanceUrl;
}
- protected abstract void setAccessToken(HttpExchange httpExchange);
+ protected abstract void setAccessToken(Request request);
- protected abstract SalesforceException createRestException(ContentExchange
httpExchange, String reason);
+ protected abstract SalesforceException createRestException(Response
response, InputStream responseContent);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
index 1041f7f..f6e72dc 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.RestError;
import
org.apache.camel.component.salesforce.api.dto.analytics.reports.AsyncReportResults;
@@ -33,12 +34,11 @@ import
org.apache.camel.component.salesforce.api.dto.analytics.reports.SyncRepor
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpHeaders;
-import org.eclipse.jetty.http.HttpMethods;
-import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.StringUtil;
/**
@@ -51,7 +51,8 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
private ObjectMapper objectMapper;
- public DefaultAnalyticsApiClient(String version, SalesforceSession
session, HttpClient httpClient) throws SalesforceException {
+ public DefaultAnalyticsApiClient(String version, SalesforceSession session,
+ SalesforceHttpClient httpClient) throws
SalesforceException {
super(version, session, httpClient);
objectMapper = new ObjectMapper();
@@ -60,16 +61,16 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
@Override
public void getRecentReports(final RecentReportsResponseCallback callback)
{
- final ContentExchange contentExchange =
getContentExchange(HttpMethods.GET, reportsUrl());
+ final Request Request = getRequest(HttpMethod.GET, reportsUrl());
- doHttpRequest(contentExchange, new ClientResponseCallback() {
+ doHttpRequest(Request, new ClientResponseCallback() {
@Override
@SuppressWarnings("unchecked")
public void onResponse(InputStream response, SalesforceException
ex) {
List<RecentReport> recentReports = null;
if (response != null) {
try {
- recentReports = unmarshalResponse(response,
contentExchange,
+ recentReports = unmarshalResponse(response, Request,
new TypeReference<List<RecentReport>>() {
}
);
@@ -85,14 +86,14 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
@Override
public void getReportDescription(String reportId, final
ReportDescriptionResponseCallback callback) {
- final ContentExchange contentExchange =
getContentExchange(HttpMethods.GET, reportsDescribeUrl(reportId));
+ final Request Request = getRequest(HttpMethod.GET,
reportsDescribeUrl(reportId));
- doHttpRequest(contentExchange, new ClientResponseCallback() {
+ doHttpRequest(Request, new ClientResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException
ex) {
ReportDescription reportDescription = null;
try {
- reportDescription = unmarshalResponse(response,
contentExchange, ReportDescription.class);
+ reportDescription = unmarshalResponse(response, Request,
ReportDescription.class);
} catch (SalesforceException e) {
ex = e;
}
@@ -106,8 +107,8 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
final ReportResultsResponseCallback
callback) {
final boolean useGet = reportMetadata == null;
- final ContentExchange contentExchange = getContentExchange(
- useGet ? HttpMethods.GET : HttpMethods.POST, reportsUrl(reportId,
includeDetails));
+ final Request Request = getRequest(
+ useGet ? HttpMethod.GET : HttpMethod.POST, reportsUrl(reportId,
includeDetails));
// set POST data
if (!useGet) {
@@ -115,19 +116,19 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
// wrap reportMetadata in a map
final HashMap<String, Object> request = new HashMap<String,
Object>();
request.put("reportMetadata", reportMetadata);
- marshalRequest(request, contentExchange);
+ marshalRequest(request, Request);
} catch (SalesforceException e) {
callback.onResponse(null, e);
return;
}
}
- doHttpRequest(contentExchange, new ClientResponseCallback() {
+ doHttpRequest(Request, new ClientResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException
ex) {
SyncReportResults reportResults = null;
try {
- reportResults = unmarshalResponse(response,
contentExchange, SyncReportResults.class);
+ reportResults = unmarshalResponse(response, Request,
SyncReportResults.class);
} catch (SalesforceException e) {
ex = e;
}
@@ -140,7 +141,7 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
public void executeAsyncReport(String reportId, Boolean includeDetails,
ReportMetadata reportMetadata,
final ReportInstanceResponseCallback
callback) {
- final ContentExchange contentExchange =
getContentExchange(HttpMethods.POST,
+ final Request Request = getRequest(HttpMethod.POST,
reportInstancesUrl(reportId, includeDetails));
// set POST data
@@ -149,19 +150,19 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
// wrap reportMetadata in a map
final HashMap<String, Object> request = new HashMap<String,
Object>();
request.put("reportMetadata", reportMetadata);
- marshalRequest(request, contentExchange);
+ marshalRequest(request, Request);
} catch (SalesforceException e) {
callback.onResponse(null, e);
return;
}
}
- doHttpRequest(contentExchange, new ClientResponseCallback() {
+ doHttpRequest(Request, new ClientResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException
ex) {
ReportInstance reportInstance = null;
try {
- reportInstance = unmarshalResponse(response,
contentExchange, ReportInstance.class);
+ reportInstance = unmarshalResponse(response, Request,
ReportInstance.class);
} catch (SalesforceException e) {
ex = e;
}
@@ -173,16 +174,16 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
@Override
public void getReportInstances(String reportId, final
ReportInstanceListResponseCallback callback) {
- final ContentExchange contentExchange =
getContentExchange(HttpMethods.GET, reportInstancesUrl(reportId));
+ final Request Request = getRequest(HttpMethod.GET,
reportInstancesUrl(reportId));
- doHttpRequest(contentExchange, new ClientResponseCallback() {
+ doHttpRequest(Request, new ClientResponseCallback() {
@Override
@SuppressWarnings("unchecked")
public void onResponse(InputStream response, SalesforceException
ex) {
List<ReportInstance> reportInstances = null;
if (response != null) {
try {
- reportInstances = unmarshalResponse(response,
contentExchange,
+ reportInstances = unmarshalResponse(response, Request,
new TypeReference<List<ReportInstance>>() {
}
);
@@ -198,15 +199,15 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
@Override
public void getReportResults(String reportId, String instanceId, final
ReportResultsResponseCallback callback) {
- final ContentExchange contentExchange =
getContentExchange(HttpMethods.GET,
+ final Request Request = getRequest(HttpMethod.GET,
reportInstancesUrl(reportId, instanceId));
- doHttpRequest(contentExchange, new ClientResponseCallback() {
+ doHttpRequest(Request, new ClientResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException
ex) {
AsyncReportResults reportResults = null;
try {
- reportResults = unmarshalResponse(response,
contentExchange, AsyncReportResults.class);
+ reportResults = unmarshalResponse(response, Request,
AsyncReportResults.class);
} catch (SalesforceException e) {
ex = e;
}
@@ -247,16 +248,15 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
}
@Override
- protected void setAccessToken(HttpExchange httpExchange) {
- httpExchange.setRequestHeader(HttpHeaders.AUTHORIZATION, TOKEN_PREFIX
+ accessToken);
+ protected void setAccessToken(Request request) {
+ // replace old token
+ request.getHeaders().put(HttpHeader.AUTHORIZATION, TOKEN_PREFIX +
accessToken);
}
@Override
- protected SalesforceException createRestException(ContentExchange
httpExchange, String reason) {
- final int statusCode = httpExchange.getResponseStatus();
- String responseContent = null;
+ protected SalesforceException createRestException(Response response,
InputStream responseContent) {
+ final int statusCode = response.getStatus();
try {
- responseContent = httpExchange.getResponseContent();
if (responseContent != null) {
// unmarshal RestError
final List<RestError> errors =
objectMapper.readValue(responseContent,
@@ -277,36 +277,37 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
}
// just report HTTP status info
- return new SalesforceException("Unexpected error: " + reason + ", with
content: " + responseContent,
- statusCode);
+ String message = String.format("Unexpected error: %s, with content:
%s",
+ response.getReason(), responseContent);
+ return new SalesforceException(message, statusCode);
}
@Override
- protected void doHttpRequest(ContentExchange request,
ClientResponseCallback callback) {
+ protected void doHttpRequest(Request request, ClientResponseCallback
callback) {
// set access token for all requests
setAccessToken(request);
// set request and response content type and charset, which is always JSON for analytics API
- request.setRequestHeader(HttpHeaders.CONTENT_TYPE,
APPLICATION_JSON_UTF8);
- request.setRequestHeader(HttpHeaders.ACCEPT, APPLICATION_JSON_UTF8);
- request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET,
StringUtil.__UTF8);
+ request.header(HttpHeader.CONTENT_TYPE, APPLICATION_JSON_UTF8);
+ request.header(HttpHeader.ACCEPT, APPLICATION_JSON_UTF8);
+ request.header(HttpHeader.ACCEPT_CHARSET, StringUtil.__UTF8);
super.doHttpRequest(request, callback);
}
- private void marshalRequest(Object input, ContentExchange request) throws
SalesforceException {
+ private void marshalRequest(Object input, Request request) throws
SalesforceException {
try {
- request.setRequestContent(new
ByteArrayBuffer(objectMapper.writeValueAsBytes(input)));
+ request.content(new
BytesContentProvider(objectMapper.writeValueAsBytes(input)));
} catch (IOException e) {
throw new SalesforceException(
String.format("Error marshaling request for {%s:%s} : %s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(), e.getMessage()),
e);
}
}
- private <T> T unmarshalResponse(InputStream response, ContentExchange
request,
+ private <T> T unmarshalResponse(InputStream response, Request request,
TypeReference<T> responseTypeReference)
throws SalesforceException {
@@ -315,12 +316,12 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
} catch (IOException e) {
throw new SalesforceException(
String.format("Error unmarshaling response {%s:%s} : %s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(), e.getMessage()),
e);
}
}
- private <T> T unmarshalResponse(InputStream response, ContentExchange
request, Class<T> responseClass)
+ private <T> T unmarshalResponse(InputStream response, Request request,
Class<T> responseClass)
throws SalesforceException {
if (response == null) {
@@ -332,7 +333,7 @@ public class DefaultAnalyticsApiClient extends
AbstractClientBase implements Ana
} catch (IOException e) {
throw new SalesforceException(
String.format("Error unmarshaling response {%s:%s} : %s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(), e.getMessage()),
e);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8dfd66bd/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
----------------------------------------------------------------------
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
index 3ab4227..1f024db 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.component.salesforce.internal.client;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collections;
import javax.xml.bind.JAXBContext;
@@ -28,6 +28,7 @@ import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.RestError;
import org.apache.camel.component.salesforce.api.dto.bulk.BatchInfo;
@@ -39,12 +40,12 @@ import
org.apache.camel.component.salesforce.api.dto.bulk.JobStateEnum;
import org.apache.camel.component.salesforce.api.dto.bulk.ObjectFactory;
import org.apache.camel.component.salesforce.api.dto.bulk.QueryResultList;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpHeaders;
-import org.eclipse.jetty.http.HttpMethods;
-import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.client.util.InputStreamContentProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.StringUtil;
public class DefaultBulkApiClient extends AbstractClientBase implements
BulkApiClient {
@@ -55,7 +56,7 @@ public class DefaultBulkApiClient extends AbstractClientBase
implements BulkApiC
private JAXBContext context;
private ObjectFactory objectFactory;
- public DefaultBulkApiClient(String version, SalesforceSession session,
HttpClient httpClient)
+ public DefaultBulkApiClient(String version, SalesforceSession session,
SalesforceHttpClient httpClient)
throws SalesforceException {
super(version, session, httpClient);
@@ -74,7 +75,7 @@ public class DefaultBulkApiClient extends AbstractClientBase
implements BulkApiC
// clear system fields if set
sanitizeJobRequest(request);
- final ContentExchange post = getContentExchange(HttpMethods.POST,
jobUrl(null));
+ final Request post = getRequest(HttpMethod.POST, jobUrl(null));
try {
marshalRequest(objectFactory.createJobInfo(request), post,
APPLICATION_XML_UTF8);
} catch (SalesforceException e) {
@@ -123,7 +124,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getJob(String jobId, final JobInfoResponseCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
jobUrl(jobId));
+ final Request get = getRequest(HttpMethod.GET, jobUrl(jobId));
// make the call and parse the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -145,7 +146,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
final JobInfo request = new JobInfo();
request.setState(JobStateEnum.CLOSED);
- final ContentExchange post = getContentExchange(HttpMethods.POST,
jobUrl(jobId));
+ final Request post = getRequest(HttpMethod.POST, jobUrl(jobId));
try {
marshalRequest(objectFactory.createJobInfo(request), post,
APPLICATION_XML_UTF8);
} catch (SalesforceException e) {
@@ -173,7 +174,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
final JobInfo request = new JobInfo();
request.setState(JobStateEnum.ABORTED);
- final ContentExchange post = getContentExchange(HttpMethods.POST,
jobUrl(jobId));
+ final Request post = getRequest(HttpMethod.POST, jobUrl(jobId));
try {
marshalRequest(objectFactory.createJobInfo(request), post,
APPLICATION_XML_UTF8);
} catch (SalesforceException e) {
@@ -199,9 +200,9 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void createBatch(InputStream batchStream, String jobId, ContentType
contentTypeEnum,
final BatchInfoResponseCallback callback) {
- final ContentExchange post = getContentExchange(HttpMethods.POST,
batchUrl(jobId, null));
- post.setRequestContentSource(batchStream);
- post.setRequestContentType(getContentType(contentTypeEnum) +
";charset=" + StringUtil.__UTF8);
+ final Request post = getRequest(HttpMethod.POST, batchUrl(jobId,
null));
+ post.content(new InputStreamContentProvider(batchStream));
+ post.header(HttpHeader.CONTENT_TYPE, getContentType(contentTypeEnum) +
";charset=" + StringUtil.__UTF8);
// make the call and parse the result
doHttpRequest(post, new ClientResponseCallback() {
@@ -220,7 +221,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getBatch(String jobId, String batchId, final
BatchInfoResponseCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
batchUrl(jobId, batchId));
+ final Request get = getRequest(HttpMethod.GET, batchUrl(jobId,
batchId));
// make the call and parse the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -239,7 +240,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getAllBatches(String jobId, final
BatchInfoListResponseCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
batchUrl(jobId, null));
+ final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, null));
// make the call and parse the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -258,7 +259,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getRequest(String jobId, String batchId, final
StreamResponseCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
batchUrl(jobId, batchId));
+ final Request get = getRequest(HttpMethod.GET, batchUrl(jobId,
batchId));
// make the call and parse the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -271,7 +272,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getResults(String jobId, String batchId, final
StreamResponseCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
batchResultUrl(jobId, batchId, null));
+ final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId,
batchId, null));
// make the call and return the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -285,10 +286,16 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void createBatchQuery(String jobId, String soqlQuery, ContentType
jobContentType,
final BatchInfoResponseCallback callback) {
- final ContentExchange post = getContentExchange(HttpMethods.POST,
batchUrl(jobId, null));
- byte[] queryBytes = soqlQuery.getBytes(StringUtil.__UTF8_CHARSET);
- post.setRequestContent(new ByteArrayBuffer(queryBytes));
- post.setRequestContentType(getContentType(jobContentType) +
";charset=" + StringUtil.__UTF8);
+ final Request post = getRequest(HttpMethod.POST, batchUrl(jobId,
null));
+ final byte[] queryBytes;
+ try {
+ queryBytes = soqlQuery.getBytes(StringUtil.__UTF8);
+ } catch (UnsupportedEncodingException e) {
+ callback.onResponse(null, new SalesforceException("Unexpected
exception: " + e.getMessage(), e));
+ return;
+ }
+ post.content(new BytesContentProvider(queryBytes));
+ post.header(HttpHeader.CONTENT_TYPE, getContentType(jobContentType) +
";charset=" + StringUtil.__UTF8);
// make the call and parse the result
doHttpRequest(post, new ClientResponseCallback() {
@@ -307,7 +314,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getQueryResultIds(String jobId, String batchId, final
QueryResultIdsCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
batchResultUrl(jobId, batchId, null));
+ final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId,
batchId, null));
// make the call and parse the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -326,7 +333,7 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
@Override
public void getQueryResult(String jobId, String batchId, String resultId,
final StreamResponseCallback callback) {
- final ContentExchange get = getContentExchange(HttpMethods.GET,
batchResultUrl(jobId, batchId, resultId));
+ final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId,
batchId, resultId));
// make the call and parse the result
doHttpRequest(get, new ClientResponseCallback() {
@@ -338,23 +345,24 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
}
@Override
- protected void setAccessToken(HttpExchange httpExchange) {
- httpExchange.setRequestHeader(TOKEN_HEADER, accessToken);
+ protected void setAccessToken(Request request) {
+ // replace old token
+ request.getHeaders().put(TOKEN_HEADER, accessToken);
}
@Override
- protected void doHttpRequest(ContentExchange request,
ClientResponseCallback callback) {
+ protected void doHttpRequest(Request request, ClientResponseCallback
callback) {
// set access token for all requests
setAccessToken(request);
// set default charset
- request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET,
StringUtil.__UTF8);
+ request.header(HttpHeader.ACCEPT_CHARSET, StringUtil.__UTF8);
// TODO check if this is really needed or not, since SF response content type seems fixed
// check if the default accept content type must be used
- if (!request.getRequestFields().containsKey(HttpHeaders.ACCEPT)) {
+ if (!request.getHeaders().contains(HttpHeader.ACCEPT)) {
final String contentType = getContentType(DEFAULT_ACCEPT_TYPE);
- request.setRequestHeader(HttpHeaders.ACCEPT, contentType);
+ request.header(HttpHeader.ACCEPT, contentType);
// request content type and charset is set by the request entity
}
@@ -386,24 +394,23 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
}
@Override
- protected SalesforceException createRestException(ContentExchange request,
String reason) {
+ protected SalesforceException createRestException(Response response,
InputStream responseContent) {
// this must be of type Error
try {
- final Error error = unmarshalResponse(new
ByteArrayInputStream(request.getResponseContentBytes()),
- request, Error.class);
+ final Error error = unmarshalResponse(responseContent,
response.getRequest(), Error.class);
final RestError restError = new RestError();
restError.setErrorCode(error.getExceptionCode());
restError.setMessage(error.getExceptionMessage());
- return new SalesforceException(Arrays.asList(restError),
request.getResponseStatus());
+ return new SalesforceException(Arrays.asList(restError),
response.getStatus());
} catch (SalesforceException e) {
String msg = "Error un-marshaling Salesforce Error: " +
e.getMessage();
return new SalesforceException(msg, e);
}
}
- private <T> T unmarshalResponse(InputStream response, ContentExchange
request, Class<T> resultClass)
+ private <T> T unmarshalResponse(InputStream response, Request request,
Class<T> resultClass)
throws SalesforceException {
try {
Unmarshaller unmarshaller = context.createUnmarshaller();
@@ -412,32 +419,32 @@ public class DefaultBulkApiClient extends
AbstractClientBase implements BulkApiC
} catch (JAXBException e) {
throw new SalesforceException(
String.format("Error unmarshaling response {%s:%s} : %s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(),
e.getMessage()),
e);
} catch (IllegalArgumentException e) {
throw new SalesforceException(
String.format("Error unmarshaling response for {%s:%s} :
%s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(),
e.getMessage()),
e);
}
}
- private void marshalRequest(Object input, ContentExchange request, String
contentType) throws SalesforceException {
+ private void marshalRequest(Object input, Request request, String
contentType) throws SalesforceException {
try {
Marshaller marshaller = context.createMarshaller();
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
marshaller.marshal(input, byteStream);
- request.setRequestContent(new
ByteArrayBuffer(byteStream.toByteArray()));
- request.setRequestContentType(contentType);
+
+ request.content(new BytesContentProvider(contentType,
byteStream.toByteArray()));
} catch (JAXBException e) {
throw new SalesforceException(
String.format("Error marshaling request for {%s:%s} : %s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(),
e.getMessage()),
e);
} catch (IllegalArgumentException e) {
throw new SalesforceException(
String.format("Error marshaling request for {%s:%s} : %s",
- request.getMethod(), request.getRequestURI(),
e.getMessage()),
+ request.getMethod(), request.getURI(),
e.getMessage()),
e);
}
}