This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git
commit b19a95a72cc9a43d5d56e0c0ba8a9f809712135e Author: Scala Steward <[email protected]> AuthorDate: Sat Jan 17 16:27:57 2026 +0000 Reformat with scalafmt 3.10.4 Executed command: scalafmt --non-interactive --- build.sbt | 12 ++++++--- .../compaction/LeveledCompactionStrategy.scala | 3 +-- .../compaction/SizeTieredCompactionStrategy.scala | 2 +- .../cassandra/journal/CassandraTagRecovery.scala | 9 +++---- .../persistence/cassandra/journal/TagWriter.scala | 3 ++- .../query/EventsByPersistenceIdStage.scala | 9 ++++--- .../cassandra/query/EventsByTagStage.scala | 7 ++++-- .../cassandra/cleanup/CleanupSpec.scala | 3 ++- .../persistence/cassandra/journal/BufferSpec.scala | 3 ++- .../cassandra/journal/ManyActorsLoadSpec.scala | 4 ++- .../cassandra/journal/TagWriterSpec.scala | 29 +++++++++++----------- 11 files changed, 46 insertions(+), 38 deletions(-) diff --git a/build.sbt b/build.sbt index 2be7aa5..a89777f 100644 --- a/build.sbt +++ b/build.sbt @@ -171,14 +171,18 @@ lazy val docs = project "extref.github.base_url" -> s"https://github.com/apache/pekko-persistence-jdbc/blob/${if (isSnapshot.value) "main" else "v" + version.value}/%s", // Connectors - "extref.pekko-connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/%s", - "scaladoc.org.apache.pekko.stream.connectors.base_url" -> s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/", + "extref.pekko-connectors.base_url" -> + s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/%s", + "scaladoc.org.apache.pekko.stream.connectors.base_url" -> + s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/", "javadoc.org.apache.pekko.stream.connectors.base_url" -> "", // Cassandra "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.cassandraVersionInDocs}/%s", // Datastax Java driver - "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.driverVersionInDocs}/%s", - "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.driverVersionInDocs}/", + "extref.java-driver.base_url" -> + s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.driverVersionInDocs}/%s", + "javadoc.com.datastax.oss.base_url" -> + s"https://docs.datastax.com/en/drivers/java/${Dependencies.driverVersionInDocs}/", // Java "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/", // Scala diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala index 0b0c3cb..68956bc 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala @@ -40,8 +40,7 @@ class LeveledCompactionStrategy(config: Config) object LeveledCompactionStrategy extends CassandraCompactionStrategyConfig[LeveledCompactionStrategy] { override val ClassName: String = "LeveledCompactionStrategy" - override def propertyKeys: List[String] = - (BaseCompactionStrategy.propertyKeys ++ List("sstable_size_in_mb")).sorted + override def propertyKeys: List[String] = (BaseCompactionStrategy.propertyKeys ++ List("sstable_size_in_mb")).sorted override def fromConfig(config: Config): LeveledCompactionStrategy = new LeveledCompactionStrategy(config) diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala index b6ebd87..f659057 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala @@ -60,7 +60,7 @@ object SizeTieredCompactionStrategy extends CassandraCompactionStrategyConfig[Si override def propertyKeys: List[String] = (BaseCompactionStrategy.propertyKeys ++ - List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted + List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted override def fromConfig(config: Config): SizeTieredCompactionStrategy = new SizeTieredCompactionStrategy(config) diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala index 9898a59..6afe452 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala @@ -21,11 +21,7 @@ import pekko.event.LoggingAdapter import pekko.persistence.cassandra.journal.CassandraJournal.{ SequenceNr, Tag } import pekko.persistence.cassandra.journal.TagWriter.TagProgress import pekko.persistence.cassandra.journal.TagWriters.{ - PersistentActorStarting, - PersistentActorStartingAck, - SetTagProgress, - TagProcessAck, - TagWrite + PersistentActorStarting, PersistentActorStartingAck, SetTagProgress, TagProcessAck, TagWrite } import pekko.persistence.cassandra.Extractors.RawEvent import pekko.stream.scaladsl.Sink @@ -72,7 +68,8 @@ import pekko.stream.connectors.cassandra.scaladsl.CassandraSession }) .map(rs => rs.foldLeft(Map.empty[String, TagProgress]) { (acc, row) => - acc + (row.getString("tag") -> TagProgress( + acc + + (row.getString("tag") -> TagProgress( persistenceId, row.getLong("sequence_nr"), row.getLong("tag_pid_sequence_nr"))) diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala index 37b419f..fa8016f 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala @@ -359,7 +359,8 @@ import scala.util.{ Failure, Success, Try } acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, tagPidSequenceNr, event.timeUuid)) case None => - acc + (event.persistenceId -> PidProgress( + acc + + (event.persistenceId -> PidProgress( event.sequenceNr, event.sequenceNr, tagPidSequenceNr, diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala index 1d89724..48c842d 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala @@ -247,8 +247,7 @@ import scala.util.{ Failure, Success, Try } partition = nextPartition } - def partitionNr(sequenceNr: Long): Long = - (sequenceNr - 1L) / journalSettings.targetPartitionSize + def partitionNr(sequenceNr: Long): Long = (sequenceNr - 1L) / journalSettings.targetPartitionSize override def preStart(): Unit = { queryState = QueryInProgress(switchPartition = false, fetchMore = false, System.nanoTime()) @@ -413,11 +412,13 @@ import scala.util.{ Failure, Success, Try } } else { val row = rs.one() val sequenceNr = extractSeqNr(row) - if ((sequenceNr < expectedNextSeqNr && fastForwardEnabled) || pendingFastForward + if ((sequenceNr < expectedNextSeqNr && fastForwardEnabled) || + pendingFastForward .isDefined && pendingFastForward.get > sequenceNr) { // skip event due to fast forward tryPushOne() - } else if (pendingFastForward.isEmpty && querySettings + } else if (pendingFastForward.isEmpty && + querySettings .gapFreeSequenceNumbers && sequenceNr > expectedNextSeqNr) { // we will probably now come in here which isn't what we want lookingForMissingSeqNr match { diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala index e468c10..50c3757 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala @@ -662,7 +662,9 @@ import scala.util.{ Failure, Success, Try } .tagPidSequenceNumberUpdate(repr.persistenceId, (1, repr.offset, System.currentTimeMillis()))) push(out, repr) false - } else if (usingOffset && (stageState.currentTimeBucket.inPast || eventsByTagSettings + } else if (usingOffset && + (stageState.currentTimeBucket.inPast || + eventsByTagSettings .newPersistenceIdScanTimeout == Duration.Zero)) { // If we're in the past and this is an offset query we assume this is // the first tagPidSequenceNr @@ -680,7 +682,8 @@ import scala.util.{ Failure, Success, Try } } else { if (log.isDebugEnabled) { log.debug( - s"[${stageUuid}] " + " [{}]: Persistence Id not in metadata: [{}] does not start at tag pid sequence nr 1. " + + s"[${stageUuid}] " + + " [{}]: Persistence Id not in metadata: [{}] does not start at tag pid sequence nr 1. " + "This could either be that the events are before the offset, that the metadata has been dropped or that they are delayed. " + "Tag pid sequence nr found: [{}]. Looking for lower tag pid sequence nrs for [{}] in the current and previous buckets.", session.tag, diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala index 2e73cf8..2d9a36e 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala @@ -269,7 +269,8 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { .futureValue shouldEqual List("evt-7", "evt-8", "evt-9") } - "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs RequiresCassandraThree in { + "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs + RequiresCassandraThree in { val pid = nextPid val p = system.actorOf(TestActor.props(pid)) (1 to 3).foreach { i => diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala index 1c4d6d2..62e08e8 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala @@ -53,7 +53,8 @@ class BufferSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { .add(AwaitingWrite(List((e2, 12)), OptionVal.None)) buffer.shouldWrite() shouldEqual true - buffer.nextBatch shouldEqual (List( + buffer.nextBatch shouldEqual + (List( AwaitingWrite(List((e1, 1)), OptionVal.None), AwaitingWrite(List((e2, 12)), OptionVal.None))) } diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala index d70aac4..e28f75a 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala @@ -85,7 +85,9 @@ class ManyActorsLoadSpec extends CassandraSpec(ManyActorsLoadSpec.config) { val rounds = 1 // increase this to 10 when benchmarking val deadline = - Deadline.now + rounds * system.settings.config + Deadline.now + + rounds * + system.settings.config .getDuration("pekko.persistence.cassandra.events-by-tag.scanning-flush-interval", TimeUnit.MILLISECONDS) .millis + 2.seconds diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala index b84625f..684b739 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala @@ -253,22 +253,21 @@ class TagWriterSpec val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 100, flushInterval = 500.millis)) val bucket = nowBucket() - val allEvents = - (1 to 6).foldLeft(Vector.empty[Serialized]) { - case (acc, n) => - val evt = event("p1", n, s"e-$n", bucket) - val events = acc :+ evt - ref ! TagWrite(tagName, Vector(evt)) - Thread.sleep(200) - if (n == 3) { - probe.within(200.millis) { - probe.expectMsg(events.map(evt => toEw(evt, evt.sequenceNr))) - probe.expectMsg( - ProgressWrite("p1", events.last.sequenceNr, events.last.sequenceNr, events.last.timeUuid)) - } + val allEvents = (1 to 6).foldLeft(Vector.empty[Serialized]) { + case (acc, n) => + val evt = event("p1", n, s"e-$n", bucket) + val events = acc :+ evt + ref ! TagWrite(tagName, Vector(evt)) + Thread.sleep(200) + if (n == 3) { + probe.within(200.millis) { + probe.expectMsg(events.map(evt => toEw(evt, evt.sequenceNr))) + probe.expectMsg( + ProgressWrite("p1", events.last.sequenceNr, events.last.sequenceNr, events.last.timeUuid)) } - events - } + } + events + } val remainingFlushedEvents = allEvents.drop(3) probe.expectMsg(remainingFlushedEvents.map(evt => toEw(evt, evt.sequenceNr))) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
