This is an automated email from the ASF dual-hosted git repository.

shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new b686120  UNOMI-343 Fix integrations tests that are currently broken
     new 1a80d71  Merge pull request #163 from enonic/unomi-343
b686120 is described below

commit b6861206751711a144086e7258d82d40fe107517
Author: Pavel Milkevich <[email protected]>
AuthorDate: Thu Jun 4 21:17:58 2020 +0300

    UNOMI-343 Fix integrations tests that are currently broken
---
 .../test/java/org/apache/unomi/itests/BaseIT.java  | 35 ++++++++++++++++++++--
 .../org/apache/unomi/itests/ContextServletIT.java  | 25 +++++++++++-----
 .../apache/unomi/itests/ProfileImportActorsIT.java | 33 +++++++++++++++-----
 .../unomi/itests/ProfileImportRankingIT.java       | 22 ++++++++++----
 .../unomi/itests/ProfileImportSurfersIT.java       | 33 +++++++++++++-------
 .../unomi/itests/graphql/GraphQLEventIT.java       | 12 ++++----
 .../unomi/itests/graphql/GraphQLProfileIT.java     |  8 +++++
 .../unomi/itests/graphql/GraphQLWebSocketIT.java   | 28 +++++++++++++----
 8 files changed, 150 insertions(+), 46 deletions(-)

diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java 
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 336771d..f6c35e4 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -17,6 +17,10 @@
 
 package org.apache.unomi.itests;
 
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.persistence.spi.PersistenceService;
 import org.junit.Assert;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.CoreOptions;
@@ -24,9 +28,10 @@ import org.ops4j.pax.exam.Option;
 import org.ops4j.pax.exam.karaf.container.internal.JavaVersionUtil;
 import org.ops4j.pax.exam.karaf.options.LogLevelOption.LogLevel;
 import org.ops4j.pax.exam.options.MavenArtifactUrlReference;
-import org.ops4j.pax.exam.options.MavenUrlReference;
 import org.ops4j.pax.exam.options.extra.VMOption;
+import org.ops4j.pax.exam.util.Filter;
 
+import javax.inject.Inject;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,7 +41,12 @@ import java.util.function.Supplier;
 
 import static org.ops4j.pax.exam.CoreOptions.maven;
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
-import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.*;
+import static 
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.debugConfiguration;
+import static 
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
+import static 
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.karafDistributionConfiguration;
+import static 
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.keepRuntimeFolder;
+import static 
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.logLevel;
+import static 
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.replaceConfigurationFile;
 
 /**
  * Base class for integration tests.
@@ -50,6 +60,27 @@ public abstract class BaseIT {
     protected static final String KARAF_DIR = "target/exam";
     protected static final String UNOMI_KEY = 
"670c26d1cc413346c3b2fd9ce65dab41";
 
+    @Inject
+    @Filter(timeout = 600000)
+    protected PersistenceService persistenceService;
+
+    @Inject
+    @Filter(timeout = 600000)
+    protected DefinitionsService definitionsService;
+
+    protected void removeItems(final Class<? extends Item> ...classes) throws 
InterruptedException {
+        Condition condition = new 
Condition(definitionsService.getConditionType("matchAllCondition"));
+        for (Class<? extends Item> aClass : classes) {
+            persistenceService.removeByQuery(condition, aClass);
+        }
+        refreshPersistence();
+    }
+
+    protected void refreshPersistence() throws InterruptedException {
+        persistenceService.refresh();
+        Thread.sleep(1000);
+    }
+
     @Configuration
     public Option[] config() throws InterruptedException {
 
diff --git a/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
index 926627b..2830e29 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
@@ -21,7 +21,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.unomi.api.*;
+import org.apache.unomi.api.ContextRequest;
+import org.apache.unomi.api.ContextResponse;
+import org.apache.unomi.api.Event;
+import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.Session;
 import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.segments.Segment;
 import org.apache.unomi.api.services.DefinitionsService;
@@ -87,6 +92,8 @@ public class ContextServletIT extends BaseIT {
        @Filter(timeout = 600000)
        protected SegmentService segmentService;
 
+       private Profile profile;
+
        @Before
        public void setUp() throws InterruptedException {
                //Create a past-event segment
@@ -100,7 +107,12 @@ public class ContextServletIT extends BaseIT {
                
segmentCondition.setParameter("eventCondition",pastEventEventCondition);
                segment.setCondition(segmentCondition);
                segmentService.setSegmentDefinition(segment);
-               Thread.sleep(2000);
+
+               String profileId = "test-profile-id";
+               profile = new Profile(profileId);
+               profileService.save(profile);
+
+               refreshPersistence();
        }
 
        @After
@@ -108,8 +120,8 @@ public class ContextServletIT extends BaseIT {
                TestUtils.removeAllEvents(definitionsService, 
persistenceService);
                TestUtils.removeAllSessions(definitionsService, 
persistenceService);
                TestUtils.removeAllProfiles(definitionsService, 
persistenceService);
+               profileService.delete(profile.getItemId(), false);
                segmentService.removeSegmentDefinition(SEGMENT_ID,false);
-               persistenceService.refresh();
        }
 
        @Test
@@ -182,15 +194,12 @@ public class ContextServletIT extends BaseIT {
        public void 
testUpdateEventFromContextAuthorizedThirdPartyNoItemID_Fail() throws 
IOException, InterruptedException {
                //Arrange
                String eventId = "test-event-id3";
-               String profileId = "test-profile-id";
                String sessionId = "test-session-id";
                String scope = "test-scope";
                String eventTypeOriginal = "test-event-type-original";
                String eventTypeUpdated = "test-event-type-updated";
-               Profile profile = new Profile(profileId);
                Session session = new Session(sessionId, profile, new Date(), 
scope);
                Event event = new Event(eventId, eventTypeOriginal, session, 
profile, scope, null, null, new Date());
-               profileService.save(profile);
                this.eventService.send(event);
                Thread.sleep(2000);
                event.setEventType(eventTypeUpdated); //change the event so we 
can see the update effect
@@ -211,7 +220,7 @@ public class ContextServletIT extends BaseIT {
        }
 
        @Test
-       public void 
testCreateEventsWithNoTimestampParam_profileAddedToSegment() throws IOException 
{
+       public void 
testCreateEventsWithNoTimestampParam_profileAddedToSegment() throws 
IOException, InterruptedException {
                //Arrange
                String sessionId = "test-session-id";
                String scope = "test-scope";
@@ -227,6 +236,8 @@ public class ContextServletIT extends BaseIT {
                HttpPost request = new HttpPost(URL + CONTEXT_URL);
                request.setEntity(new 
StringEntity(objectMapper.writeValueAsString(contextRequest), 
ContentType.create("application/json")));
                String cookieHeaderValue = 
TestUtils.executeContextJSONRequest(request, sessionId).getCookieHeaderValue();
+               Thread.sleep(1000); //Making sure DB is updated
+
                //Add the context-profile-id cookie to the second event
                request.addHeader("Cookie", cookieHeaderValue);
                ContextResponse response = 
(TestUtils.executeContextJSONRequest(request, sessionId)).getContextResponse(); 
//second event
diff --git 
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
index 677ccbc..0d31b7f 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
@@ -34,7 +34,12 @@ import org.ops4j.pax.exam.util.Filter;
 
 import javax.inject.Inject;
 import java.io.File;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * Created by amidani on 14/08/2017.
@@ -43,9 +48,11 @@ import java.util.*;
 @ExamReactorStrategy(PerSuite.class)
 public class ProfileImportActorsIT extends BaseIT {
 
-    @Inject @Filter(value="(configDiscriminator=IMPORT)", timeout = 600000)
+    @Inject
+    @Filter(value = "(configDiscriminator=IMPORT)", timeout = 600000)
     protected ImportExportConfigurationService<ImportConfiguration> 
importConfigurationService;
-    @Inject @Filter(timeout = 600000)
+    @Inject
+    @Filter(timeout = 600000)
     protected ProfileService profileService;
 
     @Test
@@ -66,11 +73,17 @@ public class ProfileImportActorsIT extends BaseIT {
         profileService.setPropertyType(propertyTypeTwitterId);
         profileService.setPropertyType(propertyTypeActorsGenres);
 
-        PropertyType propTwitterId = 
profileService.getPropertyType("twitterId");
-        Assert.assertNotNull(propTwitterId);
+        PropertyType propTwitterId = keepTrying("Failed waiting for property 
type 'twitterId'",
+                () -> profileService.getPropertyType("twitterId"),
+                Objects::nonNull,
+                1000,
+                100);
 
-        PropertyType propActorsGenre = 
profileService.getPropertyType("movieGenres");
-        Assert.assertNotNull(propActorsGenre);
+        PropertyType propActorsGenre = keepTrying("Failed waiting for property 
type 'movieGenres'",
+                () -> profileService.getPropertyType("movieGenres"),
+                Objects::nonNull,
+                1000,
+                100);
 
 
         /*** Actors Test ***/
@@ -101,7 +114,11 @@ public class ProfileImportActorsIT extends BaseIT {
         importConfigurationService.save(importConfigActors, true);
 
         //Wait for data to be processed
-        keepTrying("Failed waiting for actors initial import to complete", 
()-> profileService.findProfilesByPropertyValue("properties.city", "hollywood", 
0, 10, null), (p)->p.getTotalSize() == 6, 1000, 200);
+        keepTrying("Failed waiting for actors initial import to complete",
+                () -> 
profileService.findProfilesByPropertyValue("properties.city", "hollywood", 0, 
10, null),
+                (p) -> p.getTotalSize() == 6,
+                1000,
+                200);
 
         List<ImportConfiguration> importConfigurations = 
importConfigurationService.getAll();
         Assert.assertEquals(1, importConfigurations.size());
diff --git 
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
index 9694598..9159426 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Created by amidani on 09/08/2017.
@@ -71,11 +72,17 @@ public class ProfileImportRankingIT extends BaseIT {
 
         profileService.setPropertyType(propertyTypeRank);
 
-        PropertyType propUciId = profileService.getPropertyType("uciId");
-        Assert.assertNotNull(propUciId);
+        PropertyType propUciId = keepTrying("Failed waiting for property type 
'uciId'",
+                () -> profileService.getPropertyType("uciId"),
+                Objects::nonNull,
+                1000,
+                100);
 
-        PropertyType propRankId = profileService.getPropertyType("rank");
-        Assert.assertNotNull(propRankId);
+        PropertyType propRankId = keepTrying("Failed waiting for property type 
'rank'",
+                () -> profileService.getPropertyType("rank"),
+                Objects::nonNull,
+                1000,
+                100);
 
 
         /*** Surfers Test ***/
@@ -108,8 +115,11 @@ public class ProfileImportRankingIT extends BaseIT {
         //Wait for data to be processed
         keepTrying("Failed waiting for ranking import to complete", 
()->profileService.findProfilesByPropertyValue("properties.city", 
"rankingCity", 0, 50, null), (p)->p.getTotalSize() == 25, 1000, 200);
 
-        List<ImportConfiguration> importConfigurations = 
importConfigurationService.getAll();
-        Assert.assertEquals(1, importConfigurations.size());
+        List<ImportConfiguration> importConfigurations = keepTrying("Failed 
waiting for import configurations list with 1 item",
+                () -> importConfigurationService.getAll(),
+                (list) -> Objects.nonNull(list) && list.size() == 1,
+                1000,
+                100);
 
         PartialList<Profile> gregProfileList = 
profileService.findProfilesByPropertyValue("properties.uciId", "10004451371", 
0, 10, null);
         Assert.assertEquals(1, gregProfileList.getList().size());
diff --git 
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java 
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
index e4bbf69..01ea0fe 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
@@ -40,8 +40,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Created by amidani on 09/08/2017.
@@ -67,11 +67,13 @@ public class ProfileImportSurfersIT extends BaseIT {
 
         profileService.setPropertyType(propertyType);
 
-        PropertyType propAlive = profileService.getPropertyType("alive");
+        keepTrying("Failed waiting for property type 'alive'",
+                () -> profileService.getPropertyType("alive"),
+                Objects::nonNull,
+                1000,
+                100);
 
-        Assert.assertNotNull("Alive property should not be null", propAlive);
-
-        propAlive = 
RouterUtils.getPropertyTypeById(profileService.getTargetPropertyTypes("profiles"),
 "alive");
+        PropertyType propAlive = 
RouterUtils.getPropertyTypeById(profileService.getTargetPropertyTypes("profiles"),
 "alive");
 
         Assert.assertNotNull("Lookup of alive property through profiles target 
has failed !", propAlive);
 
@@ -107,8 +109,11 @@ public class ProfileImportSurfersIT extends BaseIT {
         //Wait for data to be processed
         keepTrying("Failed waiting for surfers initial import to complete", 
()->profileService.findProfilesByPropertyValue("properties.city", 
"surfersCity", 0, 50, null), (p)->p.getTotalSize() == 34, 1000, 100);
 
-        List<ImportConfiguration> importConfigurations = 
importConfigurationService.getAll();
-        Assert.assertEquals(1, importConfigurations.size());
+        keepTrying("Failed waiting for import configurations list with 1 item",
+                () -> importConfigurationService.getAll(),
+                (list) -> Objects.nonNull(list) && list.size() == 1,
+                1000,
+                100);
 
         //Profile not to delete
         PartialList<Profile> jordyProfile = 
profileService.findProfilesByPropertyValue("properties.email", 
"[email protected]", 0, 10, null);
@@ -152,8 +157,11 @@ public class ProfileImportSurfersIT extends BaseIT {
         //Wait for data to be processed
         keepTrying("Failed waiting for surfers overwrite import to complete", 
()->profileService.findProfilesByPropertyValue("properties.city", 
"surfersCity", 0, 50, null), (p)->p.getTotalSize() == 36, 1000, 100);
 
-        importConfigurations = importConfigurationService.getAll();
-        Assert.assertEquals(1, importConfigurations.size());
+        keepTrying("Failed waiting for import configurations list with 1 item",
+                () -> importConfigurationService.getAll(),
+                (list) -> Objects.nonNull(list) && list.size() == 1,
+                1000,
+                100);
 
         //Profile not to delete
         PartialList<Profile> aliveProfiles = 
profileService.findProfilesByPropertyValue("properties.alive", "true", 0, 50, 
null);
@@ -191,8 +199,11 @@ public class ProfileImportSurfersIT extends BaseIT {
         //Wait for data to be processed
         keepTrying("Failed waiting for surfers delete import to complete", 
()->profileService.findProfilesByPropertyValue("properties.city", 
"surfersCity", 0, 50, null), (p)->p.getTotalSize() == 0, 1000, 100);
 
-        importConfigurations = importConfigurationService.getAll();
-        Assert.assertEquals(1, importConfigurations.size());
+        keepTrying("Failed waiting for import configurations list with 1 item",
+                () -> importConfigurationService.getAll(),
+                (list) -> Objects.nonNull(list) && list.size() == 1,
+                1000,
+                100);
 
         PartialList<Profile> jordyProfileDelete = 
profileService.findProfilesByPropertyValue("properties.email", 
"[email protected]", 0, 10, null);
         Assert.assertEquals(0, jordyProfileDelete.getList().size());
diff --git 
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java 
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
index 1c6363b..ad514e2 100644
--- a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
@@ -19,7 +19,6 @@ package org.apache.unomi.itests.graphql;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.unomi.api.Event;
 import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.conditions.Condition;
 import org.apache.unomi.api.services.DefinitionsService;
 import org.apache.unomi.api.services.EventService;
 import org.apache.unomi.persistence.spi.PersistenceService;
@@ -58,11 +57,7 @@ public class GraphQLEventIT extends BaseGraphQLIT {
         profile = new Profile(profileID);
         persistenceService.save(profile);
 
-        Condition condition = new 
Condition(definitionsService.getConditionType("matchAllCondition"));
-        persistenceService.removeByQuery(condition, Event.class);
-        persistenceService.refresh();
-        // Wait for refresh to finish
-        Thread.sleep(1000);
+        removeItems(Event.class);
     }
 
 
@@ -78,6 +73,8 @@ public class GraphQLEventIT extends BaseGraphQLIT {
     @Test
     public void testGetEvent() throws IOException, InterruptedException {
         final Event event = createEvent(eventID, profile);
+        refreshPersistence();
+
         try (CloseableHttpResponse response = 
post("graphql/event/get-event.json")) {
             final ResponseContext context = 
ResponseContext.parse(response.getEntity());
 
@@ -93,6 +90,8 @@ public class GraphQLEventIT extends BaseGraphQLIT {
         createEvent("event-2", profile);
         final Profile profile2 = new Profile("profile-2");
         createEvent("event-3", profile2);
+        refreshPersistence();
+
         try (CloseableHttpResponse response = 
post("graphql/event/find-events.json")) {
             final ResponseContext context = 
ResponseContext.parse(response.getEntity());
             Assert.assertNotNull(context.getValue("data.cdp.findEvents"));
@@ -122,7 +121,6 @@ public class GraphQLEventIT extends BaseGraphQLIT {
     private Event createEvent(final String eventID, final Profile profile) 
throws InterruptedException {
         Event event = new Event(eventID, "profileUpdated", null, profile, 
"test", profile, null, new Date());
         persistenceService.save(event);
-        Thread.sleep(1000);
         return event;
     }
 }
diff --git 
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java 
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
index 21eeca7..867b654 100644
--- a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
@@ -20,6 +20,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.unomi.api.Profile;
 import org.apache.unomi.api.services.ProfileService;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.ops4j.pax.exam.util.Filter;
 
@@ -33,6 +34,11 @@ public class GraphQLProfileIT extends BaseGraphQLIT {
     @Filter(timeout = 600000)
     protected ProfileService profileService;
 
+    @Before
+    public void setUp() throws InterruptedException {
+        removeItems(Profile.class);
+    }
+
     @Test
     public void testGetProfile_WithoutCreation() throws IOException {
         try (CloseableHttpResponse response = 
post("graphql/profile/get-profile-without-creation.json")) {
@@ -73,6 +79,7 @@ public class GraphQLProfileIT extends BaseGraphQLIT {
         final Profile profile = new Profile("FindProfiles_ProfileId1");
         profile.setProperty("firstName", "FindProfiles_Username1");
         profileService.save(profile);
+        refreshPersistence();
 
         try (CloseableHttpResponse response = 
post("graphql/profile/find-profiles.json")) {
             final ResponseContext context = 
ResponseContext.parse(response.getEntity());
@@ -86,6 +93,7 @@ public class GraphQLProfileIT extends BaseGraphQLIT {
         final Profile profile = new 
Profile("profileId_deleteAllPersonalDataTest");
         profile.setProperty("firstName", "FirstName");
         profileService.save(profile);
+        refreshPersistence();
 
         try (CloseableHttpResponse response = 
post("graphql/profile/delete-all-personal-data.json")) {
             final ResponseContext context = 
ResponseContext.parse(response.getEntity());
diff --git 
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java 
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
index 3ee25f0..74a4ad8 100644
--- 
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
+++ 
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
@@ -21,7 +21,6 @@ import io.reactivex.Flowable;
 import io.reactivex.Observable;
 import io.reactivex.ObservableEmitter;
 import io.reactivex.subscribers.DefaultSubscriber;
-import org.apache.unomi.itests.BasicIT;
 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -33,6 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
     @Test
     public void testWebSocketConnectionSegment() throws Exception {
         WebSocketClient client = new WebSocketClient();
+        Socket socket = new Socket();
         try {
             LOGGER.info("Starting web socket client...");
             client.start();
@@ -53,7 +55,6 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
             URI echoUri = new URI(SUBSCRIPTION_ENDPOINT);
             ClientUpgradeRequest request = new ClientUpgradeRequest();
 
-            Socket socket = new Socket();
             Future<Session> onConnected = client.connect(socket, echoUri, 
request);
             RemoteEndpoint remote = onConnected.get().getRemote();
 
@@ -88,13 +89,18 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
 
         private Flowable<String> publisher;
 
-        private ObservableEmitter<String> emitter;
+        private CompletableFuture<ObservableEmitter<String>> emitterFuture;
 
         private CompletableFuture<CloseStatus> closeStatus = new 
CompletableFuture<>();
 
+        private List<Future<String>> messageListeners = new ArrayList<>();
+
         public Socket() {
+            // web socket message may come faster than observable callback is 
executed
+            emitterFuture = new CompletableFuture<>();
+
             publisher = Observable
-                    .create((ObservableEmitter<String> emitter) -> 
this.emitter = emitter)
+                    .create((ObservableEmitter<String> emitter) -> 
this.emitterFuture.complete(emitter))
                     .toFlowable(BackpressureStrategy.BUFFER);
         }
 
@@ -105,7 +111,11 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
 
         @Override
         public void onWebSocketText(String message) {
-            this.emitter.onNext(message);
+            try {
+                this.emitterFuture.get(10, TimeUnit.SECONDS).onNext(message);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not get emitter", e);
+            }
         }
 
         public Future<String> waitMessage() {
@@ -128,8 +138,10 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
                 public void onComplete() {
                     future.cancel(false);
                     cancel();
+                    messageListeners.remove(future);
                 }
             });
+            messageListeners.add(future);
             return future;
         }
 
@@ -139,8 +151,14 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
 
         @Override
         public void onWebSocketClose(int statusCode, String reason) {
+            LOGGER.info("Web socket close, code: " + statusCode + ", reason: " 
+ reason);
             super.onWebSocketClose(statusCode, reason);
             closeStatus.complete(new CloseStatus(statusCode, reason));
+            cancelListeners();
+        }
+
+        private void cancelListeners() {
+            this.messageListeners.forEach(future -> future.cancel(false));
         }
     }
 

Reply via email to