This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git

commit b19a95a72cc9a43d5d56e0c0ba8a9f809712135e
Author: Scala Steward <[email protected]>
AuthorDate: Sat Jan 17 16:27:57 2026 +0000

    Reformat with scalafmt 3.10.4
    
    Executed command: scalafmt --non-interactive
---
 build.sbt                                          | 12 ++++++---
 .../compaction/LeveledCompactionStrategy.scala     |  3 +--
 .../compaction/SizeTieredCompactionStrategy.scala  |  2 +-
 .../cassandra/journal/CassandraTagRecovery.scala   |  9 +++----
 .../persistence/cassandra/journal/TagWriter.scala  |  3 ++-
 .../query/EventsByPersistenceIdStage.scala         |  9 ++++---
 .../cassandra/query/EventsByTagStage.scala         |  7 ++++--
 .../cassandra/cleanup/CleanupSpec.scala            |  3 ++-
 .../persistence/cassandra/journal/BufferSpec.scala |  3 ++-
 .../cassandra/journal/ManyActorsLoadSpec.scala     |  4 ++-
 .../cassandra/journal/TagWriterSpec.scala          | 29 +++++++++++-----------
 11 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/build.sbt b/build.sbt
index 2be7aa5..a89777f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -171,14 +171,18 @@ lazy val docs = project
       "extref.github.base_url" -> 
s"https://github.com/apache/pekko-persistence-jdbc/blob/${if (isSnapshot.value) 
"main"
         else "v" + version.value}/%s",
       // Connectors
-      "extref.pekko-connectors.base_url" -> 
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/%s",
-      "scaladoc.org.apache.pekko.stream.connectors.base_url" -> 
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/",
+      "extref.pekko-connectors.base_url" ->
+      
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/%s",
+      "scaladoc.org.apache.pekko.stream.connectors.base_url" ->
+      
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.pekkoConnectorsVersionInDocs}/",
       "javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
       // Cassandra
       "extref.cassandra.base_url" -> 
s"https://cassandra.apache.org/doc/${Dependencies.cassandraVersionInDocs}/%s",
       // Datastax Java driver
-      "extref.java-driver.base_url" -> 
s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.driverVersionInDocs}/%s",
-      "javadoc.com.datastax.oss.base_url" -> 
s"https://docs.datastax.com/en/drivers/java/${Dependencies.driverVersionInDocs}/",
+      "extref.java-driver.base_url" ->
+      
s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.driverVersionInDocs}/%s",
+      "javadoc.com.datastax.oss.base_url" ->
+      
s"https://docs.datastax.com/en/drivers/java/${Dependencies.driverVersionInDocs}/",
       // Java
       "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
       // Scala
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
index 0b0c3cb..68956bc 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
@@ -40,8 +40,7 @@ class LeveledCompactionStrategy(config: Config)
 object LeveledCompactionStrategy extends 
CassandraCompactionStrategyConfig[LeveledCompactionStrategy] {
   override val ClassName: String = "LeveledCompactionStrategy"
 
-  override def propertyKeys: List[String] =
-    (BaseCompactionStrategy.propertyKeys ++ List("sstable_size_in_mb")).sorted
+  override def propertyKeys: List[String] = 
(BaseCompactionStrategy.propertyKeys ++ List("sstable_size_in_mb")).sorted
 
   override def fromConfig(config: Config): LeveledCompactionStrategy =
     new LeveledCompactionStrategy(config)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
index b6ebd87..f659057 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
@@ -60,7 +60,7 @@ object SizeTieredCompactionStrategy extends 
CassandraCompactionStrategyConfig[Si
 
   override def propertyKeys: List[String] =
     (BaseCompactionStrategy.propertyKeys ++
-      List("bucket_high", "bucket_low", "max_threshold", "min_threshold", 
"min_sstable_size")).sorted
+    List("bucket_high", "bucket_low", "max_threshold", "min_threshold", 
"min_sstable_size")).sorted
 
   override def fromConfig(config: Config): SizeTieredCompactionStrategy =
     new SizeTieredCompactionStrategy(config)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
index 9898a59..6afe452 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.scala
@@ -21,11 +21,7 @@ import pekko.event.LoggingAdapter
 import pekko.persistence.cassandra.journal.CassandraJournal.{ SequenceNr, Tag }
 import pekko.persistence.cassandra.journal.TagWriter.TagProgress
 import pekko.persistence.cassandra.journal.TagWriters.{
-  PersistentActorStarting,
-  PersistentActorStartingAck,
-  SetTagProgress,
-  TagProcessAck,
-  TagWrite
+  PersistentActorStarting, PersistentActorStartingAck, SetTagProgress, 
TagProcessAck, TagWrite
 }
 import pekko.persistence.cassandra.Extractors.RawEvent
 import pekko.stream.scaladsl.Sink
@@ -72,7 +68,8 @@ import 
pekko.stream.connectors.cassandra.scaladsl.CassandraSession
       })
       .map(rs =>
         rs.foldLeft(Map.empty[String, TagProgress]) { (acc, row) =>
-          acc + (row.getString("tag") -> TagProgress(
+          acc +
+          (row.getString("tag") -> TagProgress(
             persistenceId,
             row.getLong("sequence_nr"),
             row.getLong("tag_pid_sequence_nr")))
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
index 37b419f..fa8016f 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
@@ -359,7 +359,8 @@ import scala.util.{ Failure, Success, Try }
 
             acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, 
tagPidSequenceNr, event.timeUuid))
           case None =>
-            acc + (event.persistenceId -> PidProgress(
+            acc +
+            (event.persistenceId -> PidProgress(
               event.sequenceNr,
               event.sequenceNr,
               tagPidSequenceNr,
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index 1d89724..48c842d 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -247,8 +247,7 @@ import scala.util.{ Failure, Success, Try }
           partition = nextPartition
       }
 
-      def partitionNr(sequenceNr: Long): Long =
-        (sequenceNr - 1L) / journalSettings.targetPartitionSize
+      def partitionNr(sequenceNr: Long): Long = (sequenceNr - 1L) / 
journalSettings.targetPartitionSize
 
       override def preStart(): Unit = {
         queryState = QueryInProgress(switchPartition = false, fetchMore = 
false, System.nanoTime())
@@ -413,11 +412,13 @@ import scala.util.{ Failure, Success, Try }
             } else {
               val row = rs.one()
               val sequenceNr = extractSeqNr(row)
-              if ((sequenceNr < expectedNextSeqNr && fastForwardEnabled) || 
pendingFastForward
+              if ((sequenceNr < expectedNextSeqNr && fastForwardEnabled) ||
+                pendingFastForward
                   .isDefined && pendingFastForward.get > sequenceNr) {
                 // skip event due to fast forward
                 tryPushOne()
-              } else if (pendingFastForward.isEmpty && querySettings
+              } else if (pendingFastForward.isEmpty &&
+                querySettings
                   .gapFreeSequenceNumbers && sequenceNr > expectedNextSeqNr) {
                 // we will probably now come in here which isn't what we want
                 lookingForMissingSeqNr match {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
index e468c10..50c3757 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
@@ -662,7 +662,9 @@ import scala.util.{ Failure, Success, Try }
               .tagPidSequenceNumberUpdate(repr.persistenceId, (1, repr.offset, 
System.currentTimeMillis())))
           push(out, repr)
           false
-        } else if (usingOffset && (stageState.currentTimeBucket.inPast || 
eventsByTagSettings
+        } else if (usingOffset &&
+          (stageState.currentTimeBucket.inPast ||
+          eventsByTagSettings
             .newPersistenceIdScanTimeout == Duration.Zero)) {
           // If we're in the past and this is an offset query we assume this is
           // the first tagPidSequenceNr
@@ -680,7 +682,8 @@ import scala.util.{ Failure, Success, Try }
         } else {
           if (log.isDebugEnabled) {
             log.debug(
-              s"[${stageUuid}] " + " [{}]: Persistence Id not in metadata: 
[{}] does not start at tag pid sequence nr 1. " +
+              s"[${stageUuid}] " +
+              " [{}]: Persistence Id not in metadata: [{}] does not start at 
tag pid sequence nr 1. " +
               "This could either be that the events are before the offset, 
that the metadata has been dropped or that they are delayed. " +
               "Tag pid sequence nr found: [{}]. Looking for lower tag pid 
sequence nrs for [{}] in the current and previous buckets.",
               session.tag,
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala
index 2e73cf8..2d9a36e 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/cleanup/CleanupSpec.scala
@@ -269,7 +269,8 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) 
with DirectWriting {
         .futureValue shouldEqual List("evt-7", "evt-8", "evt-9")
     }
 
-    "clean up before snapshot including timestamp that results in all events 
kept for one persistence id" taggedAs RequiresCassandraThree in {
+    "clean up before snapshot including timestamp that results in all events 
kept for one persistence id" taggedAs
+    RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala
index 1c4d6d2..62e08e8 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/BufferSpec.scala
@@ -53,7 +53,8 @@ class BufferSpec extends AnyWordSpec with Matchers with 
BeforeAndAfterAll {
         .add(AwaitingWrite(List((e2, 12)), OptionVal.None))
 
       buffer.shouldWrite() shouldEqual true
-      buffer.nextBatch shouldEqual (List(
+      buffer.nextBatch shouldEqual
+      (List(
         AwaitingWrite(List((e1, 1)), OptionVal.None),
         AwaitingWrite(List((e2, 12)), OptionVal.None)))
     }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala
index d70aac4..e28f75a 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/ManyActorsLoadSpec.scala
@@ -85,7 +85,9 @@ class ManyActorsLoadSpec extends 
CassandraSpec(ManyActorsLoadSpec.config) {
 
       val rounds = 1 // increase this to 10 when benchmarking
       val deadline =
-        Deadline.now + rounds * system.settings.config
+        Deadline.now +
+        rounds *
+        system.settings.config
           
.getDuration("pekko.persistence.cassandra.events-by-tag.scanning-flush-interval",
 TimeUnit.MILLISECONDS)
           .millis + 2.seconds
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala
index b84625f..684b739 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagWriterSpec.scala
@@ -253,22 +253,21 @@ class TagWriterSpec
       val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 
100, flushInterval = 500.millis))
       val bucket = nowBucket()
 
-      val allEvents =
-        (1 to 6).foldLeft(Vector.empty[Serialized]) {
-          case (acc, n) =>
-            val evt = event("p1", n, s"e-$n", bucket)
-            val events = acc :+ evt
-            ref ! TagWrite(tagName, Vector(evt))
-            Thread.sleep(200)
-            if (n == 3) {
-              probe.within(200.millis) {
-                probe.expectMsg(events.map(evt => toEw(evt, evt.sequenceNr)))
-                probe.expectMsg(
-                  ProgressWrite("p1", events.last.sequenceNr, 
events.last.sequenceNr, events.last.timeUuid))
-              }
+      val allEvents = (1 to 6).foldLeft(Vector.empty[Serialized]) {
+        case (acc, n) =>
+          val evt = event("p1", n, s"e-$n", bucket)
+          val events = acc :+ evt
+          ref ! TagWrite(tagName, Vector(evt))
+          Thread.sleep(200)
+          if (n == 3) {
+            probe.within(200.millis) {
+              probe.expectMsg(events.map(evt => toEw(evt, evt.sequenceNr)))
+              probe.expectMsg(
+                ProgressWrite("p1", events.last.sequenceNr, 
events.last.sequenceNr, events.last.timeUuid))
             }
-            events
-        }
+          }
+          events
+      }
 
       val remainingFlushedEvents = allEvents.drop(3)
       probe.expectMsg(remainingFlushedEvents.map(evt => toEw(evt, 
evt.sequenceNr)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to