This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new e9bfe50 copy over changes from akka projections 1.4.1 (#504)
e9bfe50 is described below
commit e9bfe5051b814670b9b62650071271107b830e55
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 30 13:01:46 2026 +0100
copy over changes from akka projections 1.4.1 (#504)
* Port akka-projection PRs #898 and #908: batch offset inserts and fix
grouped async offsets
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
---
.../internal/InternalProjectionState.scala | 10 +++-
.../pekko/projection/internal/OffsetStrategy.scala | 7 +++
.../jdbc/internal/JdbcProjectionImpl.scala | 8 +--
r2dbc/src/main/resources/reference.conf | 7 +++
.../projection/r2dbc/R2dbcProjectionSettings.scala | 27 +++++++---
.../r2dbc/internal/R2dbcOffsetStore.scala | 60 ++++++++++++++++------
.../r2dbc/internal/R2dbcProjectionImpl.scala | 19 ++++---
.../r2dbc/scaladsl/R2dbcProjection.scala | 4 +-
.../slick/internal/SlickProjectionImpl.scala | 6 ++-
9 files changed, 105 insertions(+), 43 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
index 904b64b..904faa0 100644
---
a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
@@ -236,7 +236,10 @@ private[projection] abstract class
InternalProjectionState[Offset, Envelope](
}
}
- private def exactlyOnceProcessing(
+ /**
+ * ExactlyOnce or async store of offset by the handler adapter.
+ */
+ private def offsetStoredByHandlerProcessing(
source: Source[ProjectionContextImpl[Offset, Envelope], NotUsed],
recoveryStrategy: HandlerRecoveryStrategy): Source[Done, NotUsed] = {
@@ -444,7 +447,10 @@ private[projection] abstract class
InternalProjectionState[Offset, Envelope](
val composedSource: Source[Done, NotUsed] =
offsetStrategy match {
case ExactlyOnce(recoveryStrategyOpt) =>
- exactlyOnceProcessing(source,
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
+ offsetStoredByHandlerProcessing(source,
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
+
+ case OffsetStoredByHandler(recoveryStrategyOpt) =>
+ offsetStoredByHandlerProcessing(source,
recoveryStrategyOpt.getOrElse(settings.recoveryStrategy))
case AtLeastOnce(afterEnvelopesOpt, orAfterDurationOpt,
recoveryStrategyOpt) =>
atLeastOnceProcessing(
diff --git
a/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
b/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
index 9c8ff4c..a693690 100644
---
a/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/internal/OffsetStrategy.scala
@@ -56,6 +56,13 @@ private[projection] final case class AtLeastOnce(
recoveryStrategy: Option[HandlerRecoveryStrategy] = None)
extends OffsetStrategy
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] final case class OffsetStoredByHandler(recoveryStrategy:
Option[HandlerRecoveryStrategy] = None)
+ extends OffsetStrategy
+
/**
* INTERNAL API
*/
diff --git
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
index 7fbfd1a..3057e84 100644
---
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
+++
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
@@ -39,6 +39,7 @@ import pekko.projection.internal.HandlerStrategy
import pekko.projection.internal.InternalProjection
import pekko.projection.internal.InternalProjectionState
import pekko.projection.internal.ManagementState
+import pekko.projection.internal.OffsetStoredByHandler
import pekko.projection.internal.OffsetStrategy
import pekko.projection.internal.ProjectionSettings
import pekko.projection.internal.SettingsImpl
@@ -206,7 +207,7 @@ private[projection] class JdbcProjectionImpl[Offset,
Envelope, S <: JdbcSession]
.copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration =
Some(afterDuration)))
/**
- * Settings for GroupedSlickProjection
+ * Settings for GroupedJdbcProjection
*/
override def withGroup(
groupAfterEnvelopes: Int,
@@ -218,8 +219,9 @@ private[projection] class JdbcProjectionImpl[Offset,
Envelope, S <: JdbcSession]
override def withRecoveryStrategy(
recoveryStrategy: HandlerRecoveryStrategy): JdbcProjectionImpl[Offset,
Envelope, S] = {
val newStrategy = offsetStrategy match {
- case s: ExactlyOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
- case s: AtLeastOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
+ case s: ExactlyOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: AtLeastOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: OffsetStoredByHandler => s.copy(recoveryStrategy =
Some(recoveryStrategy))
// NOTE: AtMostOnce has its own withRecoveryStrategy variant
// this method is not available for AtMostOnceProjection
case s: AtMostOnce => s
diff --git a/r2dbc/src/main/resources/reference.conf
b/r2dbc/src/main/resources/reference.conf
index 6f8ff0b..7ca39ce 100644
--- a/r2dbc/src/main/resources/reference.conf
+++ b/r2dbc/src/main/resources/reference.conf
@@ -36,6 +36,13 @@ pekko.projection.r2dbc {
# Remove old entries outside the time-window from the offset store database
# with this frequency.
delete-interval = 1 minute
+
+ # Batch size for inserts of timestamp offsets. Can be used to improve throughput for projections
+ # that have many events per persistence id and that use `groupedWithinAsync`.
+ # For Postgres (and Yugabyte) this will use multi-row inserts for this number of records. For MySQL
+ # this will use multiple single row inserts.
+ # Use 0 to disable batching and always use single row inserts.
+ offset-batch-size = 20
}
# By default it shares connection-factory with pekko-persistence-r2dbc
(write side),
diff --git
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
index 954b1cd..1f5aa2e 100644
---
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
+++
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
@@ -48,7 +48,8 @@ object R2dbcProjectionSettings {
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval = config.getDuration("offset-store.delete-interval"),
logDbCallsExceeding,
- warnAboutFilteredEventsInFlow =
config.getBoolean("warn-about-filtered-events-in-flow")
+ warnAboutFilteredEventsInFlow =
config.getBoolean("warn-about-filtered-events-in-flow"),
+ offsetBatchSize = config.getInt("offset-store.offset-batch-size")
)
}
@@ -100,7 +101,8 @@ object R2dbcProjectionSettings {
evictInterval,
deleteInterval,
logDbCallsExceeding,
- warnAboutFilteredEventsInFlow
+ warnAboutFilteredEventsInFlow,
+ offsetBatchSize = 10
)
}
@@ -116,12 +118,13 @@ final class R2dbcProjectionSettings private (
val evictInterval: JDuration,
val deleteInterval: JDuration,
val logDbCallsExceeding: FiniteDuration,
- val warnAboutFilteredEventsInFlow: Boolean
+ val warnAboutFilteredEventsInFlow: Boolean,
+ val offsetBatchSize: Int
) extends Serializable {
override def toString: String =
s"R2dbcProjectionSettings($dialect, $schema, $offsetTable,
$timestampOffsetTable, $managementTable, " +
- s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries,
$evictInterval, $deleteInterval, $logDbCallsExceeding,
$warnAboutFilteredEventsInFlow)"
+ s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries,
$evictInterval, $deleteInterval, $logDbCallsExceeding,
$warnAboutFilteredEventsInFlow, $offsetBatchSize)"
override def equals(other: Any): Boolean =
other match {
@@ -132,7 +135,8 @@ final class R2dbcProjectionSettings private (
timeWindow == that.timeWindow && keepNumberOfEntries ==
that.keepNumberOfEntries &&
evictInterval == that.evictInterval && deleteInterval ==
that.deleteInterval &&
logDbCallsExceeding == that.logDbCallsExceeding &&
- warnAboutFilteredEventsInFlow == that.warnAboutFilteredEventsInFlow
+ warnAboutFilteredEventsInFlow == that.warnAboutFilteredEventsInFlow &&
+ offsetBatchSize == that.offsetBatchSize
case _ => false
}
@@ -149,7 +153,8 @@ final class R2dbcProjectionSettings private (
evictInterval,
deleteInterval,
logDbCallsExceeding,
- warnAboutFilteredEventsInFlow
+ warnAboutFilteredEventsInFlow,
+ offsetBatchSize
)
val h = values.foldLeft(MurmurHash3.productSeed) { case (h, value) =>
MurmurHash3.mix(h, value.##)
@@ -169,7 +174,8 @@ final class R2dbcProjectionSettings private (
evictInterval: JDuration = evictInterval,
deleteInterval: JDuration = deleteInterval,
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding,
- warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow
+ warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
+ offsetBatchSize: Int = offsetBatchSize
): R2dbcProjectionSettings =
new R2dbcProjectionSettings(
dialect,
@@ -183,7 +189,8 @@ final class R2dbcProjectionSettings private (
evictInterval,
deleteInterval,
logDbCallsExceeding,
- warnAboutFilteredEventsInFlow
+ warnAboutFilteredEventsInFlow,
+ offsetBatchSize
)
def withDialect(dialect: Dialect): R2dbcProjectionSettings =
@@ -222,6 +229,10 @@ final class R2dbcProjectionSettings private (
def withWarnAboutFilteredEventsInFlow(warnAboutFilteredEventsInFlow:
Boolean): R2dbcProjectionSettings =
copy(warnAboutFilteredEventsInFlow = warnAboutFilteredEventsInFlow)
+ /** @since 2.0.0 */
+ def withOffsetBatchSize(offsetBatchSize: Int): R2dbcProjectionSettings =
+ copy(offsetBatchSize = offsetBatchSize)
+
val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
offsetTable
val timestampOffsetTableWithSchema: String = schema.map(_ +
".").getOrElse("") + timestampOffsetTable
val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
managementTable
diff --git
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index e7f14a0..a8d4231 100644
---
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -255,6 +255,14 @@ private[projection] class R2dbcOffsetStore(
(projection_name, projection_key, slice, persistence_id, seq_nr,
timestamp_offset, timestamp_consumed)
VALUES (?,?,?,?,?,?, $timestampSql)"""
+ private val insertTimestampOffsetBatchSql: String = {
+ val values = (1 to settings.offsetBatchSize).map(_ => s"(?,?,?,?,?,?,
$timestampSql)").mkString(", ")
+ sql"""
+ INSERT INTO $timestampOffsetTable
+ (projection_name, projection_key, slice, persistence_id, seq_nr,
timestamp_offset, timestamp_consumed)
+ VALUES $values"""
+ }
+
// delete less than a timestamp
private val deleteOldTimestampOffsetSql: String = sql"""
DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND
projection_name = ? AND timestamp_offset < ?
@@ -559,7 +567,7 @@ private[projection] class R2dbcOffsetStore(
}
}
- protected def bindTimestampOffsetRecord(stmt: Statement, record: Record):
Statement = {
+ protected def bindTimestampOffsetRecord(stmt: Statement, record: Record,
bindStartIndex: Int = 0): Statement = {
val slice = persistenceExt.sliceForPersistenceId(record.pid)
val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
@@ -569,12 +577,12 @@ private[projection] class R2dbcOffsetStore(
s"[$minSlice - $maxSlice] but received slice [$slice] for
persistenceId [${record.pid}]")
stmt
- .bind(0, projectionId.name)
- .bind(1, projectionId.key)
- .bind(2, slice)
- .bind(3, record.pid)
- .bind(4, record.seqNr)
- .bind(5, record.timestamp)
+ .bind(bindStartIndex, projectionId.name)
+ .bind(bindStartIndex + 1, projectionId.key)
+ .bind(bindStartIndex + 2, slice)
+ .bind(bindStartIndex + 3, record.pid)
+ .bind(bindStartIndex + 4, record.seqNr)
+ .bind(bindStartIndex + 5, record.timestamp)
}
protected def insertTimestampOffsetInTx(conn: Connection, records:
immutable.IndexedSeq[Record]): Future[Long] = {
@@ -582,19 +590,39 @@ private[projection] class R2dbcOffsetStore(
logger.trace2("saving timestamp offset [{}], {}", records.last.timestamp,
records)
- val statement = conn.createStatement(insertTimestampOffsetSql)
-
if (records.size == 1) {
+ val statement = conn.createStatement(insertTimestampOffsetSql)
val boundStatement = bindTimestampOffsetRecord(statement, records.head)
R2dbcExecutor.updateOneInTx(boundStatement)
} else {
- // TODO Try Batch without bind parameters for better performance. Risk
of sql injection for these parameters is low.
- val boundStatement =
- records.foldLeft(statement) { (stmt, rec) =>
- stmt.add()
- bindTimestampOffsetRecord(stmt, rec)
+ val batchSize = settings.offsetBatchSize
+ val batches = if (batchSize > 0) records.size / batchSize else 0
+ val batchResult = if (batches > 0) {
+ val batchStatements = (0 until batches).map { i =>
+ val stmt = conn.createStatement(insertTimestampOffsetBatchSql)
+ records.slice(i * batchSize, i * batchSize +
batchSize).zipWithIndex.foreach {
+ case (rec, recIdx) =>
+ bindTimestampOffsetRecord(stmt, rec, recIdx * 6) // 6 bind
parameters per record
+ }
+ stmt
}
- R2dbcExecutor.updateBatchInTx(boundStatement)
+ R2dbcExecutor.updateInTx(batchStatements).map(_.sum)
+ } else Future.successful(0L)
+
+ batchResult.flatMap { batchResultCount =>
+ val remainingRecords = records.drop(batches * batchSize)
+ if (remainingRecords.nonEmpty) {
+ val statement = conn.createStatement(insertTimestampOffsetSql)
+ val boundStatement = remainingRecords.foldLeft(statement) { (stmt,
rec) =>
+ stmt.add()
+ bindTimestampOffsetRecord(stmt, rec)
+ }
+ // This "batch" statement is not efficient, see issue #897
+ R2dbcExecutor
+ .updateBatchInTx(boundStatement)
+ .map(_ + batchResultCount)(ExecutionContext.parasitic)
+ } else Future.successful(batchResultCount)
+ }
}
}
@@ -1106,7 +1134,7 @@ private[projection] class R2dbcOffsetStore(
case change: DurableStateChange[_] if
change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
- s"DurableStateChange [${change.getClass.getName}] not implemented
yet. Please report bug at
https://github.com/apache/pekko-persistence-r2dbc/issues")
+ s"DurableStateChange [${change.getClass.getName}] not implemented
yet. Please report bug at https://github.com/apache/pekko-projection/issues")
case _ => None
}
}
diff --git
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index ab8cf1a..18d7fa2 100644
---
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -56,6 +56,7 @@ import pekko.projection.internal.HandlerStrategy
import pekko.projection.internal.InternalProjection
import pekko.projection.internal.InternalProjectionState
import pekko.projection.internal.ManagementState
+import pekko.projection.internal.OffsetStoredByHandler
import pekko.projection.internal.OffsetStrategy
import pekko.projection.internal.ProjectionContextImpl
import pekko.projection.internal.ProjectionSettings
@@ -392,17 +393,14 @@ private[projection] object R2dbcProjectionImpl {
} else {
Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env,
sourceProvider))).flatMap {
loadedEnvelopes =>
+ val offsets =
loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector
val filteredEnvelopes =
loadedEnvelopes.filterNot(isFilteredEvent)
if (filteredEnvelopes.isEmpty) {
- offsetStore.addInflights(loadedEnvelopes)
- FutureDone
+ offsetStore.saveOffsets(offsets)
} else {
- delegate
- .process(filteredEnvelopes)
- .map { _ =>
- offsetStore.addInflights(loadedEnvelopes)
- Done
- }
+ delegate.process(filteredEnvelopes).flatMap { _ =>
+ offsetStore.saveOffsets(offsets)
+ }
}
}
}
@@ -584,8 +582,9 @@ private[projection] class R2dbcProjectionImpl[Offset,
Envelope](
override def withRecoveryStrategy(
recoveryStrategy: HandlerRecoveryStrategy): R2dbcProjectionImpl[Offset,
Envelope] = {
val newStrategy = offsetStrategy match {
- case s: ExactlyOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
- case s: AtLeastOnce => s.copy(recoveryStrategy = Some(recoveryStrategy))
+ case s: ExactlyOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: AtLeastOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: OffsetStoredByHandler => s.copy(recoveryStrategy =
Some(recoveryStrategy))
// NOTE: AtMostOnce has its own withRecoveryStrategy variant
// this method is not available for AtMostOnceProjection
case s: AtMostOnce => s
diff --git
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
index 5e48a76..c7c7246 100644
---
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
+++
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.projection.r2dbc.scaladsl
import scala.collection.immutable
-import scala.concurrent.duration.Duration
import org.apache.pekko
import pekko.Done
@@ -30,6 +29,7 @@ import pekko.projection.internal.ExactlyOnce
import pekko.projection.internal.FlowHandlerStrategy
import pekko.projection.internal.GroupedHandlerStrategy
import pekko.projection.internal.NoopStatusObserver
+import pekko.projection.internal.OffsetStoredByHandler
import pekko.projection.internal.SingleHandlerStrategy
import pekko.projection.r2dbc.R2dbcProjectionSettings
import pekko.projection.r2dbc.internal.R2dbcProjectionImpl
@@ -318,7 +318,7 @@ object R2dbcProjection {
settingsOpt = None,
sourceProvider,
restartBackoffOpt = None,
- offsetStrategy = AtLeastOnce(afterEnvelopes = Some(1), orAfterDuration =
Some(Duration.Zero)),
+ offsetStrategy = OffsetStoredByHandler(),
handlerStrategy = GroupedHandlerStrategy(adaptedHandler),
NoopStatusObserver,
offsetStore)
diff --git
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
index 27c4671..bdb43be 100644
---
a/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
+++
b/slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickProjectionImpl.scala
@@ -38,6 +38,7 @@ import pekko.projection.internal.HandlerStrategy
import pekko.projection.internal.InternalProjection
import pekko.projection.internal.InternalProjectionState
import pekko.projection.internal.ManagementState
+import pekko.projection.internal.OffsetStoredByHandler
import pekko.projection.internal.OffsetStrategy
import pekko.projection.internal.ProjectionSettings
import pekko.projection.internal.SettingsImpl
@@ -138,8 +139,9 @@ private[projection] class SlickProjectionImpl[Offset,
Envelope, P <: JdbcProfile
recoveryStrategy: HandlerRecoveryStrategy): SlickProjectionImpl[Offset,
Envelope, P] = {
val newStrategy =
offsetStrategy match {
- case s: ExactlyOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
- case s: AtLeastOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: ExactlyOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: AtLeastOnce => s.copy(recoveryStrategy =
Some(recoveryStrategy))
+ case s: OffsetStoredByHandler => s.copy(recoveryStrategy =
Some(recoveryStrategy))
// NOTE: AtMostOnce has its own withRecoveryStrategy variant
// this method is not available for AtMostOnceProjection
case s: AtMostOnce => s
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]