Support PushTopic replay extension See Salesforce API documentation here: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/replay_pushtopic_events.htm
Signed-off-by: Sune Keller <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/68254a0b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/68254a0b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/68254a0b Branch: refs/heads/master Commit: 68254a0bc8c004d598dbecaaae3b7401b72613be Parents: 615bc23 Author: Sune Keller <[email protected]> Authored: Wed Jun 15 10:26:54 2016 +0200 Committer: Claus Ibsen <[email protected]> Committed: Tue Jun 21 08:40:03 2016 +0200 ---------------------------------------------------------------------- .../src/main/docs/salesforce.adoc | 8 +- .../salesforce/SalesforceComponent.java | 4 +- .../salesforce/SalesforceConsumer.java | 5 ++ .../salesforce/SalesforceEndpoint.java | 2 +- .../salesforce/SalesforceEndpointConfig.java | 39 ++++++++- .../salesforce/api/dto/SObjectUrls.java | 9 ++ .../streaming/CometDReplayExtension.java | 88 ++++++++++++++++++++ .../internal/streaming/SubscriptionHelper.java | 42 +++++++++- 8 files changed, 184 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc index 614b537..3f5c9cd 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc +++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc @@ -230,10 +230,8 @@ The Salesforce component supports 16 options which are listed below. - - // endpoint options: START -The Salesforce component supports 37 endpoint options which are listed below: +The Salesforce component supports 39 endpoint options which are listed below: {% raw %} [width="100%",cols="2s,1,1m,1m,5",options="header"] @@ -247,9 +245,11 @@ The Salesforce component supports 37 endpoint options which are listed below: | apiVersion | common | | String | Salesforce API version defaults to SalesforceEndpointConfig.DEFAULT_VERSION | batchId | common | | String | Bulk API Batch ID | contentType | common | | ContentType | Bulk API content type one of XML CSV ZIP_XML ZIP_CSV +| defaultReplayId | common | | Integer | Default replayId setting if no value is found in link initialReplayIdMap | format | common | | PayloadFormat | Payload format to use for Salesforce API calls either JSON or XML defaults to JSON | httpClient | common | | SalesforceHttpClient | Custom Jetty Http Client to use to connect to Salesforce. | includeDetails | common | | Boolean | Include details in Salesforce1 Analytics report defaults to false. +| initialReplayIdMap | common | | Map | Replay IDs to start from per channel name. | instanceId | common | | String | Salesforce1 Analytics report execution instance ID | jobId | common | | String | Bulk API Job ID | notifyForFields | common | | NotifyForFieldsEnum | Notify for fields options are ALL REFERENCED SELECT WHERE @@ -281,8 +281,6 @@ The Salesforce component supports 37 endpoint options which are listed below: // endpoint options: END - - For obvious security reasons it is recommended that the clientId, clientSecret, userName and password fields be not set in the pom.xml. The plugin should be configured for the rest of the properties, and can http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index 600dcbf..61357ea 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -303,10 +303,10 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin } } - public SubscriptionHelper getSubscriptionHelper() throws Exception { + public SubscriptionHelper getSubscriptionHelper(String topicName) throws Exception { if (subscriptionHelper == null) { // lazily create subscription helper - subscriptionHelper = new SubscriptionHelper(this); + subscriptionHelper = new SubscriptionHelper(this, topicName); // also start the helper to connect to Salesforce ServiceHelper.startService(subscriptionHelper); http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java index b2fccfe..e606a81 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java @@ -47,6 +47,7 @@ public class SalesforceConsumer extends DefaultConsumer { private static final String TYPE_PROPERTY = "type"; private static final String CREATED_DATE_PROPERTY = "createdDate"; private static final String SOBJECT_PROPERTY = "sobject"; + private static final String REPLAY_ID_PROPERTY = "replayId"; private static final double MINIMUM_VERSION = 24.0; private final SalesforceEndpoint endpoint; @@ -152,6 +153,7 @@ public class SalesforceConsumer extends DefaultConsumer { final Map<String, Object> event = (Map<String, Object>) data.get(EVENT_PROPERTY); final Object eventType = event.get(TYPE_PROPERTY); Object createdDate = event.get(CREATED_DATE_PROPERTY); + Object replayId = event.get(REPLAY_ID_PROPERTY); if (log.isDebugEnabled()) { log.debug(String.format("Received event %s on channel %s created on %s", eventType, channel.getChannelId(), createdDate)); @@ -159,6 +161,9 @@ public class SalesforceConsumer extends DefaultConsumer { in.setHeader("CamelSalesforceEventType", eventType); in.setHeader("CamelSalesforceCreatedDate", createdDate); + if (replayId != null) { + in.setHeader("CamelSalesforceReplayId", replayId); + } // get SObject @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java index c0a9dac..0e8b3b1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java @@ -75,7 +75,7 @@ public class SalesforceEndpoint extends DefaultEndpoint { } final SalesforceConsumer consumer = new SalesforceConsumer(this, processor, - getComponent().getSubscriptionHelper()); + getComponent().getSubscriptionHelper(topicName)); configureConsumer(consumer); return consumer; } http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java index 36a22af..39d0d06 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java @@ -72,6 +72,10 @@ public class SalesforceEndpointConfig implements Cloneable { public static final String REPORT_METADATA = "reportMetadata"; public static final String INSTANCE_ID = "instanceId"; + // parameters for Streaming API + public static final String DEFAULT_REPLAY_ID = "defaultReplayId"; + public static final String INITIAL_REPLAY_ID_MAP = "initialReplayIdMap"; + // default maximum authentication retries on failed authentication or expired session public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4; @@ -143,6 +147,12 @@ public class SalesforceEndpointConfig implements Cloneable { @UriParam private String instanceId; + // Streaming API properties + @UriParam + private Integer defaultReplayId; + @UriParam + private Map<String, Integer> initialReplayIdMap; + // Salesforce Jetty9 HttpClient, set using reference @UriParam private SalesforceHttpClient httpClient; @@ -535,6 +545,33 @@ public class SalesforceEndpointConfig implements Cloneable { valueMap.put(REPORT_METADATA, reportMetadata); valueMap.put(INSTANCE_ID, instanceId); + // add streaming API properties + valueMap.put(DEFAULT_REPLAY_ID, defaultReplayId); + valueMap.put(INITIAL_REPLAY_ID_MAP, initialReplayIdMap); + return Collections.unmodifiableMap(valueMap); } -} \ No newline at end of file + + public Integer getDefaultReplayId() { + return defaultReplayId; + } + + /** + * Default replayId setting if no value is found in {@link #initialReplayIdMap} + * @param defaultReplayId + */ + public void setDefaultReplayId(Integer defaultReplayId) { + this.defaultReplayId = defaultReplayId; + } + + public Map<String, Integer> getInitialReplayIdMap() { + return initialReplayIdMap; + } + + /** + * Replay IDs to start from per channel name. + */ + public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) { + this.initialReplayIdMap = initialReplayIdMap; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java index 85115d1..e4ac3e2 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java @@ -31,6 +31,7 @@ public class SObjectUrls extends AbstractDTOBase { private String compactLayouts; private String caseRowArticleSuggestions; private String push; + private String defaultValues; public String getSobject() { return sobject; @@ -135,4 +136,12 @@ public class SObjectUrls extends AbstractDTOBase { public void setPush(String push) { this.push = push; } + + public String getDefaultValues() { + return defaultValues; + } + + public void setDefaultValues(String defaultValues) { + this.defaultValues = defaultValues; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java new file mode 100644 index 0000000..98df4ef --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.streaming; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.cometd.bayeux.Channel; +import org.cometd.bayeux.Message; +import org.cometd.bayeux.client.ClientSession; +import org.cometd.bayeux.client.ClientSession.Extension.Adapter; + +/** + * CometDReplayExtension, typical usages are the following: + * {@code client.addExtension(new CometDReplayExtension<>(replayMap));} + * + * @author yzhao + * @since 198 (Winter '16) + */ +public class CometDReplayExtension<V> extends Adapter { + private static final String EXTENSION_NAME = "replay"; + private final ConcurrentMap<String, V> dataMap = new ConcurrentHashMap<>(); + private final AtomicBoolean supported = new AtomicBoolean(); + + public CometDReplayExtension(Map<String, V> dataMap) { + this.dataMap.putAll(dataMap); + } + + @Override + @SuppressWarnings("unchecked") + public boolean rcv(ClientSession session, Message.Mutable message) { + Object data = message.get(EXTENSION_NAME); + if (this.supported.get() && data != null) { + try { + dataMap.put(message.getChannel(), (V) data); + } catch (ClassCastException e) { + return false; + } + } + return true; + } + + @Override + public boolean rcvMeta(ClientSession session, Message.Mutable message) { + switch (message.getChannel()) { + case Channel.META_HANDSHAKE: + Map<String, Object> ext = message.getExt(false); + this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME))); + break; + default: + break; + } + return true; + } + + @Override + public boolean sendMeta(ClientSession session, Message.Mutable message) { + switch (message.getChannel()) { + case Channel.META_HANDSHAKE: + message.getExt(true).put(EXTENSION_NAME, Boolean.TRUE); + break; + case Channel.META_SUBSCRIBE: + if (supported.get()) { + message.getExt(true).put(EXTENSION_NAME, dataMap); + } + break; + default: + break; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 1cc4a21..befd168 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -17,6 +17,8 @@ package org.apache.camel.component.salesforce.internal.streaming; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -31,6 +33,8 @@ import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.support.ServiceSupport; import org.cometd.bayeux.Message; +import org.cometd.bayeux.client.ClientSession; +import org.cometd.bayeux.client.ClientSession.Extension; import org.cometd.bayeux.client.ClientSessionChannel; import org.cometd.client.BayeuxClient; import org.cometd.client.transport.ClientTransport; @@ -56,6 +60,8 @@ public class SubscriptionHelper extends ServiceSupport { private static final String EXCEPTION_FIELD = "exception"; + private static final double MINIMUM_REPLAY_VERSION = 36.0; + private final SalesforceComponent component; private final SalesforceSession session; private final BayeuxClient client; @@ -71,14 +77,14 @@ public class SubscriptionHelper extends ServiceSupport { private String connectError; private boolean reconnecting; - public SubscriptionHelper(SalesforceComponent component) throws Exception { + public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception { this.component = component; this.session = component.getSession(); this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>(); // create CometD client - this.client = createClient(); + this.client = createClient(topicName); } @Override @@ -179,7 +185,7 @@ public class SubscriptionHelper extends ServiceSupport { } } - private BayeuxClient createClient() throws Exception { + private BayeuxClient createClient(String topicName) throws Exception { // use default Jetty client from SalesforceComponent, its shared by all consumers final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); @@ -203,6 +209,29 @@ public class SubscriptionHelper extends ServiceSupport { }; BayeuxClient client = new BayeuxClient(getEndpointUrl(), transport); + Integer replayId = null; + String channelName = getChannelName(topicName); + Map<String, Integer> replayIdMap = component.getConfig().getInitialReplayIdMap(); + if (replayIdMap != null) { + replayId = replayIdMap.get(channelName); + } + if (replayId == null) { + replayId = component.getConfig().getDefaultReplayId(); + } + if (replayId != null) { + LOG.info("Sending replayId={} for channel {}", replayId, channelName); + List<Extension> extensions = client.getExtensions(); + Extension ext = null; + for (Iterator<Extension> iter = extensions.iterator(); iter.hasNext(); ext = iter.next()) { + if (ext instanceof CometDReplayExtension) { + iter.remove(); + } + } + Map<String, Integer> dataMap = new HashMap<>(); + dataMap.put(channelName, replayId); + ClientSession.Extension extension = new CometDReplayExtension<>(dataMap); + client.addExtension(extension); + } return client; } @@ -344,7 +373,12 @@ public class SubscriptionHelper extends ServiceSupport { } public String getEndpointUrl() { - return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion(); + if (Double.valueOf(component.getConfig().getApiVersion()) >= MINIMUM_REPLAY_VERSION + && (component.getConfig().getDefaultReplayId() != null || !component.getConfig().getInitialReplayIdMap().isEmpty())) { + return component.getSession().getInstanceUrl() + "/cometd/replay/" + component.getConfig().getApiVersion(); + } else { + return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion(); + } } }
