CAMEL-9925: Updated Salesforce component to use Jetty9 and cometd3

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ec90c0b4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ec90c0b4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ec90c0b4

Branch: refs/heads/master
Commit: ec90c0b4b2c73f6323e1c320327cffa00d3e4b98
Parents: ff713bd
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Thu Apr 28 19:20:18 2016 -0700
Committer: Dhiraj Bokde <dhira...@yahoo.com>
Committed: Thu Apr 28 19:47:05 2016 -0700

----------------------------------------------------------------------
 .../camel-salesforce-component/pom.xml          |  34 ++-
 .../salesforce/SalesforceComponent.java         | 165 ++++++++---
 .../salesforce/SalesforceEndpointConfig.java    |  12 +-
 .../salesforce/SalesforceHttpClient.java        | 111 +++++++
 .../component/salesforce/api/dto/Address.java   |  10 +
 .../salesforce/api/dto/RestResources.java       |  24 ++
 .../salesforce/internal/SalesforceSession.java  | 287 ++++++++-----------
 .../internal/client/AbstractClientBase.java     | 195 +++++++------
 .../client/DefaultAnalyticsApiClient.java       |  95 +++---
 .../internal/client/DefaultBulkApiClient.java   |  93 +++---
 .../internal/client/DefaultRestClient.java      | 100 ++++---
 .../internal/client/SalesforceExchange.java     |  36 ---
 .../internal/client/SalesforceHttpRequest.java  |  38 +++
 .../client/SalesforceSecurityHandler.java       | 262 +++++++++++++++++
 .../client/SalesforceSecurityListener.java      | 192 -------------
 .../internal/client/XStreamUtils.java           |   2 +-
 .../processor/AbstractRestProcessor.java        |   3 +-
 .../processor/AbstractSalesforceProcessor.java  |   4 +-
 .../processor/AnalyticsApiProcessor.java        |   3 +-
 .../internal/processor/JsonRestProcessor.java   |   2 +-
 .../internal/processor/XmlRestProcessor.java    |   8 +-
 .../internal/streaming/SubscriptionHelper.java  |  38 +--
 .../salesforce/AbstractBulkApiTestBase.java     |   4 +-
 .../salesforce/AbstractSalesforceTestBase.java  |   9 +
 .../salesforce/BulkApiIntegrationTest.java      |  24 +-
 .../salesforce/HttpProxyIntegrationTest.java    |  44 ++-
 .../salesforce/RestApiIntegrationTest.java      |  79 +++--
 .../internal/SessionIntegrationTest.java        |  12 +-
 .../camel-salesforce-maven-plugin/pom.xml       |  25 +-
 .../apache/camel/maven/CamelSalesforceMojo.java | 128 +++++++--
 .../maven/HttpProxyMojoIntegrationTest.java     |  43 ++-
 components/camel-salesforce/pom.xml             |   1 +
 parent/pom.xml                                  |   2 +-
 33 files changed, 1263 insertions(+), 822 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/pom.xml b/components/camel-salesforce/camel-salesforce-component/pom.xml
index 79267a0..ac12d73 100644
--- a/components/camel-salesforce/camel-salesforce-component/pom.xml
+++ b/components/camel-salesforce/camel-salesforce-component/pom.xml
@@ -31,9 +31,6 @@
   <description>Camel Salesforce support</description>
 
   <properties>
-    <!-- TODO: upgrade to jetty 9 -->
-    <jetty8-version>8.1.17.v20150415</jetty8-version>
-
     <camel.osgi.import.before.defaults>
         org.joda.time.*;version="[1.6,3)"
     </camel.osgi.import.before.defaults>
@@ -53,17 +50,22 @@
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-client</artifactId>
-      <version>${jetty8-version}</version>
+      <version>${jetty9-version}</version>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util</artifactId>
-      <version>${jetty8-version}</version>
+      <version>${jetty9-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util-ajax</artifactId>
+      <version>${jetty9-version}</version>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-io</artifactId>
-      <version>${jetty8-version}</version>
+      <version>${jetty9-version}</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
@@ -81,11 +83,11 @@
       <exclusions>
         <exclusion>
           <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
+          <artifactId>*</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-io</artifactId>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -125,7 +127,19 @@
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
-      <version>${jetty8-version}</version>
+      <version>${jetty9-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty9-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-proxy</artifactId>
+      <version>${jetty9-version}</version>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 5f93441..600dcbf 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.salesforce;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -43,10 +44,13 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.jsse.SSLContextParameters;
-import org.eclipse.jetty.client.Address;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.RedirectListener;
-import org.eclipse.jetty.client.security.ProxyAuthorization;
+import org.eclipse.jetty.client.HttpProxy;
+import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.ProxyConfiguration;
+import org.eclipse.jetty.client.Socks4Proxy;
+import org.eclipse.jetty.client.api.Authentication;
+import org.eclipse.jetty.client.util.BasicAuthentication;
+import org.eclipse.jetty.client.util.DigestAuthentication;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,7 +63,6 @@ public class SalesforceComponent extends UriEndpointComponent 
implements Endpoin
     private static final Logger LOG = 
LoggerFactory.getLogger(SalesforceComponent.class);
 
     private static final int CONNECTION_TIMEOUT = 60000;
-    private static final long RESPONSE_TIMEOUT = 60000;
     private static final Pattern SOBJECT_NAME_PATTERN = 
Pattern.compile("^.*[\\?&]sObjectName=([^&,]+).*$");
     private static final String APEX_CALL_PREFIX = 
OperationName.APEX_CALL.value() + "/";
 
@@ -75,16 +78,23 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
     // Proxy host and port
     private String httpProxyHost;
     private Integer httpProxyPort;
+    private boolean isHttpProxySocks4;
+    private boolean isHttpProxySecure = true;
+    private Set<String> httpProxyIncludedAddresses;
+    private Set<String> httpProxyExcludedAddresses;
 
     // Proxy basic authentication
     private String httpProxyUsername;
     private String httpProxyPassword;
+    private String httpProxyAuthUri;
+    private String httpProxyRealm;
+    private boolean httpProxyUseDigestAuth;
 
     // DTO packages to scan
     private String[] packages;
 
     // component state
-    private HttpClient httpClient;
+    private SalesforceHttpClient httpClient;
     private SalesforceSession session;
     private Map<String, Class<?>> classMap;
 
@@ -179,20 +189,18 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
             if (config != null && config.getHttpClient() != null) {
                 httpClient = config.getHttpClient();
             } else {
-                httpClient = new HttpClient();
-                // default settings
-                
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+                // set ssl context parameters if set
+                final SSLContextParameters contextParameters = 
sslContextParameters != null
+                    ? sslContextParameters : new SSLContextParameters();
+                final SslContextFactory sslContextFactory = new 
SslContextFactory();
+                
sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext()));
+
+                httpClient = new SalesforceHttpClient(sslContextFactory);
+                // default settings, use httpClientProperties to set other 
properties
                 httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
-                httpClient.setTimeout(RESPONSE_TIMEOUT);
             }
         }
 
-        // set ssl context parameters
-        final SSLContextParameters contextParameters = sslContextParameters != 
null
-            ? sslContextParameters : new SSLContextParameters();
-        final SslContextFactory sslContextFactory = 
httpClient.getSslContextFactory();
-        
sslContextFactory.setSslContext(contextParameters.createSSLContext(getCamelContext()));
-
         // set HTTP client parameters
         if (httpClientProperties != null && !httpClientProperties.isEmpty()) {
             
IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(),
@@ -201,29 +209,46 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
 
         // set HTTP proxy settings
         if (this.httpProxyHost != null && httpProxyPort != null) {
-            httpClient.setProxy(new Address(this.httpProxyHost, 
this.httpProxyPort));
+            Origin.Address proxyAddress = new 
Origin.Address(this.httpProxyHost, this.httpProxyPort);
+            ProxyConfiguration.Proxy proxy; 
+            if (isHttpProxySocks4) {
+                proxy = new Socks4Proxy(proxyAddress, isHttpProxySecure);
+            } else {
+                proxy = new HttpProxy(proxyAddress, isHttpProxySecure);
+            }
+            if (httpProxyIncludedAddresses != null && 
!httpProxyIncludedAddresses.isEmpty()) {
+                
proxy.getIncludedAddresses().addAll(httpProxyIncludedAddresses);
+            }
+            if (httpProxyExcludedAddresses != null && 
!httpProxyExcludedAddresses.isEmpty()) {
+                
proxy.getExcludedAddresses().addAll(httpProxyExcludedAddresses);
+            }
+            httpClient.getProxyConfiguration().getProxies().add(proxy);
         }
         if (this.httpProxyUsername != null && httpProxyPassword != null) {
-            httpClient.setProxyAuthentication(new 
ProxyAuthorization(this.httpProxyUsername, this.httpProxyPassword));
-        }
 
-        // add redirect listener to handle Salesforce redirects
-        // this is ok to do since the RedirectListener is in the same 
classloader as Jetty client
-        String listenerClass = RedirectListener.class.getName();
-        if (httpClient.getRegisteredListeners() == null
-                || 
!httpClient.getRegisteredListeners().contains(listenerClass)) {
-            httpClient.registerListener(listenerClass);
-        }
-        // SalesforceSecurityListener can't be registered the same way
-        // since Jetty HttpClient's Class.forName() can't see it
+            ObjectHelper.notEmpty(httpProxyAuthUri, "httpProxyAuthUri");
+            ObjectHelper.notEmpty(httpProxyRealm, "httpProxyRealm");
 
-        // start the Jetty client to initialize thread pool, etc.
-        httpClient.start();
+            final Authentication authentication;
+            if (httpProxyUseDigestAuth) {
+                authentication = new DigestAuthentication(new 
URI(httpProxyAuthUri),
+                    httpProxyRealm, httpProxyUsername, httpProxyPassword);
+            } else {
+                authentication = new BasicAuthentication(new 
URI(httpProxyAuthUri),
+                    httpProxyRealm, httpProxyUsername, httpProxyPassword);
+            }
+            
httpClient.getAuthenticationStore().addAuthentication(authentication);
+        }
 
         // support restarts
         if (null == this.session) {
-            this.session = new SalesforceSession(httpClient, loginConfig);
+            this.session = new SalesforceSession(httpClient, 
httpClient.getTimeout(), loginConfig);
         }
+        // set session before calling start()
+        httpClient.setSession(this.session);
+
+        // start the Jetty client to initialize thread pool, etc.
+        httpClient.start();
 
         // login at startup if lazyLogin is disabled
         if (!loginConfig.isLazyLogin()) {
@@ -441,6 +466,83 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
         this.httpProxyPassword = httpProxyPassword;
     }
 
+    public boolean isHttpProxySocks4() {
+        return isHttpProxySocks4;
+    }
+
+    /**
+     * Enable for Socks4 proxy, false by default
+     */
+    public void setIsHttpProxySocks4(boolean isHttpProxySocks4) {
+        this.isHttpProxySocks4 = isHttpProxySocks4;
+    }
+
+    public boolean isHttpProxySecure() {
+        return isHttpProxySecure;
+    }
+
+    /**
+     * Enable for TLS connections, true by default
+     */
+    public void setIsHttpProxySecure(boolean isHttpProxySecure) {
+        this.isHttpProxySecure = isHttpProxySecure;
+    }
+
+    public Set<String> getHttpProxyIncludedAddresses() {
+        return httpProxyIncludedAddresses;
+    }
+
+    /**
+     * HTTP proxy included addresses
+     */
+    public void setHttpProxyIncludedAddresses(Set<String> 
httpProxyIncludedAddresses) {
+        this.httpProxyIncludedAddresses = httpProxyIncludedAddresses;
+    }
+
+    public Set<String> getHttpProxyExcludedAddresses() {
+        return httpProxyExcludedAddresses;
+    }
+
+    /**
+     * HTTP proxy excluded addresses
+     */
+    public void setHttpProxyExcludedAddresses(Set<String> 
httpProxyExcludedAddresses) {
+        this.httpProxyExcludedAddresses = httpProxyExcludedAddresses;
+    }
+
+    public String getHttpProxyAuthUri() {
+        return httpProxyAuthUri;
+    }
+
+    /**
+     * HTTP proxy authentication URI
+     */
+    public void setHttpProxyAuthUri(String httpProxyAuthUri) {
+        this.httpProxyAuthUri = httpProxyAuthUri;
+    }
+
+    public String getHttpProxyRealm() {
+        return httpProxyRealm;
+    }
+
+    /**
+     * HTTP proxy authentication realm
+     */
+    public void setHttpProxyRealm(String httpProxyRealm) {
+        this.httpProxyRealm = httpProxyRealm;
+    }
+
+    public boolean isHttpProxyUseDigestAuth() {
+        return httpProxyUseDigestAuth;
+    }
+
+    /**
+     * Use HTTP proxy Digest authentication, false by default
+     */
+    public void setHttpProxyUseDigestAuth(boolean httpProxyUseDigestAuth) {
+        this.httpProxyUseDigestAuth = httpProxyUseDigestAuth;
+    }
+
     public String[] getPackages() {
         return packages;
     }
@@ -469,5 +571,4 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
     public Map<String, Class<?>> getClassMap() {
         return classMap;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index e25198e5c..2bb6306 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -28,7 +28,6 @@ import 
org.apache.camel.component.salesforce.internal.dto.NotifyForFieldsEnum;
 import 
org.apache.camel.component.salesforce.internal.dto.NotifyForOperationsEnum;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
-import org.eclipse.jetty.client.HttpClient;
 
 /**
  * Salesforce Endpoint configuration.
@@ -71,6 +70,9 @@ public class SalesforceEndpointConfig implements Cloneable {
     public static final String REPORT_METADATA = "reportMetadata";
     public static final String INSTANCE_ID = "instanceId";
 
+    // default maximum authentication retries on failed authentication or 
expired session
+    public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4;
+
     // general properties
     @UriParam
     private String apiVersion = DEFAULT_VERSION;
@@ -139,9 +141,9 @@ public class SalesforceEndpointConfig implements Cloneable {
     @UriParam
     private String instanceId;
 
-    // Jetty HttpClient, set using reference
+    // Salesforce Jetty9 HttpClient, set using reference
     @UriParam
-    private HttpClient httpClient;
+    private SalesforceHttpClient httpClient;
 
     public SalesforceEndpointConfig copy() {
         try {
@@ -475,11 +477,11 @@ public class SalesforceEndpointConfig implements 
Cloneable {
     /**
      * Custom Jetty Http Client to use to connect to Salesforce.
      */
-    public void setHttpClient(HttpClient httpClient) {
+    public void setHttpClient(SalesforceHttpClient httpClient) {
         this.httpClient = httpClient;
     }
 
-    public HttpClient getHttpClient() {
+    public SalesforceHttpClient getHttpClient() {
         return httpClient;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
new file mode 100644
index 0000000..6bca3f8
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.component.salesforce.internal.client.SalesforceHttpRequest;
+import org.apache.camel.component.salesforce.internal.client.SalesforceSecurityHandler;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.HttpRequest;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * Custom Salesforce HTTP Client that creates {@link SalesforceHttpRequest} requests.
+ */
+public class SalesforceHttpClient extends HttpClient {
+
+    // default total request timeout in msecs
+    static final long DEFAULT_TIMEOUT = 60000;
+
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_MAX_CONTENT_LENGTH = 4 * 1024 * 1024;
+
+    private SalesforceSession session;
+    private int maxRetries = DEFAULT_MAX_RETRIES;
+    private int maxContentLength = DEFAULT_MAX_CONTENT_LENGTH;
+    private long timeout = DEFAULT_TIMEOUT;
+
+    public SalesforceHttpClient() {
+    }
+
+    public SalesforceHttpClient(SslContextFactory sslContextFactory) {
+        super(sslContextFactory);
+    }
+
+    public SalesforceHttpClient(HttpClientTransport transport, SslContextFactory sslContextFactory) {
+        super(transport, sslContextFactory);
+    }
+
+    @Override
+    public HttpRequest newHttpRequest(HttpConversation conversation, URI uri) {
+        final SalesforceHttpRequest request = new SalesforceHttpRequest(this, conversation, uri);
+        request.timeout(timeout, TimeUnit.MILLISECONDS);
+        return request;
+    }
+
+    @Override
+    public Request copyRequest(HttpRequest oldRequest, URI newURI) {
+        return super.copyRequest(oldRequest, newURI);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (getSession() == null) {
+            throw new IllegalStateException("Missing SalesforceSession in property session!");
+        }
+        getProtocolHandlers().add(new SalesforceSecurityHandler(this));
+        super.doStart();
+    }
+
+    public SalesforceSession getSession() {
+        return session;
+    }
+
+    public void setSession(SalesforceSession session) {
+        this.session = session;
+    }
+
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public void setMaxRetries(int maxRetries) {
+        this.maxRetries = maxRetries;
+    }
+
+    public int getMaxContentLength() {
+        return maxContentLength;
+    }
+
+    public void setMaxContentLength(int maxContentLength) {
+        this.maxContentLength = maxContentLength;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
index 2385f94..6096260 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/Address.java
@@ -35,6 +35,8 @@ public class Address extends GeoLocation {
 
     private String street;
 
+    private String geocodeAccuracy;
+
     public String getCity() {
         return city;
     }
@@ -90,4 +92,12 @@ public class Address extends GeoLocation {
     public void setStreet(String street) {
         this.street = street;
     }
+
+    public String getGeocodeAccuracy() {
+        return geocodeAccuracy;
+    }
+
+    public void setGeocodeAccuracy(String geocodeAccuracy) {
+        this.geocodeAccuracy = geocodeAccuracy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
index a07cb6a..46d6944 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/RestResources.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.salesforce.api.dto;
 
 import com.thoughtworks.xstream.annotations.XStreamAlias;
 
+import org.codehaus.jackson.annotate.JsonProperty;
+
 /**
  * DTO for Salesforce Resources.
  */
@@ -46,6 +48,12 @@ public class RestResources extends AbstractDTOBase {
     private String actions;
     private String tabs;
     private String wave;
+    @JsonProperty("async-queries")
+    @XStreamAlias("async-queries")
+    private String asyncQueries;
+    @JsonProperty("exchange-connect")
+    @XStreamAlias("exchange-connect")
+    private String exchangeConnect;
 
     public String getSobjects() {
         return sobjects;
@@ -222,4 +230,20 @@ public class RestResources extends AbstractDTOBase {
     public void setWave(String wave) {
         this.wave = wave;
     }
+
+    public String getAsyncQueries() {
+        return asyncQueries;
+    }
+
+    public void setAsyncQueries(String asyncQueries) {
+        this.asyncQueries = asyncQueries;
+    }
+
+    public String getExchangeConnect() {
+        return exchangeConnect;
+    }
+
+    public void setExchangeConnect(String exchangeConnect) {
+        this.exchangeConnect = exchangeConnect;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
index bf3a395..0fbe8ec 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -17,12 +17,17 @@
 package org.apache.camel.component.salesforce.internal;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.Service;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.SalesforceLoginConfig;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.api.dto.RestError;
@@ -30,15 +35,13 @@ import 
org.apache.camel.component.salesforce.internal.dto.LoginError;
 import org.apache.camel.component.salesforce.internal.dto.LoginToken;
 import org.apache.camel.util.ObjectHelper;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.util.FormContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.io.Buffer;
-import org.eclipse.jetty.io.ByteArrayBuffer;
-import org.eclipse.jetty.util.StringUtil;
-import org.eclipse.jetty.util.UrlEncoded;
+import org.eclipse.jetty.util.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,9 +51,9 @@ public class SalesforceSession implements Service {
     private static final String OAUTH2_TOKEN_PATH = "/services/oauth2/token";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SalesforceSession.class);
-    private static final String FORM_CONTENT_TYPE = 
"application/x-www-form-urlencoded;charset=utf-8";
 
-    private final HttpClient httpClient;
+    private final SalesforceHttpClient httpClient;
+    private final long timeout;
 
     private final SalesforceLoginConfig config;
 
@@ -60,7 +63,7 @@ public class SalesforceSession implements Service {
     private volatile String accessToken;
     private volatile String instanceUrl;
 
-    public SalesforceSession(HttpClient httpClient, SalesforceLoginConfig 
config) {
+    public SalesforceSession(SalesforceHttpClient httpClient, long timeout, 
SalesforceLoginConfig config) {
         // validate parameters
         ObjectHelper.notNull(httpClient, "httpClient");
         ObjectHelper.notNull(config, "SalesforceLoginConfig");
@@ -71,6 +74,7 @@ public class SalesforceSession implements Service {
         ObjectHelper.notNull(config.getPassword(), "password");
 
         this.httpClient = httpClient;
+        this.timeout = timeout;
         this.config = config;
 
         // strip trailing '/'
@@ -100,144 +104,133 @@ public class SalesforceSession implements Service {
             }
 
             // login to Salesforce and get session id
-            final StatusExceptionExchange loginPost = new 
StatusExceptionExchange(true);
-            String url = config.getLoginUrl() + OAUTH2_TOKEN_PATH;
-            loginPost.setURL(url);
-            loginPost.setMethod(HttpMethods.POST);
-            loginPost.setRequestContentType(FORM_CONTENT_TYPE);
-
-            final UrlEncoded nvps = new UrlEncoded();
-            nvps.put("grant_type", "password");
-            nvps.put("client_id", config.getClientId());
-            nvps.put("client_secret", config.getClientSecret());
-            nvps.put("username", config.getUserName());
-            nvps.put("password", config.getPassword());
-            nvps.put("format", "json");
-
+            final Request loginPost = getLoginRequest(null);
             try {
 
-                LOG.info("Login user {} at Salesforce url: {}", 
config.getUserName(), url);
-
-                // set form content
-                loginPost.setRequestContent(new ByteArrayBuffer(
-                        nvps.encode(StringUtil.__UTF8, 
true).getBytes(StringUtil.__UTF8)));
-                httpClient.send(loginPost);
-
-                // wait for the login to finish
-                final int exchangeState = loginPost.waitForDone();
-
-                switch (exchangeState) {
-                case HttpExchange.STATUS_COMPLETED:
-                    final byte[] responseContent = 
loginPost.getResponseContentBytes();
-                    final int responseStatus = loginPost.getResponseStatus();
-
-                    switch (responseStatus) {
-                    case HttpStatus.OK_200:
-                        // parse the response to get token
-                        LoginToken token = 
objectMapper.readValue(responseContent, LoginToken.class);
-
-                        // don't log token or instance URL for security reasons
-                        LOG.info("Login successful");
-                        accessToken = token.getAccessToken();
-                        instanceUrl = token.getInstanceUrl();
-
-                        // notify all listeners
-                        for (SalesforceSessionListener listener : listeners) {
-                            try {
-                                listener.onLogin(accessToken, instanceUrl);
-                            } catch (Throwable t) {
-                                LOG.warn("Unexpected error from listener {}: 
{}", listener, t.getMessage());
-                            }
-                        }
-
-                        break;
-
-                    case HttpStatus.BAD_REQUEST_400:
-                        // parse the response to get error
-                        final LoginError error = 
objectMapper.readValue(responseContent, LoginError.class);
-                        final String msg = String.format("Login error 
code:[%s] description:[%s]",
-                                error.getError(), error.getErrorDescription());
-                        final List<RestError> errors = new 
ArrayList<RestError>();
-                        errors.add(new RestError(msg, 
error.getErrorDescription()));
-                        throw new SalesforceException(errors, 
HttpStatus.BAD_REQUEST_400);
-
-                    default:
-                        throw new SalesforceException(String.format("Login 
error status:[%s] reason:[%s]",
-                            responseStatus, loginPost.getReason()), 
responseStatus);
-                    }
-                    break;
-
-                case HttpExchange.STATUS_EXCEPTED:
-                    final Throwable ex = loginPost.getException();
-                    throw new SalesforceException(
-                            String.format("Unexpected login exception: %s", 
ex.getMessage()), ex);
+                final ContentResponse loginResponse = loginPost.send();
+                parseLoginResponse(loginResponse, 
loginResponse.getContentAsString());
 
-                case HttpExchange.STATUS_CANCELLED:
-                    throw new SalesforceException("Login request CANCELLED!", 
null);
-
-                case HttpExchange.STATUS_EXPIRED:
-                    throw new SalesforceException("Login request TIMEOUT!", 
null);
-
-                default:
-                    throw new SalesforceException("Unknow status: " + 
exchangeState, null);
-                }
-            } catch (IOException e) {
-                String msg = "Login error: unexpected exception " + 
e.getMessage();
-                throw new SalesforceException(msg, e);
             } catch (InterruptedException e) {
-                String msg = "Login error: unexpected exception " + 
e.getMessage();
-                throw new SalesforceException(msg, e);
+                throw new SalesforceException("Login error: " + 
e.getMessage(), e);
+            } catch (TimeoutException e) {
+                throw new SalesforceException("Login request timeout: " + 
e.getMessage(), e);
+            } catch (ExecutionException e) {
+                throw new SalesforceException("Unexpected login error: " + 
e.getCause().getMessage(), e.getCause());
             }
         }
 
         return accessToken;
     }
 
-    public synchronized void logout() throws SalesforceException {
-        if (accessToken == null) {
-            return;
+    /**
+     * Creates login request, allows SalesforceSecurityHandler to create a 
login request for a failed authentication conversation
+     * @return login POST request.
+     */
+    public Request getLoginRequest(HttpConversation conversation) {
+        final String loginUrl = (instanceUrl == null ? config.getLoginUrl() : 
instanceUrl) + OAUTH2_TOKEN_PATH;
+        LOG.info("Login user {} at Salesforce loginUrl: {}", 
config.getUserName(), loginUrl);
+        final Fields fields = new Fields(true);
+
+        fields.put("grant_type", "password");
+        fields.put("client_id", config.getClientId());
+        fields.put("client_secret", config.getClientSecret());
+        fields.put("username", config.getUserName());
+        fields.put("password", config.getPassword());
+        fields.put("format", "json");
+
+        final Request post;
+        if (conversation == null) {
+            post = httpClient.POST(loginUrl);
+        } else {
+            post = httpClient.newHttpRequest(conversation, 
URI.create(loginUrl))
+                .method(HttpMethod.POST);
         }
 
-        StatusExceptionExchange logoutGet = new StatusExceptionExchange(true);
-        logoutGet.setURL(config.getLoginUrl() + OAUTH2_REVOKE_PATH + 
accessToken);
-        logoutGet.setMethod(HttpMethods.GET);
+        return post.content(new FormContentProvider(fields))
+            .timeout(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Parses login response, allows SalesforceSecurityHandler to parse a 
login response for a failed authentication conversation.
+     * @param loginResponse
+     * @param responseContent
+     * @throws SalesforceException
+     */
+    public synchronized void parseLoginResponse(ContentResponse loginResponse, 
String responseContent) throws SalesforceException {
+        final int responseStatus = loginResponse.getStatus();
 
         try {
-            httpClient.send(logoutGet);
-            final int done = logoutGet.waitForDone();
-            switch (done) {
-            case HttpExchange.STATUS_COMPLETED:
-                final int statusCode = logoutGet.getResponseStatus();
-                final String reason = logoutGet.getReason();
-
-                if (statusCode == HttpStatus.OK_200) {
-                    LOG.info("Logout successful");
-                } else {
-                    throw new SalesforceException(
-                            String.format("Logout error, code: [%s] reason: 
[%s]",
-                                    statusCode, reason),
-                            statusCode);
+            switch (responseStatus) {
+            case HttpStatus.OK_200:
+                // parse the response to get token
+                LoginToken token = objectMapper.readValue(responseContent, 
LoginToken.class);
+
+                // don't log token or instance URL for security reasons
+                LOG.info("Login successful");
+                accessToken = token.getAccessToken();
+                instanceUrl = token.getInstanceUrl();
+
+                // notify all session listeners
+                for (SalesforceSessionListener listener : listeners) {
+                    try {
+                        listener.onLogin(accessToken, instanceUrl);
+                    } catch (Throwable t) {
+                        LOG.warn("Unexpected error from listener {}: {}", 
listener, t.getMessage());
+                    }
                 }
+
                 break;
 
-            case HttpExchange.STATUS_EXCEPTED:
-                final Throwable ex = logoutGet.getException();
-                throw new SalesforceException("Unexpected logout exception: " 
+ ex.getMessage(), ex);
+            case HttpStatus.BAD_REQUEST_400:
+                // parse the response to get error
+                final LoginError error = 
objectMapper.readValue(responseContent, LoginError.class);
+                final String msg = String.format("Login error code:[%s] 
description:[%s]",
+                    error.getError(), error.getErrorDescription());
+                final List<RestError> errors = new ArrayList<RestError>();
+                errors.add(new RestError(msg, error.getErrorDescription()));
+                throw new SalesforceException(errors, 
HttpStatus.BAD_REQUEST_400);
 
-            case HttpExchange.STATUS_CANCELLED:
-                throw new SalesforceException("Logout request CANCELLED!", 
null);
+            default:
+                throw new SalesforceException(String.format("Login error 
status:[%s] reason:[%s]",
+                    responseStatus, loginResponse.getReason()), 
responseStatus);
+            }
+        } catch (IOException e) {
+            String msg = "Login error: response parse exception " + 
e.getMessage();
+            throw new SalesforceException(msg, e);
+        }
+    }
 
-            case HttpExchange.STATUS_EXPIRED:
-                throw new SalesforceException("Logout request TIMEOUT!", null);
+    public synchronized void logout() throws SalesforceException {
+        if (accessToken == null) {
+            return;
+        }
 
-            default:
-                throw new SalesforceException("Unknown status: " + done, null);
+        try {
+            String logoutUrl = (instanceUrl == null ? config.getLoginUrl() : 
instanceUrl) + OAUTH2_REVOKE_PATH + accessToken;
+            final Request logoutGet = httpClient.newRequest(logoutUrl)
+                .timeout(timeout, TimeUnit.MILLISECONDS);
+            final ContentResponse logoutResponse = logoutGet.send();
+
+            final int statusCode = logoutResponse.getStatus();
+            final String reason = logoutResponse.getReason();
+
+            if (statusCode == HttpStatus.OK_200) {
+                LOG.info("Logout successful");
+            } else {
+                throw new SalesforceException(
+                        String.format("Logout error, code: [%s] reason: [%s]",
+                                statusCode, reason),
+                        statusCode);
             }
-        } catch (SalesforceException e) {
-            throw e;
-        } catch (Exception e) {
+
+        } catch (InterruptedException e) {
             String msg = "Logout error: " + e.getMessage();
             throw new SalesforceException(msg, e);
+        } catch (ExecutionException e) {
+            final Throwable ex = e.getCause();
+            throw new SalesforceException("Unexpected logout exception: " + 
ex.getMessage(), ex);
+        } catch (TimeoutException e) {
+            throw new SalesforceException("Logout request TIMEOUT!", null);
         } finally {
             // reset session
             accessToken = null;
@@ -281,45 +274,8 @@ public class SalesforceSession implements Service {
         logout();
     }
 
-    /**
-     * Records status line, and exception from exchange.
-     */
-    private static class StatusExceptionExchange extends ContentExchange {
-
-        private String reason;
-        private Throwable exception;
-
-        public StatusExceptionExchange(boolean cacheFields) {
-            super(cacheFields);
-        }
-
-        @Override
-        protected synchronized void onResponseStatus(Buffer version, int 
status, Buffer reason) throws IOException {
-            // remember reason
-            this.reason = reason.toString(StringUtil.__ISO_8859_1);
-            super.onResponseStatus(version, status, reason);
-        }
-
-        @Override
-        protected void onConnectionFailed(Throwable x) {
-            this.exception = x;
-            super.onConnectionFailed(x);
-        }
-
-        @Override
-        protected void onException(Throwable x) {
-            this.exception = x;
-            super.onException(x);
-        }
-
-        public String getReason() {
-            return reason;
-        }
-
-        public Throwable getException() {
-            return exception;
-        }
-
+    public long getTimeout() {
+        return timeout;
     }
 
     public interface SalesforceSessionListener {
@@ -327,5 +283,4 @@ public class SalesforceSession implements Service {
 
         void onLogout();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index b61c161..757293e 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -16,21 +16,27 @@
  */
 package org.apache.camel.component.salesforce.internal.client;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Service;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpEventListenerWrapper;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpSchemes;
+import org.eclipse.jetty.client.HttpContentResponse;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.ByteBufferContentProvider;
+import org.eclipse.jetty.client.util.InputStreamContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.io.Buffer;
-import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,15 +47,15 @@ public abstract class AbstractClientBase implements 
SalesforceSession.Salesforce
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
-    protected final HttpClient httpClient;
+    protected final SalesforceHttpClient httpClient;
     protected final SalesforceSession session;
     protected final String version;
 
     protected String accessToken;
     protected String instanceUrl;
 
-    public AbstractClientBase(String version,
-                              SalesforceSession session, HttpClient 
httpClient) throws SalesforceException {
+    public AbstractClientBase(String version, SalesforceSession session,
+                              SalesforceHttpClient httpClient) throws 
SalesforceException {
 
         this.version = version;
         this.session = session;
@@ -89,111 +95,102 @@ public abstract class AbstractClientBase implements 
SalesforceSession.Salesforce
         // SalesforceSecurityListener will auto login!
     }
 
-    protected SalesforceExchange getContentExchange(String method, String url) 
{
-        SalesforceExchange get = new SalesforceExchange();
-        get.setMethod(method);
-        get.setURL(url);
-        get.setClient(this);
-        return get;
+    protected Request getRequest(HttpMethod method, String url) {
+        return getRequest(method.asString(), url);
+    }
+
+    protected Request getRequest(String method, String url) {
+        SalesforceHttpRequest request = (SalesforceHttpRequest) 
httpClient.newRequest(url)
+            .method(method)
+            .timeout(session.getTimeout(), TimeUnit.MILLISECONDS);
+        
request.getConversation().setAttribute(SalesforceSecurityHandler.CLIENT_ATTRIBUTE,
 this);
+        return request;
     }
 
     protected interface ClientResponseCallback {
         void onResponse(InputStream response, SalesforceException ex);
     }
 
-    protected void doHttpRequest(final ContentExchange request, final 
ClientResponseCallback callback) {
+    protected void doHttpRequest(final Request request, final 
ClientResponseCallback callback) {
+        // Highly memory inefficient,
+        // but buffer the request content to allow it to be replayed for 
authentication retries
+        final ContentProvider content = request.getContent();
+        if (content instanceof InputStreamContentProvider) {
+            final List<ByteBuffer> buffers = new ArrayList<>();
+            for (ByteBuffer buffer : content) {
+                buffers.add(buffer);
+            }
+            request.content(new ByteBufferContentProvider(buffers.toArray(new 
ByteBuffer[buffers.size()])));
+            buffers.clear();
+        }
 
-        // use SalesforceSecurityListener for security login retries
-        final SalesforceSecurityListener securityListener;
-        try {
-            final boolean isHttps = 
HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme()));
-            securityListener = new SalesforceSecurityListener(
-                    httpClient.getDestination(request.getAddress(), isHttps),
-                    request, session, accessToken) {
+        // execute the request
+        request.send(new 
BufferingResponseListener(httpClient.getMaxContentLength()) {
+            @Override
+            public void onComplete(Result result) {
+                Response response = result.getResponse();
+                if (result.isFailed()) {
+
+                    // Failure!!!
+                    // including Salesforce errors reported as exception from 
SalesforceSecurityHandler
+                    Throwable failure = result.getFailure();
+                    if (failure instanceof SalesforceException) {
+                        callback.onResponse(null, (SalesforceException) 
failure);
+                    } else {
+                        final String msg = String.format("Unexpected error 
{%s:%s} executing {%s:%s}",
+                            response.getStatus(), response.getReason(), 
request.getMethod(), request.getURI());
+                        callback.onResponse(null, new SalesforceException(msg, 
response.getStatus(), failure));
+                    }
+                } else {
 
-                private String reason;
+                    // HTTP error status
+                    final int status = response.getStatus();
+                    SalesforceHttpRequest request = (SalesforceHttpRequest) 
((SalesforceHttpRequest) result.getRequest())
+                        .getConversation()
+                        
.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE);
 
-                @Override
-                public void onResponseStatus(Buffer version, int status, 
Buffer reason) throws IOException {
-                    super.onResponseStatus(version, status, reason);
-                    // remember status reason
-                    this.reason = reason.toString(StringUtil.__ISO_8859_1);
-                }
+                    if (status == HttpStatus.BAD_REQUEST_400 && request != 
null) {
+                        // parse login error
+                        ContentResponse contentResponse = new 
HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
+                        try {
 
-                @Override
-                protected SalesforceException createExceptionResponse() {
-                    final int responseStatus = request.getResponseStatus();
-                    if (responseStatus < HttpStatus.OK_200 || responseStatus 
>= HttpStatus.MULTIPLE_CHOICES_300) {
-                        final String msg = String.format("Error {%s:%s} 
executing {%s:%s}",
-                                responseStatus, reason, request.getMethod(), 
request.getRequestURI());
-                        return new SalesforceException(msg, responseStatus, 
createRestException(request, reason));
-                    } else {
-                        return super.createExceptionResponse();
-                    }
-                }
-            };
-        } catch (IOException e) {
-            // propagate exception
-            callback.onResponse(null, new SalesforceException(
-                    String.format("Error registering security listener: %s", 
e.getMessage()),
-                    e));
-            return;
-        }
+                            session.parseLoginResponse(contentResponse, 
getContentAsString());
+                            final String msg = String.format("Unexpected Error 
{%s:%s} executing {%s:%s}",
+                                status, response.getReason(), 
request.getMethod(), request.getURI());
+                            callback.onResponse(null, new 
SalesforceException(msg, null));
 
-        // use HttpEventListener for lifecycle events
-        request.setEventListener(new 
HttpEventListenerWrapper(request.getEventListener(), true) {
+                        } catch (SalesforceException e) {
 
-            @Override
-            public void onConnectionFailed(Throwable ex) {
-                super.onConnectionFailed(ex);
-                callback.onResponse(null,
-                        new SalesforceException("Connection error: " + 
ex.getMessage(), ex));
-            }
+                            final String msg = String.format("Error {%s:%s} 
executing {%s:%s}",
+                                status, response.getReason(), 
request.getMethod(), request.getURI());
+                            callback.onResponse(null, new 
SalesforceException(msg, response.getStatus(), e));
 
-            @Override
-            public void onException(Throwable ex) {
-                super.onException(ex);
-                callback.onResponse(null,
-                        new SalesforceException("Unexpected exception: " + 
ex.getMessage(), ex));
-            }
+                        }
+                    } else if (status < HttpStatus.OK_200 || status >= 
HttpStatus.MULTIPLE_CHOICES_300) {
 
-            @Override
-            public void onExpire() {
-                super.onExpire();
-                callback.onResponse(null,
-                        new SalesforceException("Request expired", null));
-            }
+                        // Salesforce HTTP failure!
+                        request = (SalesforceHttpRequest) result.getRequest();
+                        final String msg = String.format("Error {%s:%s} 
executing {%s:%s}",
+                            status, response.getReason(), request.getMethod(), 
request.getURI());
+                        final SalesforceException cause = 
createRestException(response, getContentAsInputStream());
+                        callback.onResponse(null, new SalesforceException(msg, 
response.getStatus(), cause));
 
-            @Override
-            public void onResponseComplete() throws IOException {
-                super.onResponseComplete();
+                    } else {
 
-                SalesforceException e = 
securityListener.getExceptionResponse();
-                if (e != null) {
-                    callback.onResponse(null, e);
-                } else {
-                    // TODO not memory efficient for large response messages,
-                    // doesn't seem to be possible in Jetty 7 to directly 
stream to response parsers
-                    final byte[] bytes = request.getResponseContentBytes();
-                    callback.onResponse(bytes != null ? new 
ByteArrayInputStream(bytes) : null, null);
+                        // Success!!!
+                        callback.onResponse(getContentAsInputStream(), null);
+                    }
                 }
+            }
 
+            @Override
+            public InputStream getContentAsInputStream() {
+                if (getContent().length == 0) {
+                    return null;
+                }
+                return super.getContentAsInputStream();
             }
         });
-
-        // wrap the above lifecycle event listener with 
SalesforceSecurityListener
-        securityListener.setEventListener(request.getEventListener());
-        request.setEventListener(securityListener);
-
-        // execute the request
-        try {
-            httpClient.send(request);
-        } catch (IOException e) {
-            String msg = "Unexpected Error: " + e.getMessage();
-            // send error through callback
-            callback.onResponse(null, new SalesforceException(msg, e));
-        }
-
     }
 
     public void setAccessToken(String accessToken) {
@@ -204,8 +201,8 @@ public abstract class AbstractClientBase implements 
SalesforceSession.Salesforce
         this.instanceUrl = instanceUrl;
     }
 
-    protected abstract void setAccessToken(HttpExchange httpExchange);
+    protected abstract void setAccessToken(Request request);
 
-    protected abstract SalesforceException createRestException(ContentExchange 
httpExchange, String reason);
+    protected abstract SalesforceException createRestException(Response 
response, InputStream responseContent);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
index 1041f7f..f6e72dc 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultAnalyticsApiClient.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.api.dto.RestError;
 import 
org.apache.camel.component.salesforce.api.dto.analytics.reports.AsyncReportResults;
@@ -33,12 +34,11 @@ import 
org.apache.camel.component.salesforce.api.dto.analytics.reports.SyncRepor
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpHeaders;
-import org.eclipse.jetty.http.HttpMethods;
-import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.util.StringUtil;
 
 /**
@@ -51,7 +51,8 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     private ObjectMapper objectMapper;
 
 
-    public DefaultAnalyticsApiClient(String version, SalesforceSession 
session, HttpClient httpClient) throws SalesforceException {
+    public DefaultAnalyticsApiClient(String version, SalesforceSession session,
+                                     SalesforceHttpClient httpClient) throws 
SalesforceException {
         super(version, session, httpClient);
 
         objectMapper = new ObjectMapper();
@@ -60,16 +61,16 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     @Override
     public void getRecentReports(final RecentReportsResponseCallback callback) 
{
 
-        final ContentExchange contentExchange = 
getContentExchange(HttpMethods.GET, reportsUrl());
+        final Request Request = getRequest(HttpMethod.GET, reportsUrl());
 
-        doHttpRequest(contentExchange, new ClientResponseCallback() {
+        doHttpRequest(Request, new ClientResponseCallback() {
             @Override
             @SuppressWarnings("unchecked")
             public void onResponse(InputStream response, SalesforceException 
ex) {
                 List<RecentReport> recentReports = null;
                 if (response != null) {
                     try {
-                        recentReports = unmarshalResponse(response, 
contentExchange,
+                        recentReports = unmarshalResponse(response, Request,
                             new TypeReference<List<RecentReport>>() {
                             }
                         );
@@ -85,14 +86,14 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     @Override
     public void getReportDescription(String reportId, final 
ReportDescriptionResponseCallback callback) {
 
-        final ContentExchange contentExchange = 
getContentExchange(HttpMethods.GET, reportsDescribeUrl(reportId));
+        final Request Request = getRequest(HttpMethod.GET, 
reportsDescribeUrl(reportId));
 
-        doHttpRequest(contentExchange, new ClientResponseCallback() {
+        doHttpRequest(Request, new ClientResponseCallback() {
             @Override
             public void onResponse(InputStream response, SalesforceException 
ex) {
                 ReportDescription reportDescription = null;
                 try {
-                    reportDescription = unmarshalResponse(response, 
contentExchange, ReportDescription.class);
+                    reportDescription = unmarshalResponse(response, Request, 
ReportDescription.class);
                 } catch (SalesforceException e) {
                     ex = e;
                 }
@@ -106,8 +107,8 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
                                   final ReportResultsResponseCallback 
callback) {
 
         final boolean useGet = reportMetadata == null;
-        final ContentExchange contentExchange = getContentExchange(
-            useGet ? HttpMethods.GET : HttpMethods.POST, reportsUrl(reportId, 
includeDetails));
+        final Request Request = getRequest(
+            useGet ? HttpMethod.GET : HttpMethod.POST, reportsUrl(reportId, 
includeDetails));
 
         // set POST data
         if (!useGet) {
@@ -115,19 +116,19 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
                 // wrap reportMetadata in a map
                 final HashMap<String, Object> request = new HashMap<String, 
Object>();
                 request.put("reportMetadata", reportMetadata);
-                marshalRequest(request, contentExchange);
+                marshalRequest(request, Request);
             } catch (SalesforceException e) {
                 callback.onResponse(null, e);
                 return;
             }
         }
 
-        doHttpRequest(contentExchange, new ClientResponseCallback() {
+        doHttpRequest(Request, new ClientResponseCallback() {
             @Override
             public void onResponse(InputStream response, SalesforceException 
ex) {
                 SyncReportResults reportResults = null;
                 try {
-                    reportResults = unmarshalResponse(response, 
contentExchange, SyncReportResults.class);
+                    reportResults = unmarshalResponse(response, Request, 
SyncReportResults.class);
                 } catch (SalesforceException e) {
                     ex = e;
                 }
@@ -140,7 +141,7 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     public void executeAsyncReport(String reportId, Boolean includeDetails, 
ReportMetadata reportMetadata,
                                    final ReportInstanceResponseCallback 
callback) {
 
-        final ContentExchange contentExchange = 
getContentExchange(HttpMethods.POST,
+        final Request Request = getRequest(HttpMethod.POST,
             reportInstancesUrl(reportId, includeDetails));
 
         // set POST data
@@ -149,19 +150,19 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
                 // wrap reportMetadata in a map
                 final HashMap<String, Object> request = new HashMap<String, 
Object>();
                 request.put("reportMetadata", reportMetadata);
-                marshalRequest(request, contentExchange);
+                marshalRequest(request, Request);
             } catch (SalesforceException e) {
                 callback.onResponse(null, e);
                 return;
             }
         }
 
-        doHttpRequest(contentExchange, new ClientResponseCallback() {
+        doHttpRequest(Request, new ClientResponseCallback() {
             @Override
             public void onResponse(InputStream response, SalesforceException 
ex) {
                 ReportInstance reportInstance = null;
                 try {
-                    reportInstance = unmarshalResponse(response, 
contentExchange, ReportInstance.class);
+                    reportInstance = unmarshalResponse(response, Request, 
ReportInstance.class);
                 } catch (SalesforceException e) {
                     ex = e;
                 }
@@ -173,16 +174,16 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     @Override
     public void getReportInstances(String reportId, final 
ReportInstanceListResponseCallback callback) {
 
-        final ContentExchange contentExchange = 
getContentExchange(HttpMethods.GET, reportInstancesUrl(reportId));
+        final Request Request = getRequest(HttpMethod.GET, 
reportInstancesUrl(reportId));
 
-        doHttpRequest(contentExchange, new ClientResponseCallback() {
+        doHttpRequest(Request, new ClientResponseCallback() {
             @Override
             @SuppressWarnings("unchecked")
             public void onResponse(InputStream response, SalesforceException 
ex) {
                 List<ReportInstance> reportInstances = null;
                 if (response != null) {
                     try {
-                        reportInstances = unmarshalResponse(response, 
contentExchange,
+                        reportInstances = unmarshalResponse(response, Request,
                             new TypeReference<List<ReportInstance>>() {
                             }
                         );
@@ -198,15 +199,15 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     @Override
     public void getReportResults(String reportId, String instanceId, final 
ReportResultsResponseCallback callback) {
 
-        final ContentExchange contentExchange = 
getContentExchange(HttpMethods.GET,
+        final Request Request = getRequest(HttpMethod.GET,
             reportInstancesUrl(reportId, instanceId));
 
-        doHttpRequest(contentExchange, new ClientResponseCallback() {
+        doHttpRequest(Request, new ClientResponseCallback() {
             @Override
             public void onResponse(InputStream response, SalesforceException 
ex) {
                 AsyncReportResults reportResults = null;
                 try {
-                    reportResults = unmarshalResponse(response, 
contentExchange, AsyncReportResults.class);
+                    reportResults = unmarshalResponse(response, Request, 
AsyncReportResults.class);
                 } catch (SalesforceException e) {
                     ex = e;
                 }
@@ -247,16 +248,15 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
     }
 
     @Override
-    protected void setAccessToken(HttpExchange httpExchange) {
-        httpExchange.setRequestHeader(HttpHeaders.AUTHORIZATION, TOKEN_PREFIX 
+ accessToken);
+    protected void setAccessToken(Request request) {
+        // replace old token
+        request.getHeaders().put(HttpHeader.AUTHORIZATION, TOKEN_PREFIX + 
accessToken);
     }
 
     @Override
-    protected SalesforceException createRestException(ContentExchange 
httpExchange, String reason) {
-        final int statusCode = httpExchange.getResponseStatus();
-        String responseContent = null;
+    protected SalesforceException createRestException(Response response, 
InputStream responseContent) {
+        final int statusCode = response.getStatus();
         try {
-            responseContent = httpExchange.getResponseContent();
             if (responseContent != null) {
                 // unmarshal RestError
                 final List<RestError> errors = 
objectMapper.readValue(responseContent,
@@ -277,36 +277,37 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
         }
 
         // just report HTTP status info
-        return new SalesforceException("Unexpected error: " + reason + ", with 
content: " + responseContent,
-            statusCode);
+        String message = String.format("Unexpected error: %s, with content: 
%s",
+            response.getReason(), responseContent);
+        return new SalesforceException(message, statusCode);
     }
 
     @Override
-    protected void doHttpRequest(ContentExchange request, 
ClientResponseCallback callback) {
+    protected void doHttpRequest(Request request, ClientResponseCallback 
callback) {
 
         // set access token for all requests
         setAccessToken(request);
 
         // set request and response content type and charset, which is always 
JSON for analytics API
-        request.setRequestHeader(HttpHeaders.CONTENT_TYPE, 
APPLICATION_JSON_UTF8);
-        request.setRequestHeader(HttpHeaders.ACCEPT, APPLICATION_JSON_UTF8);
-        request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, 
StringUtil.__UTF8);
+        request.header(HttpHeader.CONTENT_TYPE, APPLICATION_JSON_UTF8);
+        request.header(HttpHeader.ACCEPT, APPLICATION_JSON_UTF8);
+        request.header(HttpHeader.ACCEPT_CHARSET, StringUtil.__UTF8);
 
         super.doHttpRequest(request, callback);
     }
 
-    private void marshalRequest(Object input, ContentExchange request) throws 
SalesforceException {
+    private void marshalRequest(Object input, Request request) throws 
SalesforceException {
         try {
-            request.setRequestContent(new 
ByteArrayBuffer(objectMapper.writeValueAsBytes(input)));
+            request.content(new 
BytesContentProvider(objectMapper.writeValueAsBytes(input)));
         } catch (IOException e) {
             throw new SalesforceException(
                 String.format("Error marshaling request for {%s:%s} : %s",
-                    request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                    request.getMethod(), request.getURI(), e.getMessage()),
                 e);
         }
     }
 
-    private <T> T unmarshalResponse(InputStream response, ContentExchange 
request,
+    private <T> T unmarshalResponse(InputStream response, Request request,
                                       TypeReference<T> responseTypeReference)
         throws SalesforceException {
 
@@ -315,12 +316,12 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
         } catch (IOException e) {
             throw new SalesforceException(
                 String.format("Error unmarshaling response {%s:%s} : %s",
-                    request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                    request.getMethod(), request.getURI(), e.getMessage()),
                 e);
         }
     }
 
-    private <T> T unmarshalResponse(InputStream response, ContentExchange 
request, Class<T> responseClass)
+    private <T> T unmarshalResponse(InputStream response, Request request, 
Class<T> responseClass)
         throws SalesforceException {
 
         if (response == null) {
@@ -332,7 +333,7 @@ public class DefaultAnalyticsApiClient extends 
AbstractClientBase implements Ana
         } catch (IOException e) {
             throw new SalesforceException(
                 String.format("Error unmarshaling response {%s:%s} : %s",
-                    request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                    request.getMethod(), request.getURI(), e.getMessage()),
                 e);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ec90c0b4/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
index 3ab4227..1f024db 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.component.salesforce.internal.client;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Collections;
 import javax.xml.bind.JAXBContext;
@@ -28,6 +28,7 @@ import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.api.dto.RestError;
 import org.apache.camel.component.salesforce.api.dto.bulk.BatchInfo;
@@ -39,12 +40,12 @@ import 
org.apache.camel.component.salesforce.api.dto.bulk.JobStateEnum;
 import org.apache.camel.component.salesforce.api.dto.bulk.ObjectFactory;
 import org.apache.camel.component.salesforce.api.dto.bulk.QueryResultList;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.http.HttpHeaders;
-import org.eclipse.jetty.http.HttpMethods;
-import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.client.util.InputStreamContentProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.util.StringUtil;
 
 public class DefaultBulkApiClient extends AbstractClientBase implements 
BulkApiClient {
@@ -55,7 +56,7 @@ public class DefaultBulkApiClient extends AbstractClientBase 
implements BulkApiC
     private JAXBContext context;
     private ObjectFactory objectFactory;
 
-    public DefaultBulkApiClient(String version, SalesforceSession session, 
HttpClient httpClient)
+    public DefaultBulkApiClient(String version, SalesforceSession session, 
SalesforceHttpClient httpClient)
         throws SalesforceException {
         super(version, session, httpClient);
 
@@ -74,7 +75,7 @@ public class DefaultBulkApiClient extends AbstractClientBase 
implements BulkApiC
         // clear system fields if set
         sanitizeJobRequest(request);
 
-        final ContentExchange post = getContentExchange(HttpMethods.POST, 
jobUrl(null));
+        final Request post = getRequest(HttpMethod.POST, jobUrl(null));
         try {
             marshalRequest(objectFactory.createJobInfo(request), post, 
APPLICATION_XML_UTF8);
         } catch (SalesforceException e) {
@@ -123,7 +124,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getJob(String jobId, final JobInfoResponseCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
jobUrl(jobId));
+        final Request get = getRequest(HttpMethod.GET, jobUrl(jobId));
 
         // make the call and parse the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -145,7 +146,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
         final JobInfo request = new JobInfo();
         request.setState(JobStateEnum.CLOSED);
 
-        final ContentExchange post = getContentExchange(HttpMethods.POST, 
jobUrl(jobId));
+        final Request post = getRequest(HttpMethod.POST, jobUrl(jobId));
         try {
             marshalRequest(objectFactory.createJobInfo(request), post, 
APPLICATION_XML_UTF8);
         } catch (SalesforceException e) {
@@ -173,7 +174,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
         final JobInfo request = new JobInfo();
         request.setState(JobStateEnum.ABORTED);
 
-        final ContentExchange post = getContentExchange(HttpMethods.POST, 
jobUrl(jobId));
+        final Request post = getRequest(HttpMethod.POST, jobUrl(jobId));
         try {
             marshalRequest(objectFactory.createJobInfo(request), post, 
APPLICATION_XML_UTF8);
         } catch (SalesforceException e) {
@@ -199,9 +200,9 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
     @Override
     public void createBatch(InputStream batchStream, String jobId, ContentType 
contentTypeEnum, 
         final BatchInfoResponseCallback callback) {
-        final ContentExchange post = getContentExchange(HttpMethods.POST, 
batchUrl(jobId, null));
-        post.setRequestContentSource(batchStream);
-        post.setRequestContentType(getContentType(contentTypeEnum) + 
";charset=" + StringUtil.__UTF8);
+        final Request post = getRequest(HttpMethod.POST, batchUrl(jobId, 
null));
+        post.content(new InputStreamContentProvider(batchStream));
+        post.header(HttpHeader.CONTENT_TYPE, getContentType(contentTypeEnum) + 
";charset=" + StringUtil.__UTF8);
 
         // make the call and parse the result
         doHttpRequest(post, new ClientResponseCallback() {
@@ -220,7 +221,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getBatch(String jobId, String batchId, final 
BatchInfoResponseCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
batchUrl(jobId, batchId));
+        final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, 
batchId));
 
         // make the call and parse the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -239,7 +240,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getAllBatches(String jobId, final 
BatchInfoListResponseCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
batchUrl(jobId, null));
+        final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, null));
 
         // make the call and parse the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -258,7 +259,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getRequest(String jobId, String batchId, final 
StreamResponseCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
batchUrl(jobId, batchId));
+        final Request get = getRequest(HttpMethod.GET, batchUrl(jobId, 
batchId));
 
         // make the call and parse the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -271,7 +272,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getResults(String jobId, String batchId, final 
StreamResponseCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
batchResultUrl(jobId, batchId, null));
+        final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId, 
batchId, null));
 
         // make the call and return the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -285,10 +286,16 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
     @Override
     public void createBatchQuery(String jobId, String soqlQuery, ContentType 
jobContentType,
         final BatchInfoResponseCallback callback) {
-        final ContentExchange post = getContentExchange(HttpMethods.POST, 
batchUrl(jobId, null));
-        byte[] queryBytes = soqlQuery.getBytes(StringUtil.__UTF8_CHARSET);
-        post.setRequestContent(new ByteArrayBuffer(queryBytes));
-        post.setRequestContentType(getContentType(jobContentType) + 
";charset=" + StringUtil.__UTF8);
+        final Request post = getRequest(HttpMethod.POST, batchUrl(jobId, 
null));
+        final byte[] queryBytes;
+        try {
+            queryBytes = soqlQuery.getBytes(StringUtil.__UTF8);
+        } catch (UnsupportedEncodingException e) {
+            callback.onResponse(null, new SalesforceException("Unexpected 
exception: " + e.getMessage(), e));
+            return;
+        }
+        post.content(new BytesContentProvider(queryBytes));
+        post.header(HttpHeader.CONTENT_TYPE, getContentType(jobContentType) + 
";charset=" + StringUtil.__UTF8);
 
         // make the call and parse the result
         doHttpRequest(post, new ClientResponseCallback() {
@@ -307,7 +314,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getQueryResultIds(String jobId, String batchId, final 
QueryResultIdsCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
batchResultUrl(jobId, batchId, null));
+        final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId, 
batchId, null));
 
         // make the call and parse the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -326,7 +333,7 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
 
     @Override
     public void getQueryResult(String jobId, String batchId, String resultId, 
final StreamResponseCallback callback) {
-        final ContentExchange get = getContentExchange(HttpMethods.GET, 
batchResultUrl(jobId, batchId, resultId));
+        final Request get = getRequest(HttpMethod.GET, batchResultUrl(jobId, 
batchId, resultId));
 
         // make the call and parse the result
         doHttpRequest(get, new ClientResponseCallback() {
@@ -338,23 +345,24 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
     }
 
     @Override
-    protected void setAccessToken(HttpExchange httpExchange) {
-        httpExchange.setRequestHeader(TOKEN_HEADER, accessToken);
+    protected void setAccessToken(Request request) {
+        // replace old token
+        request.getHeaders().put(TOKEN_HEADER, accessToken);
     }
 
     @Override
-    protected void doHttpRequest(ContentExchange request, 
ClientResponseCallback callback) {
+    protected void doHttpRequest(Request request, ClientResponseCallback 
callback) {
         // set access token for all requests
         setAccessToken(request);
 
         // set default charset
-        request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, 
StringUtil.__UTF8);
+        request.header(HttpHeader.ACCEPT_CHARSET, StringUtil.__UTF8);
 
         // TODO check if this is really needed or not, since SF response 
content type seems fixed
         // check if the default accept content type must be used
-        if (!request.getRequestFields().containsKey(HttpHeaders.ACCEPT)) {
+        if (!request.getHeaders().contains(HttpHeader.ACCEPT)) {
             final String contentType = getContentType(DEFAULT_ACCEPT_TYPE);
-            request.setRequestHeader(HttpHeaders.ACCEPT, contentType);
+            request.header(HttpHeader.ACCEPT, contentType);
             // request content type and charset is set by the request entity
         }
 
@@ -386,24 +394,23 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
     }
 
     @Override
-    protected SalesforceException createRestException(ContentExchange request, 
String reason) {
+    protected SalesforceException createRestException(Response response, 
InputStream responseContent) {
         // this must be of type Error
         try {
-            final Error error = unmarshalResponse(new 
ByteArrayInputStream(request.getResponseContentBytes()),
-                    request, Error.class);
+            final Error error = unmarshalResponse(responseContent, 
response.getRequest(), Error.class);
 
             final RestError restError = new RestError();
             restError.setErrorCode(error.getExceptionCode());
             restError.setMessage(error.getExceptionMessage());
 
-            return new SalesforceException(Arrays.asList(restError), 
request.getResponseStatus());
+            return new SalesforceException(Arrays.asList(restError), 
response.getStatus());
         } catch (SalesforceException e) {
             String msg = "Error un-marshaling Salesforce Error: " + 
e.getMessage();
             return new SalesforceException(msg, e);
         }
     }
 
-    private <T> T unmarshalResponse(InputStream response, ContentExchange 
request, Class<T> resultClass)
+    private <T> T unmarshalResponse(InputStream response, Request request, 
Class<T> resultClass)
         throws SalesforceException {
         try {
             Unmarshaller unmarshaller = context.createUnmarshaller();
@@ -412,32 +419,32 @@ public class DefaultBulkApiClient extends 
AbstractClientBase implements BulkApiC
         } catch (JAXBException e) {
             throw new SalesforceException(
                     String.format("Error unmarshaling response {%s:%s} : %s",
-                            request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                            request.getMethod(), request.getURI(), 
e.getMessage()),
                     e);
         } catch (IllegalArgumentException e) {
             throw new SalesforceException(
                     String.format("Error unmarshaling response for {%s:%s} : 
%s",
-                            request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                            request.getMethod(), request.getURI(), 
e.getMessage()),
                     e);
         }
     }
 
-    private void marshalRequest(Object input, ContentExchange request, String 
contentType) throws SalesforceException {
+    private void marshalRequest(Object input, Request request, String 
contentType) throws SalesforceException {
         try {
             Marshaller marshaller = context.createMarshaller();
             ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
             marshaller.marshal(input, byteStream);
-            request.setRequestContent(new 
ByteArrayBuffer(byteStream.toByteArray()));
-            request.setRequestContentType(contentType);
+
+            request.content(new BytesContentProvider(contentType, 
byteStream.toByteArray()));
         } catch (JAXBException e) {
             throw new SalesforceException(
                     String.format("Error marshaling request for {%s:%s} : %s",
-                            request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                            request.getMethod(), request.getURI(), 
e.getMessage()),
                     e);
         } catch (IllegalArgumentException e) {
             throw new SalesforceException(
                     String.format("Error marshaling request for {%s:%s} : %s",
-                            request.getMethod(), request.getRequestURI(), 
e.getMessage()),
+                            request.getMethod(), request.getURI(), 
e.getMessage()),
                     e);
         }
     }

Reply via email to