This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 049e29b pekko 1.3.0 (#334)
049e29b is described below
commit 049e29b3b4372f545525b072a59a6fd62add9f24
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Nov 26 12:07:36 2025 +0100
pekko 1.3.0 (#334)
---
.../cassandra/EventsByTagMultiJvmSpec.scala | 2 +-
.../cassandra/CassandraEventsByTagLoadSpec.scala | 2 +-
.../persistence/cassandra/CassandraSpec.scala | 4 +-
.../cassandra/EventsByTagCrashSpec.scala | 2 +-
.../cassandra/EventsByTagMigrationSpec.scala | 24 +++---
.../cassandra/EventsByTagRecoverySpec.scala | 10 +--
.../cassandra/EventsByTagRestartSpec.scala | 6 +-
.../cassandra/EventsByTagStressSpec.scala | 2 +-
.../cassandra/query/AllPersistenceIdsSpec.scala | 16 ++--
.../query/CassandraQueryJournalOverrideSpec.scala | 6 +-
.../cassandra/query/EventAdaptersReadSpec.scala | 12 +--
.../EventsByPersistenceIdFastForwardSpec.scala | 2 +-
...ventsByPersistenceIdMultiPartitionGapSpec.scala | 10 +--
.../query/EventsByPersistenceIdSpec.scala | 54 ++++++-------
.../EventsByPersistenceIdWithControlSpec.scala | 10 +--
.../cassandra/query/EventsByTagPubsubSpec.scala | 2 +-
.../cassandra/query/EventsByTagSpec.scala | 90 +++++++++++-----------
.../cassandra/query/EventsByTagStageSpec.scala | 38 ++++-----
.../query/javadsl/CassandraReadJournalSpec.scala | 8 +-
.../query/scaladsl/CassandraReadJournalSpec.scala | 8 +-
project/PekkoConnectorsDependency.scala | 2 +-
project/PekkoCoreDependency.scala | 2 +-
project/PekkoManagementDependency.scala | 2 +-
23 files changed, 157 insertions(+), 157 deletions(-)
diff --git
a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
index f1545e6..f29c923 100644
---
a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
+++
b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
@@ -109,7 +109,7 @@ abstract class EventsByTagMultiJvmSpec
queryJournal
.eventsByTag("all", NoOffset)
.map(e => (e.persistenceId, e.event.asInstanceOf[Int]))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
}
}
enterBarrier("query-started")
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
index ee76f67..ddceff2 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
@@ -83,7 +83,7 @@ class CassandraEventsByTagLoadSpec extends
CassandraSpec(CassandraEventsByTagLoa
private def validateTagStream(readJournal: CassandraReadJournal)(tag:
String): Unit = {
system.log.info(s"Validating tag $tag")
- val probe = readJournal.eventsByTag("orange",
NoOffset).toMat(TestSink.probe)(Keep.right).run()
+ val probe = readJournal.eventsByTag("orange",
NoOffset).toMat(TestSink())(Keep.right).run()
var sequenceNrsPerPid = Map[String, Long]()
var allReceived: Map[String, List[Long]] =
Map.empty.withDefaultValue(List.empty)
probe.request(messagesPerPersistenceId * nrPersistenceIds)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
index d2e5af1..ac715b8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
@@ -304,10 +304,10 @@ abstract class CassandraSpec(
.futureValue
def eventsByTag(tag: String): TestSubscriber.Probe[Any] =
- queries.eventsByTag(tag, NoOffset).map(_.event).runWith(TestSink.probe)
+ queries.eventsByTag(tag, NoOffset).map(_.event).runWith(TestSink())
def expectEventsForTag(tag: String, elements: String*): Unit = {
- val probe = queries.eventsByTag(tag,
NoOffset).map(_.event).runWith(TestSink.probe)
+ val probe = queries.eventsByTag(tag,
NoOffset).map(_.event).runWith(TestSink())
probe.request(elements.length + 1)
elements.foreach(probe.expectNext)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
index b74c6e9..3bfcfed 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
@@ -43,7 +43,7 @@ class EventsByTagCrashSpec extends
CassandraSpec(EventsByTagRestartSpec.config)
expectMsg(Ack)
}
val blueTags: Source[EventEnvelope, NotUsed] =
queryJournal.eventsByTag(tag = "blue", offset = NoOffset)
- val tagProbe = blueTags.runWith(TestSink.probe[EventEnvelope](system))
+ val tagProbe = blueTags.runWith(TestSink[EventEnvelope]()(system))
(1L to msgs).foreach { m =>
val expected = s"msg $m"
tagProbe.request(1)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
index 4870fbe..fbbca56 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
@@ -91,7 +91,7 @@ class EventsByTagMigrationProvidePersistenceIds extends
AbstractEventsByTagMigra
migrator.migratePidsToTagViews(List(pidOne)).futureValue shouldEqual Done
val blueSrc = queries.eventsByTag("blue", NoOffset)
- val blueProbe = blueSrc.runWith(TestSink.probe[Any])
+ val blueProbe = blueSrc.runWith(TestSink[Any]())
blueProbe.request(5)
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 2, "e-2") => }
@@ -101,7 +101,7 @@ class EventsByTagMigrationProvidePersistenceIds extends
AbstractEventsByTagMigra
migrator.migratePidsToTagViews(List(pidTwo)).futureValue shouldEqual Done
val blueSrcTakeTwo = queries.eventsByTag("blue", NoOffset)
- val blueProbeTakeTwo = blueSrcTakeTwo.runWith(TestSink.probe[Any])
+ val blueProbeTakeTwo = blueSrcTakeTwo.runWith(TestSink[Any]())
blueProbeTakeTwo.request(5)
blueProbeTakeTwo.expectNextPF { case EventEnvelope(_, `pidOne`, 1,
"e-1") => }
blueProbeTakeTwo.expectNextPF { case EventEnvelope(_, `pidOne`, 2,
"e-2") => }
@@ -188,7 +188,7 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
"work with the current implementation" taggedAs RequiresCassandraThree in {
val blueSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("blue", NoOffset)
- val blueProbe = blueSrc.runWith(TestSink.probe[Any])
+ val blueProbe = blueSrc.runWith(TestSink[Any]())
blueProbe.request(5)
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 2, "e-2") => }
@@ -201,7 +201,7 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
blueProbe.cancel()
val greenSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("green", NoOffset)
- val greenProbe = greenSrc.runWith(TestSink.probe[Any])
+ val greenProbe = greenSrc.runWith(TestSink[Any]())
greenProbe.request(4)
greenProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
greenProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 4, "e-4") => }
@@ -210,27 +210,27 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
greenProbe.cancel()
val orangeSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("orange", NoOffset)
- val orangeProbe = orangeSrc.runWith(TestSink.probe[Any])
+ val orangeProbe = orangeSrc.runWith(TestSink[Any]())
orangeProbe.request(3)
orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
orangeProbe.expectNoMessage(waitTime)
orangeProbe.cancel()
val bananaSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("banana", NoOffset)
- val bananaProbe = bananaSrc.runWith(TestSink.probe[Any])
+ val bananaProbe = bananaSrc.runWith(TestSink[Any]())
bananaProbe.request(3)
bananaProbe.expectNoMessage(waitTime)
bananaProbe.cancel()
val redSrc: Source[EventEnvelope, NotUsed] = queries.eventsByTag("red",
NoOffset)
- val redProbe = redSrc.runWith(TestSink.probe[Any])
+ val redProbe = redSrc.runWith(TestSink[Any]())
redProbe.request(3)
redProbe.expectNextPF { case EventEnvelope(_, `pidWithSnapshot`, 10,
"h-1") => }
redProbe.expectNextPF { case EventEnvelope(_, `pidWithSnapshot`, 11,
"h-2") => }
redProbe.cancel()
val excludedSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("bad-tag", NoOffset)
- val excludedProbe = excludedSrc.runWith(TestSink.probe[Any])
+ val excludedProbe = excludedSrc.runWith(TestSink[Any]())
excludedProbe.request(1)
excludedProbe.expectNoMessage(waitTime)
excludedProbe.cancel()
@@ -242,7 +242,7 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
probe.expectMsg(RecoveryCompleted)
val blueSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("blue", NoOffset)
- val blueProbe = blueSrc.runWith(TestSink.probe[Any])
+ val blueProbe = blueSrc.runWith(TestSink[Any]())
blueProbe.request(6)
blueProbe.expectNextN(5) // ignore the ones we've already validated
// This event wasn't migrated, should have been fixed on actor start up
@@ -251,7 +251,7 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
blueProbe.cancel()
val greenSrc: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("green", NoOffset)
- val greenProbe = greenSrc.runWith(TestSink.probe[Any])
+ val greenProbe = greenSrc.runWith(TestSink[Any]())
greenProbe.request(6)
greenProbe.expectNextN(3) // ignore the ones we've already validated
// This event wasn't migrated, should have been fixed on actor start up
@@ -284,7 +284,7 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
expectMsg(Ack)
val blueSrc: Source[EventEnvelope, NotUsed] =
queriesTwo.eventsByTag("blue", NoOffset)
- val blueProbe =
blueSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemTwo).materializer)
+ val blueProbe =
blueSrc.runWith(TestSink[Any]())(SystemMaterializer(systemTwo).materializer)
blueProbe.request(10)
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 2, "e-2") => }
@@ -317,7 +317,7 @@ class EventsByTagMigrationSpec extends
AbstractEventsByTagMigrationSpec {
expectMsg(Ack)
val orangeSrc: Source[EventEnvelope, NotUsed] =
queriesThree.eventsByTag("orange", NoOffset)
- val orangeProbe =
orangeSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemThree).materializer)
+ val orangeProbe =
orangeSrc.runWith(TestSink[Any]())(SystemMaterializer(systemThree).materializer)
orangeProbe.request(3)
orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 5,
"new-event-1") => }
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
index 9a724e2..99cab69 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
@@ -69,7 +69,7 @@ class EventsByTagRecoverySpec extends
CassandraSpec(EventsByTagRecoverySpec.conf
p1take2 ! PoisonPill
val greenTags = queryJournal.eventsByTag(tag = "blue", offset =
NoOffset)
- val probe = greenTags.runWith(TestSink.probe[Any](system))
+ val probe = greenTags.runWith(TestSink[Any]()(system))
probe.request(9)
(1 to 8).foreach { i =>
val event = s"e-$i"
@@ -87,7 +87,7 @@ class EventsByTagRecoverySpec extends
CassandraSpec(EventsByTagRecoverySpec.conf
}
val greenTagsTake2 = queryJournal.eventsByTag(tag = "blue", offset =
NoOffset)
- val probeTake2 = greenTagsTake2.runWith(TestSink.probe[Any](system))
+ val probeTake2 = greenTagsTake2.runWith(TestSink[Any]()(system))
probeTake2.request(13)
(1 to 12).foreach { i =>
val event = s"e-$i"
@@ -125,7 +125,7 @@ class EventsByTagRecoverySpec extends
CassandraSpec(EventsByTagRecoverySpec.conf
val queryJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val greenTags = queryJournal.eventsByTag(tag = "red", offset =
NoOffset)
- val probe = greenTags.runWith(TestSink.probe[Any](system))
+ val probe = greenTags.runWith(TestSink[Any]()(system))
probe.request(9)
(1 to 8).foreach { i =>
val event = s"e-$i"
@@ -171,7 +171,7 @@ class EventsByTagRecoverySpec extends
CassandraSpec(EventsByTagRecoverySpec.conf
val queryJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val greenTags = queryJournal.eventsByTag(tag = "red", offset =
NoOffset)
- val probe = greenTags.runWith(TestSink.probe[Any](system))
+ val probe = greenTags.runWith(TestSink[Any]()(system))
probe.request(13)
(1 to 12).foreach { i =>
val event = s"e-$i"
@@ -209,7 +209,7 @@ class EventsByTagRecoverySpec extends
CassandraSpec(EventsByTagRecoverySpec.conf
val queryJournal =
PersistenceQuery(systemTwo).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val blueTags = queryJournal.eventsByTag(tag = "blue", offset =
NoOffset)
- val probe = blueTags.runWith(TestSink.probe[Any](systemTwo))
+ val probe = blueTags.runWith(TestSink[Any]()(systemTwo))
probe.request(6)
(1 to 5).foreach { i =>
val event = s"e-$i"
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
index cc3fcf5..3f4eb63 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
@@ -74,7 +74,7 @@ class EventsByTagRestartSpec extends
CassandraSpec(EventsByTagRestartSpec.config
}
val blueTags = queryJournal.eventsByTag(tag, offset = NoOffset)
- val tagProbe = blueTags.runWith(TestSink.probe[Any](system))
+ val tagProbe = blueTags.runWith(TestSink[Any]()(system))
(0 until restarts).foreach { restart =>
tagProbe.request(messagesPerRestart + 1)
(1 to messagesPerRestart).foreach { i =>
@@ -109,7 +109,7 @@ class EventsByTagRestartSpec extends
CassandraSpec(EventsByTagRestartSpec.config
deathProbe.expectTerminated(p2)
val greenTags = queryJournal.eventsByTag(tag, offset = NoOffset)
- val tagProbe = greenTags.runWith(TestSink.probe[Any](system))
+ val tagProbe = greenTags.runWith(TestSink[Any]()(system))
tagProbe.request(10)
(1 to 3).foreach { n =>
val event = s"e$n"
@@ -161,7 +161,7 @@ class EventsByTagRestartSpec extends
CassandraSpec(EventsByTagRestartSpec.config
probe2.expectMsg(Ack)
val greenTags = queryJournal.eventsByTag(tag, offset = NoOffset)
- val tagProbe = greenTags.runWith(TestSink.probe[Any](system))
+ val tagProbe = greenTags.runWith(TestSink[Any]()(system))
tagProbe.request(10)
// without the fix this would not complete because e4 will have tagSeqNr
1 rather than the expected 4
(1 to 6).foreach { n =>
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
index 5349aa8..08c4c86 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
@@ -50,7 +50,7 @@ class EventsByTagStressSpec extends CassandraSpec(s"""
.map(i => {
(i.persistenceId, i.event.asInstanceOf[Int])
})
- .runWith(TestSink.probe)
+ .runWith(TestSink())
(i, probe)
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
index 4a8c7dc..fa5e0e7 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
@@ -77,14 +77,14 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
setup("c", 1)
val src = current()
- src.runWith(TestSink.probe[Any]).request(4).expectNextUnordered("a",
"b", "c").expectComplete()
+ src.runWith(TestSink[Any]()).request(4).expectNextUnordered("a", "b",
"c").expectComplete()
}
"deliver persistenceId only once if there are multiple events spanning
partitions" in {
setup("d", 100)
val src = current()
-
src.runWith(TestSink.probe[Any]).request(10).expectNext("d").expectComplete()
+ src.runWith(TestSink[Any]()).request(10).expectNext("d").expectComplete()
}
"find existing persistence ids in batches if there is more of them than
max-result-size-query" in {
@@ -93,7 +93,7 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
}
val src = current()
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(1000)
for (_ <- 1 to 1000) {
@@ -110,7 +110,7 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
setup("f", 1)
val src = all()
- val probe =
src.runWith(TestSink.probe[Any]).request(5).expectNextUnordered("e", "f")
+ val probe =
src.runWith(TestSink[Any]()).request(5).expectNextUnordered("e", "f")
setup("g", 1)
@@ -121,7 +121,7 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
setup("h", 1)
setup("i", 1)
val src = all()
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(1)
probe.expectNext()
@@ -142,7 +142,7 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
setup("o", 1)
val src = all()
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(2)
probe.expectNext()
probe.expectNext()
@@ -158,7 +158,7 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
setup("p", 1000)
val src = all()
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(10).expectNext("p").expectNoMessage(1000.millis)
@@ -175,7 +175,7 @@ class AllPersistenceIdsSpec extends
CassandraSpec(AllPersistenceIdsSpec.config)
setup("c2", 1)
val src =
queries.currentPersistenceIdsFromMessages().filterNot(_.startsWith("persistenceInit"))
- src.runWith(TestSink.probe[Any]).request(4).expectNextUnordered("a2",
"b2", "c2").expectComplete()
+ src.runWith(TestSink[Any]()).request(4).expectNextUnordered("a2", "b2",
"c2").expectComplete()
}
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
index 4808542..02888a0 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
@@ -59,20 +59,20 @@ class CassandraQueryJournalOverrideSpec extends
CassandraSpec(CassandraQueryJour
expectMsg(Ack)
val currentEvents = journal.currentEventsByPersistenceId(pid, 0,
Long.MaxValue)
- val currentProbe =
currentEvents.map(_.event.toString).runWith(TestSink.probe[String])
+ val currentProbe =
currentEvents.map(_.event.toString).runWith(TestSink[String]())
currentProbe.request(2)
currentProbe.expectNext("cat")
currentProbe.expectComplete()
val liveEvents = journal.eventsByPersistenceId(pid, 0, Long.MaxValue)
- val liveProbe =
liveEvents.map(_.event.toString).runWith(TestSink.probe[String])
+ val liveProbe =
liveEvents.map(_.event.toString).runWith(TestSink[String]())
liveProbe.request(2)
liveProbe.expectNext("cat")
liveProbe.expectNoMessage(100.millis)
liveProbe.cancel()
val internalEvents = journal.eventsByPersistenceIdWithControl(pid, 0,
Long.MaxValue, None)
- val internalProbe =
internalEvents.map(_.event.toString).runWith(TestSink.probe[String])
+ val internalProbe =
internalEvents.map(_.event.toString).runWith(TestSink[String]())
internalProbe.request(2)
internalProbe.expectNext("cat")
liveProbe.expectNoMessage(100.millis)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
index c468c62..bde3307 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
@@ -78,7 +78,7 @@ class EventAdaptersReadSpec extends
CassandraSpec(EventAdaptersReadSpec.config)
val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(2)
.expectNext("a-1", "a-3")
.expectNoMessage(500.millis)
@@ -96,7 +96,7 @@ class EventAdaptersReadSpec extends
CassandraSpec(EventAdaptersReadSpec.config)
})
val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
-
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("b-1",
"b-2", "b-2", "b-3").expectComplete()
+ src.map(_.event).runWith(TestSink[Any]()).request(10).expectNext("b-1",
"b-2", "b-2", "b-3").expectComplete()
}
"duplicate events with prefix added by the event-adapter" in {
@@ -104,7 +104,7 @@ class EventAdaptersReadSpec extends
CassandraSpec(EventAdaptersReadSpec.config)
setup("c", 1, _ => "prefixed:foo:")
val src = queries.currentEventsByPersistenceId("c", 0L, Long.MaxValue)
-
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("foo-c-1").expectComplete()
+
src.map(_.event).runWith(TestSink[Any]()).request(10).expectNext("foo-c-1").expectComplete()
}
}
@@ -119,7 +119,7 @@ class EventAdaptersReadSpec extends
CassandraSpec(EventAdaptersReadSpec.config)
})
val src = queries.eventsByTag("red", NoOffset)
- val sub = src.map(_.event).runWith(TestSink.probe[Any])
+ val sub = src.map(_.event).runWith(TestSink[Any]())
sub.request(10)
sub.expectNext("d-1", "d-3", "d-5")
sub.expectNoMessage(waitTime)
@@ -135,7 +135,7 @@ class EventAdaptersReadSpec extends
CassandraSpec(EventAdaptersReadSpec.config)
})
val src = queries.eventsByTag("yellow", NoOffset)
- val sub = src.map(_.event).runWith(TestSink.probe[Any])
+ val sub = src.map(_.event).runWith(TestSink[Any]())
sub.request(10).expectNext("e-1", "e-2", "e-2",
"e-3").expectNoMessage(waitTime)
sub.cancel()
@@ -150,7 +150,7 @@ class EventAdaptersReadSpec extends
CassandraSpec(EventAdaptersReadSpec.config)
})
val src = queries.eventsByTag("green", NoOffset)
- val sub = src.map(_.event).runWith(TestSink.probe[Any])
+ val sub = src.map(_.event).runWith(TestSink[Any]())
sub.request(10).expectNext("e-1", "foo-e-2",
"e-3").expectNoMessage(waitTime)
sub.cancel()
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
index 5e6403f..83efead 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
@@ -47,7 +47,7 @@ class EventsByPersistenceIdFastForwardSpec
writeTestEvent(evt1)
val src = queries.eventsByPersistenceIdWithControl("f", 0L, Long.MaxValue)
- val (futureControl, probe) =
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+ val (futureControl, probe) =
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
val control = futureControl.futureValue
probe.request(5)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
index 3a8b5ce..4fa46ce 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
@@ -58,7 +58,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
writeTestEvent(pr5, partitionNr = 4L)
val src = queries.currentEventsByPersistenceId("mpg1", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e1")
probe.expectNext("e5")
@@ -84,7 +84,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
writeTestEvent(pr5, partitionNr = 4L)
val src = queries.currentEventsByPersistenceId("mpg2", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e1")
probe.expectNext("e2")
@@ -111,7 +111,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
writeTestEvent(pr5, partitionNr = 4L)
val src = queries.currentEventsByPersistenceId("mpg3", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e1")
probe.expectNext("e2")
@@ -154,7 +154,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
writeTestEvent(pr9, partitionNr = 6L)
val src = queries.currentEventsByPersistenceId("mpg4", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e1")
probe.expectNext("e2")
@@ -183,7 +183,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
writeTestEvent(pr4, partitionNr = 3L)
val src = queries.currentEventsByPersistenceId("mpg5", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e3")
probe.expectNext("e4")
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
index 376f3eb..332ebbc 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
@@ -59,7 +59,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(2)
.expectNext("a-1", "a-2")
.expectNoMessage(noMsgTimeout)
@@ -72,19 +72,19 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
setup("b", 10)
val src = queries.currentEventsByPersistenceId("b", 5L, Long.MaxValue)
-
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(7).expectNext(5, 6,
7, 8, 9, 10).expectComplete()
+ src.map(_.sequenceNr).runWith(TestSink[Any]()).request(7).expectNext(5,
6, 7, 8, 9, 10).expectComplete()
}
"not see any events if the stream starts after current latest event" in {
setup("c", 3)
val src = queries.currentEventsByPersistenceId("c", 5L, Long.MaxValue)
- src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectComplete()
+ src.map(_.event).runWith(TestSink[Any]()).request(5).expectComplete()
}
"find existing events up to a sequence number" in {
setup("d", 3)
val src = queries.currentEventsByPersistenceId("d", 0L, 2L)
-
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(5).expectNext(1,
2).expectComplete()
+ src.map(_.sequenceNr).runWith(TestSink[Any]()).request(5).expectNext(1,
2).expectComplete()
}
"not see new events after demand request" in {
@@ -92,7 +92,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("e", 0L, Long.MaxValue)
val probe =
-
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("e-1",
"e-2").expectNoMessage(noMsgTimeout)
+ src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("e-1",
"e-2").expectNoMessage(noMsgTimeout)
ref ! "e-4"
expectMsg("e-4-done")
@@ -105,7 +105,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("f", 0L, Long.MaxValue)
val probe =
-
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("f-1",
"f-2").expectNoMessage(noMsgTimeout)
+ src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("f-1",
"f-2").expectNoMessage(noMsgTimeout)
probe
.expectNoMessage(noMsgTimeout)
@@ -119,7 +119,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
"stop if there are no events" in {
val src = queries.currentEventsByPersistenceId("g", 0L, Long.MaxValue)
- src.runWith(TestSink.probe[Any]).request(2).expectComplete()
+ src.runWith(TestSink[Any]()).request(2).expectComplete()
}
"produce correct sequence of sequence numbers" in {
@@ -128,7 +128,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
src
.map(x => (x.persistenceId, x.sequenceNr))
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(4)
.expectNext(("h", 1), ("h", 2), ("h", 3))
.expectComplete()
@@ -152,7 +152,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("i", 0L, Long.MaxValue)
src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(10)
.expectNextN((1 to 10).map(i => s"i-$i"))
.expectNoMessage(noMsgTimeout)
@@ -170,7 +170,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
.eventsByPersistenceId("jo", 0L, Long.MaxValue)
.viaMat(KillSwitches.single)(Keep.right)
.map(x => (x.event, x.sequenceNr, x.offset))
- .toMat(TestSink.probe[Any])(Keep.both)
+ .toMat(TestSink[Any]())(Keep.both)
.run()
probe.request(5)
@@ -234,7 +234,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("i2", 0L, Long.MaxValue)
src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(100)
.expectNextN((1 to 15).map(i => s"i2-$i"))
.expectComplete()
@@ -257,7 +257,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.currentEventsByPersistenceId("i3", 0L, Long.MaxValue)
src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(10)
.expectNextN((1 to 10).map(i => s"i3-$i"))
.expectNoMessage(noMsgTimeout)
@@ -273,7 +273,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
ref ! TestActor.DeleteTo(48)
expectMsg(DeleteMessagesSuccess(48))
val src = queries.currentEventsByPersistenceId("i4", 0, Long.MaxValue)
-
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("i4-49",
"i4-50").expectComplete()
+ src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("i4-49",
"i4-50").expectComplete()
}
"detect gaps" in {
@@ -286,7 +286,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
writeTestEvent(pr4)
val src = queries.currentEventsByPersistenceId("i5", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
// timeout dictated by events-by-persistence-id-gap-timeout above
probe.within(7.seconds) {
@@ -309,7 +309,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val pr1 = PersistentRepr("e1", 1L, "with-meta", "").withMetadata(meta)
writeTestEvent(pr1)
val src = queries.currentEventsByPersistenceId("with-meta", 0L,
Long.MaxValue)
-
src.map(_.eventMetadata).runWith(TestSink.probe[Any]).request(2).expectNext(Some(meta)).expectComplete()
+
src.map(_.eventMetadata).runWith(TestSink[Any]()).request(2).expectNext(Some(meta)).expectComplete()
}
}
@@ -318,7 +318,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
"find new events" in {
val ref = setup("j", 3)
val src = queries.eventsByPersistenceId("j", 0L, Long.MaxValue)
- val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("j-1",
"j-2", "j-3")
+ val probe =
src.map(_.event).runWith(TestSink[Any]()).request(5).expectNext("j-1", "j-2",
"j-3")
ref ! "j-4"
expectMsg("j-4-done")
@@ -330,7 +330,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
"find new events if the stream starts after current latest event" in {
val ref = setup("k", 4)
val src = queries.eventsByPersistenceId("k", 5L, Long.MaxValue)
- val probe =
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(5).expectNoMessage(noMsgTimeout)
+ val probe =
src.map(_.sequenceNr).runWith(TestSink[Any]()).request(5).expectNoMessage(noMsgTimeout)
ref ! "k-5"
expectMsg("k-5-done")
@@ -349,7 +349,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
"find new events up to a sequence number" in {
val ref = setup("l", 3)
val src = queries.eventsByPersistenceId("l", 0L, 4L)
- val probe =
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(5).expectNext(1, 2,
3)
+ val probe =
src.map(_.sequenceNr).runWith(TestSink[Any]()).request(5).expectNext(1, 2, 3)
ref ! "l-4"
expectMsg("l-4-done")
@@ -361,7 +361,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val ref = setup("m", 3)
val src = queries.eventsByPersistenceId("m", 0L, Long.MaxValue)
val probe =
-
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("m-1",
"m-2").expectNoMessage(noMsgTimeout)
+ src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("m-1",
"m-2").expectNoMessage(noMsgTimeout)
ref ! "m-4"
expectMsg("m-4-done")
@@ -376,7 +376,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue)
val probe =
-
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("n-1",
"n-2").expectNoMessage(noMsgTimeout)
+ src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("n-1",
"n-2").expectNoMessage(noMsgTimeout)
probe
.expectNoMessage(noMsgTimeout)
@@ -393,7 +393,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
setup("o2", 1) // Database init.
val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue)
- val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNoMessage(noMsgTimeout)
+ val probe =
src.map(_.event).runWith(TestSink[Any]()).request(10).expectNoMessage(noMsgTimeout)
probe.cancel()
}
@@ -402,7 +402,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
setup("p2", 1) // Database init.
val src = queries.eventsByPersistenceId("p", 0L, Long.MaxValue)
- val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNoMessage(noMsgTimeout)
+ val probe =
src.map(_.event).runWith(TestSink[Any]()).request(2).expectNoMessage(noMsgTimeout)
setup("p", 2)
@@ -418,7 +418,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val probe = src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(16)
.expectNextN((1 to 15).map(i => s"q-$i"))
.expectNoMessage(noMsgTimeout)
@@ -457,7 +457,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
val src = queries.eventsByPersistenceId("q2", 0L, Long.MaxValue)
val probe = src
.map(_.event)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(10)
.expectNextN((1 to 10).map(i => s"q2-$i"))
.expectNoMessage(noMsgTimeout)
@@ -494,7 +494,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
writeTestEvent(pr4)
val src = queries.currentEventsByPersistenceId("gap1", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e1")
probe.expectNext("e2")
@@ -514,7 +514,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
writeTestEvent(pr4)
val src = queries.currentEventsByPersistenceId("gap2", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
probe.expectNext("e1")
probe.expectNext("e2")
@@ -529,7 +529,7 @@ class EventsByPersistenceIdSpec extends
CassandraSpec(EventsByPersistenceIdSpec.
"not complete when empty" in {
val src = queries.eventsByPersistenceId("r", 0L, Long.MaxValue)
- val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5)
+ val probe = src.map(_.event).runWith(TestSink[Any]()).request(5)
probe.expectNoMessage(100.millis)
probe.cancel()
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
index e4e2dfb..292c77c 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
@@ -63,7 +63,7 @@ class EventsByPersistenceIdWithControlSpec extends
CassandraSpec(EventsByPersist
val src = queries.eventsByPersistenceIdWithControl("a", 0L,
Long.MaxValue)
- val (futureControl, probe) =
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+ val (futureControl, probe) =
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
val control = futureControl.futureValue
control.poll(3)
@@ -82,7 +82,7 @@ class EventsByPersistenceIdWithControlSpec extends
CassandraSpec(EventsByPersist
val ref = setup("b", 8)
val src = queries.eventsByPersistenceIdWithControl("b", 0L,
Long.MaxValue)
- val (futureControl, probe) =
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+ val (futureControl, probe) =
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
val control = futureControl.futureValue
control.poll(8)
@@ -103,7 +103,7 @@ class EventsByPersistenceIdWithControlSpec extends
CassandraSpec(EventsByPersist
val ref = setup("c", 2)
val src = queries.eventsByPersistenceIdWithControl("c", 0L,
Long.MaxValue)
- val (futureControl, probe) =
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+ val (futureControl, probe) =
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
val control = futureControl.futureValue
control.poll(2)
@@ -131,7 +131,7 @@ class EventsByPersistenceIdWithControlSpec extends
CassandraSpec(EventsByPersist
val ref = setup("d", 12)
val src = queries.eventsByPersistenceIdWithControl("d", 0L,
Long.MaxValue)
- val (futureControl, probe) =
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+ val (futureControl, probe) =
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
val control = futureControl.futureValue
control.poll(12)
@@ -156,7 +156,7 @@ class EventsByPersistenceIdWithControlSpec extends
CassandraSpec(EventsByPersist
setup("e", 35)
val src = queries.eventsByPersistenceIdWithControl("e", 0L,
Long.MaxValue)
- val (futureControl, probe) =
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+ val (futureControl, probe) =
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
val control = futureControl.futureValue
control.poll(35)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
index 8b0730b..46a6652 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
@@ -62,7 +62,7 @@ class EventsByTagPubsubSpec extends
CassandraSpec(EventsByTagPubsubSpec.config)
val actor = system.actorOf(TestActor.props("EventsByTagPubsubSpec_a"))
val blackSrc = queries.eventsByTag(tag = "black", offset = NoOffset)
- val probe = blackSrc.runWith(TestSink.probe[Any])
+ val probe = blackSrc.runWith(TestSink[Any]())
probe.request(2)
probe.expectNoMessage(300.millis)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
index e9ef11b..ddda87f 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
@@ -209,7 +209,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
expectMsg(s"a green leaf-done")
val greenSrc = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe = greenSrc.runWith(TestSink.probe[Any])
+ val probe = greenSrc.runWith(TestSink[Any]())
probe.request(2)
probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple")
=> e }
probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green
banana") => e }
@@ -219,13 +219,13 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
probe.expectComplete()
val blackSrc = queries.currentEventsByTag(tag = "black", offset =
NoOffset)
- val probe2 = blackSrc.runWith(TestSink.probe[Any])
+ val probe2 = blackSrc.runWith(TestSink[Any]())
probe2.request(5)
probe2.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car")
=> e }
probe2.expectComplete()
val appleSrc = queries.currentEventsByTag(tag = "apple", offset =
NoOffset)
- val probe3 = appleSrc.runWith(TestSink.probe[Any])
+ val probe3 = appleSrc.runWith(TestSink[Any]())
probe3.request(5)
probe3.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green
apple") => e }
probe3.expectComplete()
@@ -233,7 +233,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"complete when no events" in {
val src = queries.currentEventsByTag(tag = "pink", offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(2)
probe.expectComplete()
}
@@ -242,7 +242,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
val c = system.actorOf(TestActor.props("c"))
val greenSrc = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe = greenSrc.runWith(TestSink.probe[Any])
+ val probe = greenSrc.runWith(TestSink[Any]())
probe.request(2)
probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple")
=> e }
probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green
banana") => e }
@@ -259,7 +259,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"find events from timestamp offset" in {
val greenSrc1 = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe1 = greenSrc1.runWith(TestSink.probe[Any])
+ val probe1 = greenSrc1.runWith(TestSink[Any]())
probe1.request(2)
val appleOffs = probe1
.expectNextPF {
@@ -280,7 +280,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
appleTimestamp should be <= bananaTimestamp
val greenSrc2 = queries.currentEventsByTag(tag = "green",
queries.timeBasedUUIDFrom(bananaTimestamp))
- val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+ val probe2 = greenSrc2.runWith(TestSink[Any]())
probe2.request(10)
if (appleTimestamp == bananaTimestamp)
probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green
apple") => e }
@@ -291,14 +291,14 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"find events from UUID offset" in {
val greenSrc1 = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe1 = greenSrc1.runWith(TestSink.probe[Any])
+ val probe1 = greenSrc1.runWith(TestSink[Any]())
probe1.request(2)
probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green
apple") => e }
val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a
green banana") => e }.offset
probe1.cancel()
val greenSrc2 = queries.currentEventsByTag(tag = "green", offs)
- val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+ val probe2 = greenSrc2.runWith(TestSink[Any]())
probe2.request(10)
probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")
=> e }
probe2.cancel()
@@ -307,7 +307,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"include timestamp in EventEnvelope" in {
val currentTime = System.currentTimeMillis()
val greenSrc1 = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe1 = greenSrc1.runWith(TestSink.probe[EventEnvelope])
+ val probe1 = greenSrc1.runWith(TestSink[EventEnvelope]())
probe1.request(2)
val env1 = probe1.expectNext()
@@ -341,7 +341,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
writeTaggedEvent(t4, pr4, Set("T1-current"), 4, bucketSize)
val src = queries.currentEventsByTag(tag = "T1-current", offset =
NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(2)
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
@@ -360,7 +360,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"find new events" in {
val d = system.actorOf(TestActor.props("d"))
- withProbe(queries.eventsByTag(tag = "black", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "black", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(2)
probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black
car") => e }
@@ -381,7 +381,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"find events from timestamp offset" in {
withProbe(
- queries.eventsByTag(tag = "green", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ queries.eventsByTag(tag = "green", offset =
NoOffset).runWith(TestSink[Any]()),
probe1 => {
probe1.request(2)
val appleOffs = probe1
@@ -401,7 +401,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
appleTimestamp should be <= bananaTimestamp
withProbe(
- queries.eventsByTag(tag = "green",
queries.timeBasedUUIDFrom(bananaTimestamp)).runWith(TestSink.probe[Any]),
+ queries.eventsByTag(tag = "green",
queries.timeBasedUUIDFrom(bananaTimestamp)).runWith(TestSink[Any]()),
probe2 => {
probe2.request(10)
if (appleTimestamp == bananaTimestamp)
@@ -416,7 +416,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
}
"find events from UUID offset " in {
- withProbe(queries.eventsByTag(tag = "green", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "green", offset =
NoOffset).runWith(TestSink[Any]()),
probe1 => {
probe1.request(2)
probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green
apple") => e }
@@ -424,7 +424,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
probe1.cancel()
val greenSrc2 = queries.eventsByTag(tag = "green", offs)
- val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+ val probe2 = greenSrc2.runWith(TestSink[Any]())
probe2.request(10)
probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green
leaf") => e }
probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green
cucumber") => e }
@@ -435,7 +435,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"include timestamp in EventEnvelope" in {
val currentTime = System.currentTimeMillis()
val greenSrc1 = queries.eventsByTag(tag = "green", offset = NoOffset)
- val probe1 = greenSrc1.runWith(TestSink.probe[EventEnvelope])
+ val probe1 = greenSrc1.runWith(TestSink[EventEnvelope]())
probe1.request(2)
val env1 = probe1.expectNext()
@@ -462,7 +462,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
val pr2 = PersistentRepr("e2", 2L, "p1", "", writerUuid = w1)
writeTaggedEvent(t2, pr2, Set("T1-live"), 2, bucketSize)
- withProbe(queries.eventsByTag(tag = "T1-live", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T1-live", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
@@ -490,7 +490,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
val pr3 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
writeTaggedEvent(t3, pr3, Set("T2"), 2, bucketSize)
- withProbe(queries.eventsByTag(tag = "T2", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T2", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(10)
@@ -511,7 +511,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
"stream many events" in {
val e = system.actorOf(TestActor.props("e"))
withProbe(
- queries.eventsByTag(tag = "yellow", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ queries.eventsByTag(tag = "yellow", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
for (n <- 1 to 100)
@@ -541,7 +541,7 @@ class EventsByTagSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.config) {
}
"not complete for empty query" in {
- val probe = queries.eventsByTag(tag = "empty", offset =
NoOffset).runWith(TestSink.probe[Any])
+ val probe = queries.eventsByTag(tag = "empty", offset =
NoOffset).runWith(TestSink[Any]())
probe.request(2)
probe.expectNoMessage(waitTime)
probe.cancel()
@@ -572,7 +572,7 @@ class EventsByTagZeroEventualConsistencyDelaySpec
expectMsg(s"a green leaf-done")
val greenSrc = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe = greenSrc.runWith(TestSink.probe[Any])
+ val probe = greenSrc.runWith(TestSink[Any]())
probe.request(NumberOfBananas + 10L)
probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "a green apple")
=> e }
(1 to NumberOfBananas).foreach { n =>
@@ -619,7 +619,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal
= 100ms
val p2e1 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
writeTaggedEvent(t2, p2e1, Set("T6"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T6", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T6", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") =>
e }
@@ -646,7 +646,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal
= 100ms
val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
writeTaggedEvent(t2, eventA1, Set("T7"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T7", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T7", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
@@ -676,7 +676,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal
= 100ms
val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
writeTaggedEvent(t2, eventA1, Set("T8"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T8", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T8", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
@@ -702,13 +702,13 @@
pekko.persistence.cassandra.events-by-tag.refresh-internal = 100ms
val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
writeTaggedEvent(t1.plusSeconds(2), eventA1, Set("T9"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T9", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T9", offset =
NoOffset).runWith(TestSink[Any]()),
probe1 => {
probe1.request(10)
val offs =
probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") =>
e }.offset.asInstanceOf[TimeBasedUUID]
- withProbe(queries.eventsByTag(tag = "T9", offset =
offs).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T9", offset =
offs).runWith(TestSink[Any]()),
probe2 => {
probe2.request(10)
@@ -737,7 +737,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal
= 100ms
writeTaggedEvent(t1.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n,
bucketSize)
}
- withProbe(queries.eventsByTag(tag = "T10", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T10", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(1000)
probe.expectNextN(100)
@@ -767,7 +767,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal
= 100ms
val t1 = LocalDateTime.now(ZoneOffset.UTC)
withProbe(
- queries.eventsByTag(tag = "T11", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ queries.eventsByTag(tag = "T11", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(1000)
@@ -836,7 +836,7 @@ class EventsByTagStrictBySeqNoEarlyFirstOffsetSpec
// the search for delayed events should start before we get to the
current timebucket
// until 0.26/0.51 backtracking was broken and events would be skipped
- withProbe(queries.eventsByTag(tag = "T11", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T11", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(2000)
probe.expectNextN(2000)
@@ -873,7 +873,7 @@ class EventsByTagLongRefreshIntervalSpec
sender.expectNoMessage(200.millis) // try and give time for the tagged
event to be flushed so the query doesn't need to wait for the refresh interval
val offset: Offset =
- withProbe(queries.eventsByTag(tag = "animal", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "animal", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
probe.request(2)
probe.expectNextPF {
@@ -887,7 +887,7 @@ class EventsByTagLongRefreshIntervalSpec
// flush interval for tag writes is 0ms but still give some time for the
tag write to complete
sender.expectNoMessage(250.millis)
- withProbe(queries.eventsByTag(tag = "animal", offset =
offset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "animal", offset =
offset).runWith(TestSink[Any]()),
probe => {
probe.request(2)
// less than the refresh interval, previously this would evaluate the
new persistence-id timeout and then not re-evaluate
@@ -927,7 +927,7 @@ class EventsByTagStrictBySeqNoManyInCurrentTimeBucketSpec
// the search for delayed events should start before we get to the
current timebucket
// until 0.26/0.51 backtracking was broken and events would be skipped
val src = queries.currentEventsByTag(tag = "T12", offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(2000)
probe.expectNextN(200)
probe.expectComplete()
@@ -962,7 +962,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends
AbstractEventsByTagSpec(Even
}
withProbe(
- queries.eventsByTag(tag = "T13", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ queries.eventsByTag(tag = "T13", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
val requested1 = 150L
@@ -1034,7 +1034,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends
AbstractEventsByTagSpec(Even
writeTaggedEvent(t2, eventB, Set("T14"), n - 112, bucketSize)
}
- withProbe(queries.eventsByTag(tag = "T14", offset =
NoOffset).runWith(TestSink.probe[Any]),
+ withProbe(queries.eventsByTag(tag = "T14", offset =
NoOffset).runWith(TestSink[Any]()),
probe => {
val requested1 = 130L
@@ -1073,7 +1073,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends
AbstractEventsByTagSpec(Even
}
val src = queries.currentEventsByTag(tag = "T15", offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
(1 to 10).foreach { _ =>
probe.request(30)
@@ -1114,7 +1114,7 @@ class EventsByTagSpecBackTrackingLongRefreshInterval
val tagName = "back-track-previous-bucket-no-refresh"
writeTaggedEvent(PersistentRepr("e1", 1L, "p2", ""), Set(tagName), 1,
bucketSize)
val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(10)
// bring the offset forward with an event for a new persistence id
probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "e1") => e }
@@ -1160,7 +1160,7 @@ class EventsByTagSpecBackTracking
writeTaggedEvent(t1.plusMinutes(1), PersistentRepr("e2", 2L, "p1", ""),
Set("back-track"), 2, bucketSize)
val src = queries.eventsByTag(tag = "back-track", offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
@@ -1191,7 +1191,7 @@ class EventsByTagSpecBackTracking
writeTaggedEvent(t1.plusHours(1), PersistentRepr("e2", 2L, "p1", ""),
Set(tagName), 2, bucketSize)
val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
@@ -1219,7 +1219,7 @@ class EventsByTagSpecBackTracking
"find new persistence ids that were missed" in {
val tagName = "back-track-new-persistence-id"
val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(10)
writeTaggedEvent(PersistentRepr("e1", 1L, "p1", ""), Set(tagName), 1,
bucketSize)
writeTaggedEvent(PersistentRepr("e2", 2L, "p1", ""), Set(tagName), 2,
bucketSize)
@@ -1240,7 +1240,7 @@ class EventsByTagSpecBackTracking
"sort delayed events by timeuuid" in {
val tagName = "back-track-sort-delayed-events"
val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(10)
writeTaggedEvent(PersistentRepr("e1", 1L, "p1", ""), Set(tagName), 1,
bucketSize)
probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
@@ -1265,7 +1265,7 @@ class EventsByTagSpecBackTracking
"work for many delayed events for different persistence ids" in {
val tagName = "back-track-many-persistence-ids"
val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
- val probe = src.runWith(TestSink.probe[Any])
+ val probe = src.runWith(TestSink[Any]())
probe.request(1005)
// short period is 10m
val start = today.minusMinutes(9)
@@ -1379,7 +1379,7 @@ class EventsByTagPersistenceIdCleanupSpec extends
AbstractEventsByTagSpec(Events
val query =
queries.eventsByTag("cleanup-tag",
TimeBasedUUID(Uuids.startOf(t1.toInstant(ZoneOffset.UTC).toEpochMilli - 1L)))
- val probe = query.runWith(TestSink.probe[Any])
+ val probe = query.runWith(TestSink[Any]())
probe.request(10)
probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 1L,
"cleanup-1") => e }
probe.expectNoMessage(cleanupPeriod + 250.millis)
@@ -1411,14 +1411,14 @@ class EventsByTagDisabledSpec extends
AbstractEventsByTagSpec(EventsByTagSpec.di
"fail current events by tag queries" in {
val greenSrc = queries.currentEventsByTag(tag = "green", offset =
NoOffset)
- val probe = greenSrc.runWith(TestSink.probe[Any])
+ val probe = greenSrc.runWith(TestSink[Any]())
probe.request(1)
probe.expectError().getMessage should include("Events by tag queries are
disabled")
}
"fail live events by tag queries" in {
val greenSrc = queries.eventsByTag(tag = "green", offset = NoOffset)
- val probe = greenSrc.runWith(TestSink.probe[Any])
+ val probe = greenSrc.runWith(TestSink[Any]())
probe.request(1)
probe.expectError().getMessage should include("Events by tag queries are
disabled")
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
index adb38c4..ba266c0 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
@@ -94,7 +94,7 @@ class EventsByTagStageSpec
send(ref, Tagged("e-4", Set("tag-2")))
val stream = queries.currentEventsByTag("tag-2", NoOffset)
- val sub = stream.toMat(TestSink.probe[EventEnvelope])(Keep.right).run()
+ val sub = stream.toMat(TestSink[EventEnvelope]())(Keep.right).run()
sub.request(2)
sub.expectNoMessage(50.millis) // eventual consistency delay prevents
events coming right away
@@ -108,7 +108,7 @@ class EventsByTagStageSpec
"empty tag completes" in {
val stream: Source[EventEnvelope, NotUsed] =
queries.currentEventsByTag("bogus", NoOffset)
- val sub = stream.toMat(TestSink.probe[EventEnvelope])(Keep.right).run()
+ val sub = stream.toMat(TestSink[EventEnvelope]())(Keep.right).run()
sub.request(1)
sub.expectComplete()
}
@@ -120,7 +120,7 @@ class EventsByTagStageSpec
}
val stream = queries.currentEventsByTag("paged", NoOffset)
- val sub = stream.runWith(TestSink.probe[EventEnvelope])
+ val sub = stream.runWith(TestSink[EventEnvelope]())
sub.request(fetchSize + 1L)
(1L to (fetchSize + 1)).foreach { i =>
@@ -142,7 +142,7 @@ class EventsByTagStageSpec
bucketSize)
val tagStream = queries.currentEventsByTag("CurrentPreviousBuckets",
NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(2)
sub.expectNextPF { case EventEnvelope(_, "p-3", 1, "e-1") => }
@@ -180,7 +180,7 @@ class EventsByTagStageSpec
bucketSize)
val tagStream =
queries.currentEventsByTag("CurrentPreviousMultipleBuckets", NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(2)
sub.expectNextPF { case EventEnvelope(_, "l-4", 1, "e-1") => }
@@ -205,7 +205,7 @@ class EventsByTagStageSpec
writeTaggedEvent(PersistentRepr("p1e4", 4, "p-1"), Set(tag), 4,
times(4), bucketSize)
val tagStream = queries.currentEventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(4)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -236,7 +236,7 @@ class EventsByTagStageSpec
writeTaggedEvent(PersistentRepr("p1e4", 4, "p-1"), Set(tag), 4,
times(4), bucketSize)
val tagStream: Source[EventEnvelope, NotUsed] =
queries.currentEventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(4)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -258,7 +258,7 @@ class EventsByTagStageSpec
writeTaggedEvent(thisBucket, PersistentRepr("p1e4", 4, "p-1"), Set(tag),
4, bucketSize)
val tagStream = queries.currentEventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(5)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -287,7 +287,7 @@ class EventsByTagStageSpec
val tagStream = queries.currentEventsByTag(
tag,
queries.timeBasedUUIDFrom(twoBucketsAgo.minusMinutes(1).toInstant(ZoneOffset.UTC).toEpochMilli))
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(3)
val f0: PartialFunction[Any, Any] = { case EventEnvelope(_, "p-1", 10,
"p1e10") => }
@@ -309,7 +309,7 @@ class EventsByTagStageSpec
val stream: Source[EventEnvelope, NotUsed] =
queries.eventsByTag("bogus", NoOffset)
- val sub = stream.toMat(TestSink.probe[EventEnvelope])(Keep.right).run()
+ val sub = stream.toMat(TestSink[EventEnvelope]())(Keep.right).run()
sub.request(1)
sub.expectNoMessage(waitTime)
}
@@ -319,7 +319,7 @@ class EventsByTagStageSpec
send(ref, Tagged("e-1", Set("tag-3")))
val blackSrc = queries.eventsByTag(tag = "tag-3", offset = NoOffset)
- val probe = blackSrc.runWith(TestSink.probe[EventEnvelope])
+ val probe = blackSrc.runWith(TestSink[EventEnvelope]())
probe.request(2)
probe.expectNextPF { case EventEnvelope(_, "b", 1L, "e-1") => }
probe.expectNoMessage(waitTime)
@@ -356,7 +356,7 @@ class EventsByTagStageSpec
bucketSize)
val tagStream = queries.eventsByTag("LivePreviousBuckets", NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(2)
sub.expectNextPF { case EventEnvelope(_, "l-4", 1, "e-1") => }
@@ -394,7 +394,7 @@ class EventsByTagStageSpec
writeTaggedEvent(PersistentRepr("p1e4", 4, "p-1"), Set(tag), 4,
times(4), bucketSize)
val tagStream = queries.eventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(4)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -423,7 +423,7 @@ class EventsByTagStageSpec
writeTaggedEvent(PersistentRepr("p1e6", 6, "p-1"), Set(tag), 6,
times(6), bucketSize)
val tagStream = queries.eventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(4)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -457,7 +457,7 @@ class EventsByTagStageSpec
writeTaggedEvent(PersistentRepr("p2e3", 3, "p-2"), Set(tag), 3,
times(3), bucketSize)
val tagStream = queries.eventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(10)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -487,7 +487,7 @@ class EventsByTagStageSpec
writeTaggedEvent(nowTime.plusSeconds(1), PersistentRepr("e-3", 3,
"p-1"), Set(tag), 3, bucketSize)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(5)
sub.expectNoMessage(waitTime)
@@ -510,7 +510,7 @@ class EventsByTagStageSpec
writeTaggedEvent(thisBucket, PersistentRepr("p1e4", 4, "p-1"), Set(tag),
4, bucketSize)
val tagStream = queries.eventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(5)
sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -538,7 +538,7 @@ class EventsByTagStageSpec
queries.eventsByTag(
tag,
queries.timeBasedUUIDFrom(nowTime.minusSeconds(1).toInstant(ZoneOffset.UTC).toEpochMilli))
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(4)
sub.expectNoMessage(100.millis)
@@ -558,7 +558,7 @@ class EventsByTagStageSpec
val nowTime = LocalDateTime.now(ZoneOffset.UTC)
writeTaggedEvent(nowTime.plusSeconds(1), PersistentRepr("p1e1", 1,
"p1"), Set(tag), 1, bucketSize)
val tagStream = queries.eventsByTag(tag, NoOffset)
- val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+ val sub = tagStream.runWith(TestSink[EventEnvelope]())
sub.request(2)
sub.expectNextPF { case EventEnvelope(_, "p1", 1, "p1e1") => }
writeTaggedEvent(nowTime.plusSeconds(2), PersistentRepr("p1e10000",
10000, "p1"), Set(tag), 10000, bucketSize)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
index f8ca36f..f0822bd 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
@@ -57,7 +57,7 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
expectMsg("a-1-done")
val src = javaQueries.eventsByPersistenceId("a", 0L, Long.MaxValue)
-
src.asScala.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").cancel()
+
src.asScala.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").cancel()
}
"start current eventsByPersistenceId query" in {
@@ -66,14 +66,14 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
expectMsg("b-1-done")
val src = javaQueries.currentEventsByPersistenceId("b", 0L,
Long.MaxValue)
-
src.asScala.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("b").expectComplete()
+
src.asScala.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("b").expectComplete()
}
"start eventsByTag query" in {
val src = javaQueries.eventsByTag("a", Offset.noOffset)
src.asScala
.map(_.persistenceId)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(10)
.expectNext("a")
.expectNoMessage(100.millis)
@@ -82,7 +82,7 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
"start current eventsByTag query" in {
val src = javaQueries.currentEventsByTag("a", Offset.noOffset)
-
src.asScala.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").expectComplete()
+
src.asScala.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").expectComplete()
}
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
index 42780df..44c0d17 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
@@ -58,7 +58,7 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
expectMsg("a-1-done")
val src = queries.eventsByPersistenceId("a", 0L, Long.MaxValue)
-
src.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").cancel()
+
src.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").cancel()
}
"start current eventsByPersistenceId query" in {
@@ -67,7 +67,7 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
expectMsg("b-1-done")
val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
-
src.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("b").expectComplete()
+
src.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("b").expectComplete()
}
// these tests rely on events written in previous tests
@@ -75,7 +75,7 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
val src = queries.eventsByTag("a", NoOffset)
src
.map(_.persistenceId)
- .runWith(TestSink.probe[Any])
+ .runWith(TestSink[Any]())
.request(10)
.expectNext("a")
.expectNoMessage(100.millis)
@@ -84,7 +84,7 @@ class CassandraReadJournalSpec extends
CassandraSpec(CassandraReadJournalSpec.co
"start current eventsByTag query" in {
val src = queries.currentEventsByTag("a", NoOffset)
-
src.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").expectComplete()
+
src.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").expectComplete()
}
"insert Cassandra metrics to Cassandra Metrics Registry" in {
diff --git a/project/PekkoConnectorsDependency.scala
b/project/PekkoConnectorsDependency.scala
index 0d6d898..6ba49c1 100644
--- a/project/PekkoConnectorsDependency.scala
+++ b/project/PekkoConnectorsDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoConnectorsDependency extends PekkoDependency {
override val checkProject: String = "pekko-connectors-cassandra"
override val module: Option[String] = Some("connectors")
- override val currentVersion: String = "1.1.0"
+ override val currentVersion: String = "1.2.0"
}
diff --git a/project/PekkoCoreDependency.scala
b/project/PekkoCoreDependency.scala
index 992921f..61d5bd7 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.1.5"
+ override val currentVersion: String = "1.3.0"
}
diff --git a/project/PekkoManagementDependency.scala
b/project/PekkoManagementDependency.scala
index 342018d..ce712ec 100644
--- a/project/PekkoManagementDependency.scala
+++ b/project/PekkoManagementDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoManagementDependency extends PekkoDependency {
override val checkProject: String = "pekko-discovery-aws-api-async"
override val module: Option[String] = Some("management")
- override val currentVersion: String = "1.1.0-M1"
+ override val currentVersion: String = "1.1.1"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]