This is an automated email from the ASF dual-hosted git repository.
nvollmar pushed a commit to branch fix-tests
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/fix-tests by this push:
new ff5a43b wip
ff5a43b is described below
commit ff5a43b312fb40ec7546cb7a0b88a0496648f18c
Author: Nicolas Vollmar <[email protected]>
AuthorDate: Tue Aug 22 13:27:16 2023 +0200
wip
---
.../cassandra/query/EventsByTagSpec.scala | 54 +++++++++++-----------
1 file changed, 26 insertions(+), 28 deletions(-)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
index f781760..8f87937 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
@@ -108,12 +108,12 @@ object EventsByTagSpec {
pekko.persistence.cassandra.events-by-tag {
eventual-consistency-delay = 0s
# will test by requiring a new persistence-id search every 2s
- new-persistence-id-scan-timeout = 500ms
- cleanup-old-persistence-ids = 1s
+ new-persistence-id-scan-timeout = 1s
+ cleanup-old-persistence-ids = 2s
back-track {
// need to be smaller than the cleanup old persistence ids
- period = 800ms
- long-period = 850ms
+ period = 1600ms
+ long-period = 1900ms
}
}
""").withFallback(config)
@@ -1360,8 +1360,8 @@ object EventsByTagDisabledSpec {
class EventsByTagPersistenceIdCleanupSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.persistenceIdCleanupConfig) {
- private val newPersistenceIdScan: FiniteDuration = 500.millis
- private val cleanupPeriod: FiniteDuration = 1.second
+ private val newPersistenceIdScan: FiniteDuration = 1.second
+ private val cleanupPeriod: FiniteDuration = 2.second
private val logFilters = Set("cleanup-old-persistence-ids has been set")
@@ -1373,28 +1373,26 @@ class EventsByTagPersistenceIdCleanupSpec extends
AbstractEventsByTagSpec(Events
"PersistenceId cleanup" must {
"drop state and trigger new persistence id lookup periodically" in {
// this test depends on new persistenceId can not happening at the wrong
time...
- eventually(timeout(Span(30, Seconds))) {
- val t1: LocalDateTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(2)
- val event1 = PersistentRepr(s"cleanup-1", 1, "cleanup")
- writeTaggedEvent(t1, event1, Set("cleanup-tag"), 1, bucketSize)
-
- val query =
- queries.eventsByTag("cleanup-tag",
-
TimeBasedUUID(Uuids.startOf(t1.toInstant(ZoneOffset.UTC).toEpochMilli - 1L)))
- val probe = query.runWith(TestSink.probe[Any])
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 1L,
"cleanup-1") => e }
- // wait for persistence id cleanup
- probe.expectNoMessage(cleanupPeriod + 250.millis)
-
- // the metadata for pid cleanup should have been removed meaning the
next event will be delayed
- val event2 = PersistentRepr(s"cleanup-2", 2, "cleanup")
- writeTaggedEvent(event2, Set("cleanup-tag"), 2, bucketSize)
-
- // we don't now when exactly the next persistence id scan will be
- probe.expectNoMessage(newPersistenceIdScan - 150.millis)
- probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 2L,
"cleanup-2") => e }
- }
+ val t1: LocalDateTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(2)
+ val event1 = PersistentRepr(s"cleanup-1", 1, "cleanup")
+ writeTaggedEvent(t1, event1, Set("cleanup-tag"), 1, bucketSize)
+
+ val query =
+ queries.eventsByTag("cleanup-tag",
+
TimeBasedUUID(Uuids.startOf(t1.toInstant(ZoneOffset.UTC).toEpochMilli - 1L)))
+ val probe = query.runWith(TestSink.probe[Any])
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 1L,
"cleanup-1") => e }
+ // wait for persistence id cleanup
+ probe.expectNoMessage(cleanupPeriod + 250.millis)
+
+ // the metadata for pid cleanup should have been removed meaning the
next event will be delayed
+ val event2 = PersistentRepr(s"cleanup-2", 2, "cleanup")
+ writeTaggedEvent(event2, Set("cleanup-tag"), 2, bucketSize)
+
+ // we don't now when exactly the next persistence id scan will be
+ probe.expectNoMessage(newPersistenceIdScan - 150.millis)
+ probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 2L,
"cleanup-2") => e }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]