This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new b686120 UNOMI-343 Fix integrations tests that are currently broken
new 1a80d71 Merge pull request #163 from enonic/unomi-343
b686120 is described below
commit b6861206751711a144086e7258d82d40fe107517
Author: Pavel Milkevich <[email protected]>
AuthorDate: Thu Jun 4 21:17:58 2020 +0300
UNOMI-343 Fix integrations tests that are currently broken
---
.../test/java/org/apache/unomi/itests/BaseIT.java | 35 ++++++++++++++++++++--
.../org/apache/unomi/itests/ContextServletIT.java | 25 +++++++++++-----
.../apache/unomi/itests/ProfileImportActorsIT.java | 33 +++++++++++++++-----
.../unomi/itests/ProfileImportRankingIT.java | 22 ++++++++++----
.../unomi/itests/ProfileImportSurfersIT.java | 33 +++++++++++++-------
.../unomi/itests/graphql/GraphQLEventIT.java | 12 ++++----
.../unomi/itests/graphql/GraphQLProfileIT.java | 8 +++++
.../unomi/itests/graphql/GraphQLWebSocketIT.java | 28 +++++++++++++----
8 files changed, 150 insertions(+), 46 deletions(-)
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 336771d..f6c35e4 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -17,6 +17,10 @@
package org.apache.unomi.itests;
+import org.apache.unomi.api.Item;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.persistence.spi.PersistenceService;
import org.junit.Assert;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.CoreOptions;
@@ -24,9 +28,10 @@ import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.karaf.container.internal.JavaVersionUtil;
import org.ops4j.pax.exam.karaf.options.LogLevelOption.LogLevel;
import org.ops4j.pax.exam.options.MavenArtifactUrlReference;
-import org.ops4j.pax.exam.options.MavenUrlReference;
import org.ops4j.pax.exam.options.extra.VMOption;
+import org.ops4j.pax.exam.util.Filter;
+import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,7 +41,12 @@ import java.util.function.Supplier;
import static org.ops4j.pax.exam.CoreOptions.maven;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
-import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.*;
+import static
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.debugConfiguration;
+import static
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
+import static
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.karafDistributionConfiguration;
+import static
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.keepRuntimeFolder;
+import static
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.logLevel;
+import static
org.ops4j.pax.exam.karaf.options.KarafDistributionOption.replaceConfigurationFile;
/**
* Base class for integration tests.
@@ -50,6 +60,27 @@ public abstract class BaseIT {
protected static final String KARAF_DIR = "target/exam";
protected static final String UNOMI_KEY =
"670c26d1cc413346c3b2fd9ce65dab41";
+ @Inject
+ @Filter(timeout = 600000)
+ protected PersistenceService persistenceService;
+
+ @Inject
+ @Filter(timeout = 600000)
+ protected DefinitionsService definitionsService;
+
+ protected void removeItems(final Class<? extends Item> ...classes) throws
InterruptedException {
+ Condition condition = new
Condition(definitionsService.getConditionType("matchAllCondition"));
+ for (Class<? extends Item> aClass : classes) {
+ persistenceService.removeByQuery(condition, aClass);
+ }
+ refreshPersistence();
+ }
+
+ protected void refreshPersistence() throws InterruptedException {
+ persistenceService.refresh();
+ Thread.sleep(1000);
+ }
+
@Configuration
public Option[] config() throws InterruptedException {
diff --git a/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
b/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
index 926627b..2830e29 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java
@@ -21,7 +21,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
-import org.apache.unomi.api.*;
+import org.apache.unomi.api.ContextRequest;
+import org.apache.unomi.api.ContextResponse;
+import org.apache.unomi.api.Event;
+import org.apache.unomi.api.Metadata;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.Session;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.segments.Segment;
import org.apache.unomi.api.services.DefinitionsService;
@@ -87,6 +92,8 @@ public class ContextServletIT extends BaseIT {
@Filter(timeout = 600000)
protected SegmentService segmentService;
+ private Profile profile;
+
@Before
public void setUp() throws InterruptedException {
//Create a past-event segment
@@ -100,7 +107,12 @@ public class ContextServletIT extends BaseIT {
segmentCondition.setParameter("eventCondition",pastEventEventCondition);
segment.setCondition(segmentCondition);
segmentService.setSegmentDefinition(segment);
- Thread.sleep(2000);
+
+ String profileId = "test-profile-id";
+ profile = new Profile(profileId);
+ profileService.save(profile);
+
+ refreshPersistence();
}
@After
@@ -108,8 +120,8 @@ public class ContextServletIT extends BaseIT {
TestUtils.removeAllEvents(definitionsService,
persistenceService);
TestUtils.removeAllSessions(definitionsService,
persistenceService);
TestUtils.removeAllProfiles(definitionsService,
persistenceService);
+ profileService.delete(profile.getItemId(), false);
segmentService.removeSegmentDefinition(SEGMENT_ID,false);
- persistenceService.refresh();
}
@Test
@@ -182,15 +194,12 @@ public class ContextServletIT extends BaseIT {
public void
testUpdateEventFromContextAuthorizedThirdPartyNoItemID_Fail() throws
IOException, InterruptedException {
//Arrange
String eventId = "test-event-id3";
- String profileId = "test-profile-id";
String sessionId = "test-session-id";
String scope = "test-scope";
String eventTypeOriginal = "test-event-type-original";
String eventTypeUpdated = "test-event-type-updated";
- Profile profile = new Profile(profileId);
Session session = new Session(sessionId, profile, new Date(),
scope);
Event event = new Event(eventId, eventTypeOriginal, session,
profile, scope, null, null, new Date());
- profileService.save(profile);
this.eventService.send(event);
Thread.sleep(2000);
event.setEventType(eventTypeUpdated); //change the event so we
can see the update effect
@@ -211,7 +220,7 @@ public class ContextServletIT extends BaseIT {
}
@Test
- public void
testCreateEventsWithNoTimestampParam_profileAddedToSegment() throws IOException
{
+ public void
testCreateEventsWithNoTimestampParam_profileAddedToSegment() throws
IOException, InterruptedException {
//Arrange
String sessionId = "test-session-id";
String scope = "test-scope";
@@ -227,6 +236,8 @@ public class ContextServletIT extends BaseIT {
HttpPost request = new HttpPost(URL + CONTEXT_URL);
request.setEntity(new
StringEntity(objectMapper.writeValueAsString(contextRequest),
ContentType.create("application/json")));
String cookieHeaderValue =
TestUtils.executeContextJSONRequest(request, sessionId).getCookieHeaderValue();
+ Thread.sleep(1000); //Making sure DB is updated
+
//Add the context-profile-id cookie to the second event
request.addHeader("Cookie", cookieHeaderValue);
ContextResponse response =
(TestUtils.executeContextJSONRequest(request, sessionId)).getContextResponse();
//second event
diff --git
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
index 677ccbc..0d31b7f 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportActorsIT.java
@@ -34,7 +34,12 @@ import org.ops4j.pax.exam.util.Filter;
import javax.inject.Inject;
import java.io.File;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
/**
* Created by amidani on 14/08/2017.
@@ -43,9 +48,11 @@ import java.util.*;
@ExamReactorStrategy(PerSuite.class)
public class ProfileImportActorsIT extends BaseIT {
- @Inject @Filter(value="(configDiscriminator=IMPORT)", timeout = 600000)
+ @Inject
+ @Filter(value = "(configDiscriminator=IMPORT)", timeout = 600000)
protected ImportExportConfigurationService<ImportConfiguration>
importConfigurationService;
- @Inject @Filter(timeout = 600000)
+ @Inject
+ @Filter(timeout = 600000)
protected ProfileService profileService;
@Test
@@ -66,11 +73,17 @@ public class ProfileImportActorsIT extends BaseIT {
profileService.setPropertyType(propertyTypeTwitterId);
profileService.setPropertyType(propertyTypeActorsGenres);
- PropertyType propTwitterId =
profileService.getPropertyType("twitterId");
- Assert.assertNotNull(propTwitterId);
+ PropertyType propTwitterId = keepTrying("Failed waiting for property
type 'twitterId'",
+ () -> profileService.getPropertyType("twitterId"),
+ Objects::nonNull,
+ 1000,
+ 100);
- PropertyType propActorsGenre =
profileService.getPropertyType("movieGenres");
- Assert.assertNotNull(propActorsGenre);
+ PropertyType propActorsGenre = keepTrying("Failed waiting for property
type 'movieGenres'",
+ () -> profileService.getPropertyType("movieGenres"),
+ Objects::nonNull,
+ 1000,
+ 100);
/*** Actors Test ***/
@@ -101,7 +114,11 @@ public class ProfileImportActorsIT extends BaseIT {
importConfigurationService.save(importConfigActors, true);
//Wait for data to be processed
- keepTrying("Failed waiting for actors initial import to complete",
()-> profileService.findProfilesByPropertyValue("properties.city", "hollywood",
0, 10, null), (p)->p.getTotalSize() == 6, 1000, 200);
+ keepTrying("Failed waiting for actors initial import to complete",
+ () ->
profileService.findProfilesByPropertyValue("properties.city", "hollywood", 0,
10, null),
+ (p) -> p.getTotalSize() == 6,
+ 1000,
+ 200);
List<ImportConfiguration> importConfigurations =
importConfigurationService.getAll();
Assert.assertEquals(1, importConfigurations.size());
diff --git
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
index 9694598..9159426 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* Created by amidani on 09/08/2017.
@@ -71,11 +72,17 @@ public class ProfileImportRankingIT extends BaseIT {
profileService.setPropertyType(propertyTypeRank);
- PropertyType propUciId = profileService.getPropertyType("uciId");
- Assert.assertNotNull(propUciId);
+ PropertyType propUciId = keepTrying("Failed waiting for property type
'uciId'",
+ () -> profileService.getPropertyType("uciId"),
+ Objects::nonNull,
+ 1000,
+ 100);
- PropertyType propRankId = profileService.getPropertyType("rank");
- Assert.assertNotNull(propRankId);
+ PropertyType propRankId = keepTrying("Failed waiting for property type
'rank'",
+ () -> profileService.getPropertyType("rank"),
+ Objects::nonNull,
+ 1000,
+ 100);
/*** Surfers Test ***/
@@ -108,8 +115,11 @@ public class ProfileImportRankingIT extends BaseIT {
//Wait for data to be processed
keepTrying("Failed waiting for ranking import to complete",
()->profileService.findProfilesByPropertyValue("properties.city",
"rankingCity", 0, 50, null), (p)->p.getTotalSize() == 25, 1000, 200);
- List<ImportConfiguration> importConfigurations =
importConfigurationService.getAll();
- Assert.assertEquals(1, importConfigurations.size());
+ List<ImportConfiguration> importConfigurations = keepTrying("Failed
waiting for import configurations list with 1 item",
+ () -> importConfigurationService.getAll(),
+ (list) -> Objects.nonNull(list) && list.size() == 1,
+ 1000,
+ 100);
PartialList<Profile> gregProfileList =
profileService.findProfilesByPropertyValue("properties.uciId", "10004451371",
0, 10, null);
Assert.assertEquals(1, gregProfileList.getList().size());
diff --git
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
index e4bbf69..01ea0fe 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportSurfersIT.java
@@ -40,8 +40,8 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* Created by amidani on 09/08/2017.
@@ -67,11 +67,13 @@ public class ProfileImportSurfersIT extends BaseIT {
profileService.setPropertyType(propertyType);
- PropertyType propAlive = profileService.getPropertyType("alive");
+ keepTrying("Failed waiting for property type 'alive'",
+ () -> profileService.getPropertyType("alive"),
+ Objects::nonNull,
+ 1000,
+ 100);
- Assert.assertNotNull("Alive property should not be null", propAlive);
-
- propAlive =
RouterUtils.getPropertyTypeById(profileService.getTargetPropertyTypes("profiles"),
"alive");
+ PropertyType propAlive =
RouterUtils.getPropertyTypeById(profileService.getTargetPropertyTypes("profiles"),
"alive");
Assert.assertNotNull("Lookup of alive property through profiles target
has failed !", propAlive);
@@ -107,8 +109,11 @@ public class ProfileImportSurfersIT extends BaseIT {
//Wait for data to be processed
keepTrying("Failed waiting for surfers initial import to complete",
()->profileService.findProfilesByPropertyValue("properties.city",
"surfersCity", 0, 50, null), (p)->p.getTotalSize() == 34, 1000, 100);
- List<ImportConfiguration> importConfigurations =
importConfigurationService.getAll();
- Assert.assertEquals(1, importConfigurations.size());
+ keepTrying("Failed waiting for import configurations list with 1 item",
+ () -> importConfigurationService.getAll(),
+ (list) -> Objects.nonNull(list) && list.size() == 1,
+ 1000,
+ 100);
//Profile not to delete
PartialList<Profile> jordyProfile =
profileService.findProfilesByPropertyValue("properties.email",
"[email protected]", 0, 10, null);
@@ -152,8 +157,11 @@ public class ProfileImportSurfersIT extends BaseIT {
//Wait for data to be processed
keepTrying("Failed waiting for surfers overwrite import to complete",
()->profileService.findProfilesByPropertyValue("properties.city",
"surfersCity", 0, 50, null), (p)->p.getTotalSize() == 36, 1000, 100);
- importConfigurations = importConfigurationService.getAll();
- Assert.assertEquals(1, importConfigurations.size());
+ keepTrying("Failed waiting for import configurations list with 1 item",
+ () -> importConfigurationService.getAll(),
+ (list) -> Objects.nonNull(list) && list.size() == 1,
+ 1000,
+ 100);
//Profile not to delete
PartialList<Profile> aliveProfiles =
profileService.findProfilesByPropertyValue("properties.alive", "true", 0, 50,
null);
@@ -191,8 +199,11 @@ public class ProfileImportSurfersIT extends BaseIT {
//Wait for data to be processed
keepTrying("Failed waiting for surfers delete import to complete",
()->profileService.findProfilesByPropertyValue("properties.city",
"surfersCity", 0, 50, null), (p)->p.getTotalSize() == 0, 1000, 100);
- importConfigurations = importConfigurationService.getAll();
- Assert.assertEquals(1, importConfigurations.size());
+ keepTrying("Failed waiting for import configurations list with 1 item",
+ () -> importConfigurationService.getAll(),
+ (list) -> Objects.nonNull(list) && list.size() == 1,
+ 1000,
+ 100);
PartialList<Profile> jordyProfileDelete =
profileService.findProfilesByPropertyValue("properties.email",
"[email protected]", 0, 10, null);
Assert.assertEquals(0, jordyProfileDelete.getList().size());
diff --git
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
index 1c6363b..ad514e2 100644
--- a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLEventIT.java
@@ -19,7 +19,6 @@ package org.apache.unomi.itests.graphql;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.Profile;
-import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.DefinitionsService;
import org.apache.unomi.api.services.EventService;
import org.apache.unomi.persistence.spi.PersistenceService;
@@ -58,11 +57,7 @@ public class GraphQLEventIT extends BaseGraphQLIT {
profile = new Profile(profileID);
persistenceService.save(profile);
- Condition condition = new
Condition(definitionsService.getConditionType("matchAllCondition"));
- persistenceService.removeByQuery(condition, Event.class);
- persistenceService.refresh();
- // Wait for refresh to finish
- Thread.sleep(1000);
+ removeItems(Event.class);
}
@@ -78,6 +73,8 @@ public class GraphQLEventIT extends BaseGraphQLIT {
@Test
public void testGetEvent() throws IOException, InterruptedException {
final Event event = createEvent(eventID, profile);
+ refreshPersistence();
+
try (CloseableHttpResponse response =
post("graphql/event/get-event.json")) {
final ResponseContext context =
ResponseContext.parse(response.getEntity());
@@ -93,6 +90,8 @@ public class GraphQLEventIT extends BaseGraphQLIT {
createEvent("event-2", profile);
final Profile profile2 = new Profile("profile-2");
createEvent("event-3", profile2);
+ refreshPersistence();
+
try (CloseableHttpResponse response =
post("graphql/event/find-events.json")) {
final ResponseContext context =
ResponseContext.parse(response.getEntity());
Assert.assertNotNull(context.getValue("data.cdp.findEvents"));
@@ -122,7 +121,6 @@ public class GraphQLEventIT extends BaseGraphQLIT {
private Event createEvent(final String eventID, final Profile profile)
throws InterruptedException {
Event event = new Event(eventID, "profileUpdated", null, profile,
"test", profile, null, new Date());
persistenceService.save(event);
- Thread.sleep(1000);
return event;
}
}
diff --git
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
index 21eeca7..867b654 100644
--- a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLProfileIT.java
@@ -20,6 +20,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.services.ProfileService;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.ops4j.pax.exam.util.Filter;
@@ -33,6 +34,11 @@ public class GraphQLProfileIT extends BaseGraphQLIT {
@Filter(timeout = 600000)
protected ProfileService profileService;
+ @Before
+ public void setUp() throws InterruptedException {
+ removeItems(Profile.class);
+ }
+
@Test
public void testGetProfile_WithoutCreation() throws IOException {
try (CloseableHttpResponse response =
post("graphql/profile/get-profile-without-creation.json")) {
@@ -73,6 +79,7 @@ public class GraphQLProfileIT extends BaseGraphQLIT {
final Profile profile = new Profile("FindProfiles_ProfileId1");
profile.setProperty("firstName", "FindProfiles_Username1");
profileService.save(profile);
+ refreshPersistence();
try (CloseableHttpResponse response =
post("graphql/profile/find-profiles.json")) {
final ResponseContext context =
ResponseContext.parse(response.getEntity());
@@ -86,6 +93,7 @@ public class GraphQLProfileIT extends BaseGraphQLIT {
final Profile profile = new
Profile("profileId_deleteAllPersonalDataTest");
profile.setProperty("firstName", "FirstName");
profileService.save(profile);
+ refreshPersistence();
try (CloseableHttpResponse response =
post("graphql/profile/delete-all-personal-data.json")) {
final ResponseContext context =
ResponseContext.parse(response.getEntity());
diff --git
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
index 3ee25f0..74a4ad8 100644
---
a/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
+++
b/itests/src/test/java/org/apache/unomi/itests/graphql/GraphQLWebSocketIT.java
@@ -21,7 +21,6 @@ import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.subscribers.DefaultSubscriber;
-import org.apache.unomi.itests.BasicIT;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -33,6 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
@Test
public void testWebSocketConnectionSegment() throws Exception {
WebSocketClient client = new WebSocketClient();
+ Socket socket = new Socket();
try {
LOGGER.info("Starting web socket client...");
client.start();
@@ -53,7 +55,6 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
URI echoUri = new URI(SUBSCRIPTION_ENDPOINT);
ClientUpgradeRequest request = new ClientUpgradeRequest();
- Socket socket = new Socket();
Future<Session> onConnected = client.connect(socket, echoUri,
request);
RemoteEndpoint remote = onConnected.get().getRemote();
@@ -88,13 +89,18 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
private Flowable<String> publisher;
- private ObservableEmitter<String> emitter;
+ private CompletableFuture<ObservableEmitter<String>> emitterFuture;
private CompletableFuture<CloseStatus> closeStatus = new
CompletableFuture<>();
+ private List<Future<String>> messageListeners = new ArrayList<>();
+
public Socket() {
+ // web socket message may come faster than observable callback is
executed
+ emitterFuture = new CompletableFuture<>();
+
publisher = Observable
- .create((ObservableEmitter<String> emitter) ->
this.emitter = emitter)
+ .create((ObservableEmitter<String> emitter) ->
this.emitterFuture.complete(emitter))
.toFlowable(BackpressureStrategy.BUFFER);
}
@@ -105,7 +111,11 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
@Override
public void onWebSocketText(String message) {
- this.emitter.onNext(message);
+ try {
+ this.emitterFuture.get(10, TimeUnit.SECONDS).onNext(message);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not get emitter", e);
+ }
}
public Future<String> waitMessage() {
@@ -128,8 +138,10 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
public void onComplete() {
future.cancel(false);
cancel();
+ messageListeners.remove(future);
}
});
+ messageListeners.add(future);
return future;
}
@@ -139,8 +151,14 @@ public class GraphQLWebSocketIT extends BaseGraphQLIT {
@Override
public void onWebSocketClose(int statusCode, String reason) {
+ LOGGER.info("Web socket close, code: " + statusCode + ", reason: "
+ reason);
super.onWebSocketClose(statusCode, reason);
closeStatus.complete(new CloseStatus(statusCode, reason));
+ cancelListeners();
+ }
+
+ private void cancelListeners() {
+ this.messageListeners.forEach(future -> future.cancel(false));
}
}