This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new e552ef4 Add support for querying last known sequence number by
persistenceId. (#267)
e552ef4 is described below
commit e552ef439388a7737619e6ca2b6cbe9fe8b088c3
Author: Siyavash Habashi <[email protected]>
AuthorDate: Sun Feb 16 13:08:20 2025 +0100
Add support for querying last known sequence number by persistenceId. (#267)
* Add support for querying last known sequence number by persistenceId.
* Align to existing code style.
* Update license headers for new files.
* Align to existing code style.
* Remove marker trait.
* mima issue
* Create CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
* Return java.lang.Long value in javadsl.
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../LastKnownSequenceNumber.excludes | 19 +++++++
.../jdbc/query/dao/DefaultReadJournalDao.scala | 3 ++
.../jdbc/query/dao/ReadJournalDao.scala | 8 +++
.../jdbc/query/dao/ReadJournalQueries.scala | 7 +++
.../query/dao/legacy/ByteArrayReadJournalDao.scala | 3 ++
.../jdbc/query/dao/legacy/ReadJournalQueries.scala | 7 +++
.../jdbc/query/javadsl/JdbcReadJournal.scala | 18 +++++++
.../jdbc/query/scaladsl/JdbcReadJournal.scala | 9 ++++
...astKnownSequenceNumberByPersistenceIdTest.scala | 58 ++++++++++++++++++++++
.../persistence/jdbc/query/QueryTestSpec.scala | 3 ++
.../jdbc/query/dao/TestProbeReadJournalDao.scala | 2 +
...astKnownSequenceNumberByPersistenceIdTest.scala | 36 ++++++++++++++
12 files changed, 173 insertions(+)
diff --git
a/core/src/main/mima-filters/1.1.x.backwards.excludes/LastKnownSequenceNumber.excludes
b/core/src/main/mima-filters/1.1.x.backwards.excludes/LastKnownSequenceNumber.excludes
new file mode 100644
index 0000000..03f1fcf
--- /dev/null
+++
b/core/src/main/mima-filters/1.1.x.backwards.excludes/LastKnownSequenceNumber.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# https://github.com/apache/pekko-persistence-jdbc/pull/267
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.jdbc.query.dao.ReadJournalDao.lastPersistenceIdSequenceNumber")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala
index df14153..33cd949 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/DefaultReadJournalDao.scala
@@ -60,6 +60,9 @@ class DefaultReadJournalDao(
override def journalSequence(offset: Long, limit: Long): Source[Long,
NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset,
limit)).result))
+ override def lastPersistenceIdSequenceNumber(persistenceId: String):
Future[Option[Long]] =
+ db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)
+
override def maxJournalSequence(): Future[Long] =
db.run(queries.maxJournalSequenceQuery.result)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala
index 10fe78a..e55ce88 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalDao.scala
@@ -49,6 +49,14 @@ trait ReadJournalDao extends JournalDaoWithReadMessages {
*/
def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed]
+ /**
+ * Returns the last known sequence number for the given `persistenceId`.
Empty if the `persistenceId` is unknown.
+ *
+ * @param persistenceId The `persistenceId` for which the last known
sequence number should be returned.
+ * @return Some sequence number or None if the `persistenceId` is unknown.
+ */
+ def lastPersistenceIdSequenceNumber(persistenceId: String):
Future[Option[Long]]
+
/**
* @return The value of the maximum (ordering) id in the journal
*/
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
index 4d2884b..cbe11d0 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
@@ -29,6 +29,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val
readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
+ val lastPersistenceIdSequenceNumberQuery =
Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
@@ -43,6 +44,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val
readJournalConfig: ReadJo
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
}
+ private def _lastPersistenceIdSequenceNumberQuery(persistenceId:
Rep[String]) =
+ baseTableQuery()
+ .filter(_.persistenceId === persistenceId)
+ .map(_.sequenceNumber)
+ .max
+
private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
index 42a00d0..448390c 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala
@@ -59,6 +59,9 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with
BaseJournalDaoWith
.via(serializer.deserializeFlow)
}
+ override def lastPersistenceIdSequenceNumber(persistenceId: String):
Future[Option[Long]] =
+ db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)
+
override def messages(
persistenceId: String,
fromSequenceNr: Long,
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala
index 7640678..12b6e24 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala
@@ -28,6 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val
readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
+ val lastPersistenceIdSequenceNumberQuery =
Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
@@ -38,6 +39,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val
readJournalConfig: ReadJo
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]):
Query[Rep[String], String, Seq] =
baseTableQuery().map(_.persistenceId).distinct.take(max)
+ private def _lastPersistenceIdSequenceNumberQuery(persistenceId:
Rep[String]) =
+ baseTableQuery()
+ .filter(_.persistenceId === persistenceId)
+ .map(_.sequenceNumber)
+ .max
+
private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala
index 871cfe6..e2009ae 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/javadsl/JdbcReadJournal.scala
@@ -15,11 +15,17 @@
package org.apache.pekko.persistence.jdbc.query.javadsl
import org.apache.pekko
+
+import java.util.Optional
+import java.util.concurrent.CompletionStage
+
import pekko.NotUsed
import pekko.persistence.jdbc.query.scaladsl.{ JdbcReadJournal =>
ScalaJdbcReadJournal }
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.persistence.query.javadsl._
import pekko.stream.javadsl.Source
+import pekko.util.FutureConverters._
+import pekko.util.OptionConverters._
object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
@@ -132,4 +138,16 @@ class JdbcReadJournal(journal: ScalaJdbcReadJournal)
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope,
NotUsed] =
journal.eventsByTag(tag, offset).asJava
+
+ /**
+ * Returns the last known sequence number for the given `persistenceId`.
Empty if the `persistenceId` is unknown.
+ *
+ * @param persistenceId The `persistenceId` for which the last known
sequence number should be returned.
+ * @return An `Optional` holding the sequence number, or an empty `Optional` if the `persistenceId` is unknown.
+ */
+ def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String):
CompletionStage[Optional[java.lang.Long]] =
+ journal
+ .currentLastKnownSequenceNumberByPersistenceId(persistenceId)
+ .asJava
+ .thenApply(_.map(java.lang.Long.valueOf).toJava)
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
index 633644e..f55067e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
@@ -317,4 +317,13 @@ class JdbcReadJournal(config: Config, configPath:
String)(implicit val system: E
def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
eventsByTag(tag, offset, terminateAfterOffset = None)
+
+ /**
+ * Returns the last known sequence number for the given `persistenceId`.
Empty if the `persistenceId` is unknown.
+ *
+ * @param persistenceId The `persistenceId` for which the last known
sequence number should be returned.
+ * @return Some sequence number or None if the `persistenceId` is unknown.
+ */
+ def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String):
Future[Option[Long]] =
+ readJournalDao.lastPersistenceIdSequenceNumber(persistenceId)
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
new file mode 100644
index 0000000..47b5a5c
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.query
+
+import org.scalatest.concurrent.ScalaFutures
+
+abstract class CurrentLastKnownSequenceNumberByPersistenceIdTest(config:
String) extends QueryTestSpec(config)
+ with ScalaFutures {
+
+ it should "return None for unknown persistenceId" in withActorSystem {
implicit system =>
+ val journalOps = new ScalaJdbcReadJournalOperations(system)
+
+ journalOps
+ .currentLastKnownSequenceNumberByPersistenceId("unknown")
+ .futureValue shouldBe None
+ }
+
+ it should "return last sequence number for known persistenceId" in
withActorSystem { implicit system =>
+ val journalOps = new ScalaJdbcReadJournalOperations(system)
+
+ withTestActors() { (actor1, _, _) =>
+ actor1 ! 1
+ actor1 ! 2
+ actor1 ! 3
+ actor1 ! 4
+
+ eventually {
+ journalOps
+ .currentLastKnownSequenceNumberByPersistenceId("my-1")
+ .futureValue shouldBe Some(4)
+
+ // Just ensuring that query targets the correct persistenceId.
+ journalOps
+ .currentLastKnownSequenceNumberByPersistenceId("my-2")
+ .futureValue shouldBe None
+ }
+ }
+ }
+}
+
+class H2ScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
+ extends
CurrentLastKnownSequenceNumberByPersistenceIdTest("h2-shared-db-application.conf")
+ with H2Cleaner
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
index 509edac..0d3c65b 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
@@ -110,6 +110,9 @@ class ScalaJdbcReadJournalOperations(readJournal:
JdbcReadJournal)(implicit syst
tp.within(within)(f(tp))
}
+ def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String):
Future[Option[Long]] =
+ readJournal.currentLastKnownSequenceNumberByPersistenceId(persistenceId)
+
override def countJournal: Future[Long] =
readJournal
.currentPersistenceIds()
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala
index af3c551..cd15c57 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/dao/TestProbeReadJournalDao.scala
@@ -54,6 +54,8 @@ class TestProbeReadJournalDao(val probe: TestProbe) extends
ReadJournalDao {
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] =
???
+ override def lastPersistenceIdSequenceNumber(persistenceId: String):
Future[Option[Long]] = ???
+
/**
* Returns a Source of bytes for a certain persistenceId
*/
diff --git
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
new file mode 100644
index 0000000..35eb0ea
--- /dev/null
+++
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/CurrentLastKnownSequenceNumberByPersistenceIdTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.query.{
+ CurrentLastKnownSequenceNumberByPersistenceIdTest,
+ MysqlCleaner,
+ OracleCleaner,
+ PostgresCleaner,
+ SqlServerCleaner
+}
+
+// Note: these tests use the shared-db configs; the tests for all (so not only
current) events use the regular db config.
+
+class PostgresScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
+ extends
CurrentLastKnownSequenceNumberByPersistenceIdTest("postgres-shared-db-application.conf")
+ with PostgresCleaner
+
+class MySQLScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
+ extends
CurrentLastKnownSequenceNumberByPersistenceIdTest("mysql-shared-db-application.conf")
+ with MysqlCleaner
+
+class OracleScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
+ extends
CurrentLastKnownSequenceNumberByPersistenceIdTest("oracle-shared-db-application.conf")
+ with OracleCleaner
+
+class SqlServerScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
+ extends
CurrentLastKnownSequenceNumberByPersistenceIdTest("sqlserver-shared-db-application.conf")
+ with SqlServerCleaner
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]