This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7dd71fb4b83449d3bd5d6d061ef6ccbe532bc89f
Author: Zoran Regvart <zregv...@apache.org>
AuthorDate: Wed Dec 16 10:14:19 2020 +0100

    CAMEL-12871: stub server and test
    
    Adds a stub server implemented in Jetty, as it is already pulled in as a
    dependency, and a integration test for the testing streaming resiliency.
    
    Two cases are added in the integration test: server closing the
    TCP connection (e.g. in a abrupt server shutdown), and restarting the
    `SubscriptionHelper` service (e.g. when route is restarted).
---
 .../salesforce/internal/streaming/StubServer.java  | 285 ++++++++++++++++
 .../SubscriptionHelperIntegrationTest.java         | 367 +++++++++++++++++++++
 2 files changed, 652 insertions(+)

diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
new file mode 100644
index 0000000..fdfee4e
--- /dev/null
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.camel.util.IOHelper;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.AbstractNetworkConnector;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class StubServer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StubServer.class);
+
+    private final List<StubResponse> defaultStubs = new ArrayList<>();
+
+    private final Server server;
+
+    private final List<StubResponse> stubs = new ArrayList<>();
+
+    class StubHandler extends AbstractHandler {
+
+        @Override
+        public void handle(final String target, final Request baseRequest, 
final HttpServletRequest request, final HttpServletResponse response)
+            throws IOException, ServletException {
+            final String body;
+            try (Reader bodyReader = request.getReader()) {
+                body = IOHelper.toString(bodyReader);
+            }
+
+            final StubResponse stub = stubFor(request, body);
+
+            if (stub == null) {
+                LOG.error("Stub not found for {} {}", request.getMethod(), 
request.getRequestURI());
+                response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
+                return;
+            }
+
+            response.setStatus(stub.responseStatus);
+            response.setContentType("application/json;charset=UTF-8");
+
+            final String id = messageIdFrom(body);
+
+            try (Writer out = response.getWriter()) {
+                stub.writeTo(id, out);
+            }
+        }
+
+        private StubResponse stubFor(final HttpServletRequest request, final 
String body) throws IOException {
+            final List<StubResponse> allResponses = new 
ArrayList<>(defaultStubs);
+            allResponses.addAll(stubs);
+
+            for (final StubResponse stub : allResponses) {
+                if (stub.matches(request, body)) {
+                    return stub;
+                }
+            }
+
+            return null;
+        }
+
+    }
+
+    final class StubResponse {
+
+        private Predicate<String> requestCondition;
+
+        private final String requestMethod;
+
+        private final String requestPath;
+
+        private BlockingQueue<String> responseMessages;
+
+        private final int responseStatus;
+
+        private String responseString;
+
+        public StubResponse(final String requestMethod, final String 
requestPath, final int responseStatus, final Predicate<String> requestCondition,
+            final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String 
requestPath, final int responseStatus) {
+            this.responseStatus = responseStatus;
+            this.requestMethod = Objects.requireNonNull(requestMethod, 
"requestMethod");
+            this.requestPath = Objects.requireNonNull(requestPath, 
"requestPath");
+        }
+
+        private StubResponse(final String requestMethod, final String 
requestPath, final int responseStatus, final BlockingQueue<String> 
responseMessages) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String 
requestPath, final int responseStatus, final Predicate<String> 
requestCondition) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.requestCondition = requestCondition;
+        }
+
+        private StubResponse(final String requestMethod, final String 
requestPath, final int responseStatus, final Predicate<String> requestCondition,
+            final String responseString) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseString = responseString;
+        }
+
+        private StubResponse(final String requestMethod, final String 
requestPath, final int responseStatus,
+            final String responseString) {
+            this(requestMethod, requestPath, responseStatus);
+            this.responseString = responseString;
+        }
+
+        @Override
+        public String toString() {
+            return requestMethod + " " + requestPath;
+        }
+
+        private boolean matches(final HttpServletRequest request, final String 
body) throws IOException {
+            final boolean matches = Objects.equals(requestMethod, 
request.getMethod()) && Objects.equals(requestPath, request.getRequestURI());
+
+            if (!matches) {
+                return false;
+            }
+
+            if (requestCondition == null) {
+                return true;
+            }
+
+            return requestCondition.test(body);
+        }
+
+        private void writeTo(final String messageId, final Writer out) throws 
IOException {
+            if (responseString != null) {
+                out.write(responseString.replace("$id", messageId));
+                out.flush();
+                return;
+            }
+
+            if (responseMessages != null) {
+                while (true) {
+                    try {
+                        final String message = responseMessages.poll(25, 
TimeUnit.MILLISECONDS);
+                        if (message != null) {
+                            out.write(message.replace("$id", messageId));
+                            out.flush();
+                            return;
+                        }
+
+                        if (!server.isRunning()) {
+                            return;
+                        }
+                    } catch (final InterruptedException ignored) {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    public StubServer() {
+        server = new Server(0);
+        server.setHandler(new StubHandler());
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public void abruptlyRestart() {
+        final int port = port();
+
+        stop();
+
+        connector().setPort(port);
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public int port() {
+        return connector().getLocalPort();
+    }
+
+    public void replyTo(final String method, final String path, final 
BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, messages));
+    }
+
+    public void replyTo(final String method, final String path, final int 
status) {
+        stubs.add(new StubResponse(method, path, status));
+    }
+
+    public void replyTo(final String method, final String path, final 
Predicate<String> requestCondition, final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, 
messages));
+    }
+
+    public void replyTo(final String method, final String path, final 
Predicate<String> requestCondition, final String response) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, 
response));
+    }
+
+    public void replyTo(final String method, final String path, final String 
response) {
+        stubs.add(new StubResponse(method, path, 200, response));
+    }
+
+    public void reset() {
+        stubs.clear();
+    }
+
+    public void stop() {
+        try {
+            for (final EndPoint endPoint : 
connector().getConnectedEndPoints()) {
+                endPoint.close();
+            }
+
+            server.stop();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void stubsAsDefaults() {
+        defaultStubs.addAll(stubs);
+        stubs.clear();
+    }
+
+    private AbstractNetworkConnector connector() {
+        final AbstractNetworkConnector connector = (AbstractNetworkConnector) 
server.getConnectors()[0];
+        return connector;
+    }
+
+    private static String messageIdFrom(final String body) {
+        int idx = body.indexOf("\"id\":\"");
+        String id = "";
+
+        if (idx > 0) {
+            idx += 6;
+            char ch;
+            while (Character.isDigit(ch = body.charAt(idx++))) {
+                id += ch;
+            }
+        }
+        return id;
+    }
+
+}
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
new file mode 100644
index 0000000..9eff7cc
--- /dev/null
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.component.salesforce.AuthenticationType;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceConsumer;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class SubscriptionHelperIntegrationTest {
+
+    final CamelContext camel;
+
+    final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
+
+    final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
+
+    final SalesforceComponent salesforce;
+
+    final StubServer server;
+
+    final SubscriptionHelper subscription;
+
+    SalesforceConsumer toUnsubscribe;
+
+    static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
+
+        private final String name;
+
+        public MessageArgumentMatcher(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public boolean matches(final Message message) {
+            final Map<String, Object> data = message.getDataAsMap();
+
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> event = (Map<String, Object>) 
data.get("event");
+
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> sobject = (Map<String, Object>) 
data.get("sobject");
+
+            return "created".equals(event.get("type")) && 
name.equals(sobject.get("Name"));
+        }
+
+        static Message messageForAccountCreationWithName(final String name) {
+            return argThat(new MessageArgumentMatcher(name));
+        }
+
+    }
+
+    public SubscriptionHelperIntegrationTest() throws SalesforceException {
+        server = new StubServer();
+
+        
LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for 
wireshark to filter: {}", server.port());
+
+        final String instanceUrl = "http://localhost:"; + server.port();
+
+        server.replyTo("POST", "/services/oauth2/token", 
"{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
+
+        server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
+            + "  {\n"
+            + "    \"ext\": {\n"
+            + "      \"replay\": true,\n"
+            + "      \"payload.format\": true\n"
+            + "    },\n"
+            + "    \"minimumVersion\": \"1.0\",\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"supportedConnectionTypes\": [\n"
+            + "      \"long-polling\"\n"
+            + "    ],\n"
+            + "    \"channel\": \"/meta/handshake\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"version\": \"1.0\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", (req) -> 
req.contains("\"timeout\":0"), "[\n"
+            + "  {\n"
+            + "    \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
+            + "    \"advice\": {\n"
+            + "      \"interval\": 0,\n"
+            + "      \"timeout\": 110000,\n"
+            + "      \"reconnect\": \"retry\"\n"
+            + "    },\n"
+            + "    \"channel\": \"/meta/connect\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages);
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", "[\n"
+            + "  {\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"channel\": \"/meta/subscribe\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"subscription\": \"/topic/Account\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", "[\n"
+            + "  {\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"channel\": \"/meta/unsubscribe\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"subscription\": \"/topic/Account\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        server.replyTo("POST", "/cometd/" + 
SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", "[\n"
+            + "  {\n"
+            + "     \"channel\": \"/meta/disconnect\",\n"
+            + "     \"clientId\": \"client-id\"\n"
+            + "   }\n"
+            + "]");
+
+        server.replyTo("GET", "/services/oauth2/revoke", 200);
+
+        server.stubsAsDefaults();
+
+        camel = new DefaultCamelContext();
+        camel.start();
+        salesforce = new SalesforceComponent(camel);
+        salesforce.setLoginUrl(instanceUrl);
+        salesforce.setClientId("clientId");
+        salesforce.setClientSecret("clientSecret");
+        salesforce.setRefreshToken("refreshToken");
+        salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN);
+        salesforce.setConfig(config);
+
+        salesforce.start();
+        subscription = new SubscriptionHelper(salesforce);
+    }
+
+    @BeforeEach
+    public void cleanSlate() throws CamelException {
+        if (toUnsubscribe != null) {
+            subscription.unsubscribe("Account", toUnsubscribe);
+        }
+        server.reset();
+    }
+
+    @AfterAll
+    public void stop() {
+        salesforce.stop();
+        camel.stop();
+        server.stop();
+    }
+
+    @Test
+    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer = toUnsubscribe = 
mock(SalesforceConsumer.class, 
"shouldResubscribeOnConnectionFailures:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, 
"shouldResubscribeOnConnectionFailures:endpoint");
+
+        // subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications
+        messages.add("[\n"
+            + "  {\n"
+            + "    \"data\": {\n"
+            + "      \"event\": {\n"
+            + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+            + "        \"replayId\": 1,\n"
+            + "        \"type\": \"created\"\n"
+            + "      },\n"
+            + "      \"sobject\": {\n"
+            + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+            + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+            + "      }\n"
+            + "    },\n"
+            + "    \"channel\": \"/topic/Account\"\n"
+            + "  },\n"
+            + "  {\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"channel\": \"/meta/connect\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        verify(consumer, 
Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
+            
messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        // terminate server abruptly by closing the connection (sends FIN, ACK)
+        server.abruptlyRestart();
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+            + "  {\n"
+            + "    \"data\": {\n"
+            + "      \"event\": {\n"
+            + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+            + "        \"replayId\": 2,\n"
+            + "        \"type\": \"created\"\n"
+            + "      },\n"
+            + "      \"sobject\": {\n"
+            + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+            + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+            + "      }\n"
+            + "    },\n"
+            + "    \"channel\": \"/topic/Account\"\n"
+            + "  },\n"
+            + "  {\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"channel\": \"/meta/connect\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        // assert last message was received, recovery can take a bit
+        verify(consumer, 
timeout(10000)).processMessage(any(ClientSessionChannel.class),
+            
messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+
+    @Test
+    void shouldResubscribeOnHelperRestart() {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer = toUnsubscribe = 
mock(SalesforceConsumer.class, "shouldResubscribeOnHelperRestart:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, 
"shouldResubscribeOnHelperRestart:endpoint");
+
+        // subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications
+        messages.add("[\n"
+            + "  {\n"
+            + "    \"data\": {\n"
+            + "      \"event\": {\n"
+            + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+            + "        \"replayId\": 1,\n"
+            + "        \"type\": \"created\"\n"
+            + "      },\n"
+            + "      \"sobject\": {\n"
+            + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+            + "        \"Name\": \"shouldResubscribeOnHelperRestart 1\"\n"
+            + "      }\n"
+            + "    },\n"
+            + "    \"channel\": \"/topic/Account\"\n"
+            + "  },\n"
+            + "  {\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"channel\": \"/meta/connect\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+        verify(consumer, 
timeout(100)).processMessage(any(ClientSessionChannel.class),
+            
messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 1"));
+
+        // stop and start the subscription helper
+        subscription.stop();
+        subscription.start();
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+            + "  {\n"
+            + "    \"data\": {\n"
+            + "      \"event\": {\n"
+            + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+            + "        \"replayId\": 2,\n"
+            + "        \"type\": \"created\"\n"
+            + "      },\n"
+            + "      \"sobject\": {\n"
+            + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+            + "        \"Name\": \"shouldResubscribeOnHelperRestart 2\"\n"
+            + "      }\n"
+            + "    },\n"
+            + "    \"channel\": \"/topic/Account\"\n"
+            + "  },\n"
+            + "  {\n"
+            + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+            + "    \"channel\": \"/meta/connect\",\n"
+            + "    \"id\": \"$id\",\n"
+            + "    \"successful\": true\n"
+            + "  }\n"
+            + "]");
+
+        // assert last message was received, recovery can take a bit
+        verify(consumer, 
timeout(2000)).processMessage(any(ClientSessionChannel.class),
+            
messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+}

Reply via email to