This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 8683f70 projection-r2dbc: support mysql (#495)
8683f70 is described below
commit 8683f703b00491f7e7dd3f8f73df44c24ad8b528
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 16:10:27 2026 +0100
projection-r2dbc: support mysql (#495)
* try mysql
* Update integration-tests-r2dbc.yml
* Update integration-tests-r2dbc.yml
* Update create_tables_mysql.sql
* Fix MySQL R2DBC issues: batch insert ordering and sql interpolation in
tests
* Fix MySQL R2DBC: move dialect-specific fixes into MySQLR2dbcOffsetStore
- Revert insertTimestampOffsetInTx in R2dbcOffsetStore back to original
add-before-bind ordering (correct for PostgreSQL/Yugabyte)
- Extract bindTimestampOffsetRecord as a protected helper method
- Extract executeDeleteOldTimestampOffsets as a protected hook method
- Make timestampOffsetTable, insertTimestampOffsetSql, and logger protected
so MySQL subclass can access them
- Override insertTimestampOffsetInTx in MySQLR2dbcOffsetStore with
bind-before-add ordering required by the MySQL R2DBC driver
- Override executeDeleteOldTimestampOffsets in MySQLR2dbcOffsetStore with
MySQL-compatible SQL: CONCAT() instead of ||, NOT IN (?,?,...) instead
of = ANY (?), dynamic placeholders for the exclusion list, and handles
the empty-exclusion-list case
* Fix compile error: replace trace2 with trace in MySQLR2dbcOffsetStore
* Fix MySQL test failures: identifier quoting and timestamp precision (#13)
* fix: MySQL test issues - double-quoted table name and timestamp precision
* fix: make createTableSql dialect-aware, preserving double-quote quoting
for postgres/yugabyte
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
* Fix savePaused to accept MySQL ON DUPLICATE KEY UPDATE row count and
include count in error (#14)
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
* Add MySQLR2dbcOffsetStoreSqlSpec unit tests for delete SQL correctness
(#15)
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
* Update MySQLR2dbcOffsetStoreSqlSpec.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.github/workflows/integration-tests-r2dbc.yml | 51 ++++++++++
docker-files/docker-compose-mysql.yml | 30 ++++++
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql | 112 +++++++++++++++++++++
.../r2dbc/EventSourcedEndToEndSpec.scala | 4 +-
.../r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala | 74 ++++++++++++++
.../projection/r2dbc/R2dbcOffsetStoreSpec.scala | 4 +-
.../projection/r2dbc/R2dbcProjectionSpec.scala | 14 ++-
.../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala | 2 +-
.../r2dbc/internal/R2dbcOffsetStore.scala | 78 +++++++-------
.../internal/mysql/MySQLR2dbcOffsetStore.scala | 93 +++++++++++++++++
10 files changed, 420 insertions(+), 42 deletions(-)
diff --git a/.github/workflows/integration-tests-r2dbc.yml
b/.github/workflows/integration-tests-r2dbc.yml
index 9920981..d5319f4 100644
--- a/.github/workflows/integration-tests-r2dbc.yml
+++ b/.github/workflows/integration-tests-r2dbc.yml
@@ -124,3 +124,54 @@ jobs:
- name: Print logs on failure
if: ${{ failure() }}
run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
+
+ mysql-test:
+ name: Build and Test Integration for R2DBC with MySQL
+ runs-on: ubuntu-22.04
+ if: github.repository == 'apache/pekko-projection'
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - { java-version: 17, scala-version: 2.13, sbt-opts: '' }
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd #
v6.0.2
+ with:
+ fetch-depth: 0
+ fetch-tags: true
+
+ - name: Checkout GitHub merge
+ if: github.event.pull_request
+ run: |-
+ git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
+ git checkout scratch
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 #
v8.1.0
+
+ - name: Setup JDK ${{ matrix.java-version }}
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 #
v5.2.0
+ with:
+ java-version: ${{ matrix.java-version }}
+ distribution: temurin
+
+ - name: Install sbt
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+ - name: Start MySQL
+ run: |-
+ docker compose -f docker-files/docker-compose-mysql.yml up -d --wait
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=mysql <
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root -e 'CREATE SCHEMA database1;'
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=database1 <
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root -e 'CREATE SCHEMA database2;'
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=database2 <
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
+
+ - name: test
+ run: sbt -Dpekko.persistence.r2dbc.dialect=mysql
-Dpekko.projection.r2dbc.dialect=mysql ++${{ matrix.scala-version }}
"r2dbc-int-test/test" ${{ matrix.extraOpts }}
+
+ - name: Print logs on failure
+ if: ${{ failure() }}
+ run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
diff --git a/docker-files/docker-compose-mysql.yml
b/docker-files/docker-compose-mysql.yml
new file mode 100644
index 0000000..df19260
--- /dev/null
+++ b/docker-files/docker-compose-mysql.yml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+ mysql-db:
+ image: mysql:9.5.0
+ container_name: docker-mysql-db-1
+ ports:
+ - 3306:3306
+ environment:
+ MYSQL_ROOT_PASSWORD: root
+ healthcheck:
+ test: [ "CMD", "mysqladmin", "--password=root", "ping", "-h",
"127.0.0.1" ]
+ interval: 1s
+ timeout: 1s
+ retries: 60
diff --git a/r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
b/r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
new file mode 100644
index 0000000..e006404
--- /dev/null
+++ b/r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
@@ -0,0 +1,112 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE IF NOT EXISTS event_journal(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ db_timestamp TIMESTAMP(6) NOT NULL,
+
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest VARCHAR(255) NOT NULL,
+ event_payload BLOB NOT NULL,
+
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ writer VARCHAR(255) NOT NULL,
+ adapter_manifest VARCHAR(255),
+ tags JSON, -- stored as a JSON array of strings
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BLOB,
+
+ PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX event_journal_slice_idx ON event_journal(slice, entity_type,
db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ write_timestamp BIGINT NOT NULL,
+ ser_id INTEGER NOT NULL,
+ ser_manifest VARCHAR(255) NOT NULL,
+ snapshot BLOB NOT NULL,
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BLOB,
+
+ PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ db_timestamp TIMESTAMP(6) NOT NULL,
+
+ state_ser_id INTEGER NOT NULL,
+ state_ser_manifest VARCHAR(255),
+ state_payload BLOB NOT NULL,
+ tags JSON, -- stored as a JSON array of strings
+
+ PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type,
db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table
is not created.
+CREATE TABLE IF NOT EXISTS projection_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ current_offset VARCHAR(255) NOT NULL,
+ manifest VARCHAR(32) NOT NULL,
+ mergeable BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ slice INT NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ -- timestamp_offset is the db_timestamp of the original event
+ timestamp_offset TIMESTAMP(6) NOT NULL,
+ -- timestamp_consumed is when the offset was stored
+ -- the consumer lag is timestamp_consumed - timestamp_offset
+ timestamp_consumed TIMESTAMP(6) NOT NULL,
+ PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS projection_management (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ paused BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
diff --git
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index 5aa1c50..b8b2015 100644
---
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -32,7 +32,8 @@ import pekko.actor.typed.scaladsl.LoggerOps
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.QuerySettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
@@ -144,6 +145,7 @@ class EventSourcedEndToEndSpec
private val querySettings =
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
private val projectionSettings = R2dbcProjectionSettings(system)
private val stringSerializer =
SerializationExtension(system).serializerFor(classOf[String])
+ private implicit val dialect: Dialect = projectionSettings.dialect
override protected def beforeAll(): Unit = {
super.beforeAll()
diff --git
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala
new file mode 100644
index 0000000..73f1f1e
--- /dev/null
+++
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.projection.r2dbc
+
+import org.apache.pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore
+import org.scalatest.TestSuite
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Unit tests for the SQL strings generated by [[MySQLR2dbcOffsetStore]].
+ *
+ * These tests run without a database and are intended to catch regressions
such as the
+ * `NOT CONCAT(...) IN` vs `CONCAT(...) NOT IN` operator-placement bug fixed in
+ * https://github.com/apache/pekko-projection/pull/495.
+ */
+class MySQLR2dbcOffsetStoreSqlSpec extends AnyWordSpec with TestSuite with
Matchers {
+
+ private val table = "projection.timestamp_offset"
+
+ "MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql" should {
+
+ "produce a simple DELETE when there are no exclusions" in {
+ val sql = MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table,
0)
+ sql should include("DELETE FROM")
+ sql should include(table)
+ sql should include("timestamp_offset < ?")
+ (sql should not).include("NOT IN")
+ (sql should not).include("CONCAT")
+ }
+
+ "produce a DELETE with CONCAT…NOT IN for a single exclusion" in {
+ val sql = MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table,
1)
+ sql should include("DELETE FROM")
+ sql should include(table)
+ sql should include("timestamp_offset < ?")
+ // Operator must be `CONCAT(…) NOT IN`, not `NOT CONCAT(…) IN`
+ sql should include("CONCAT(persistence_id, '-', seq_nr) NOT IN")
+ (sql should not).include("NOT CONCAT")
+ sql should endWith("NOT IN (?)")
+ }
+
+ "produce a DELETE with CONCAT…NOT IN for multiple exclusions" in {
+ val sql = MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table,
3)
+ sql should include("DELETE FROM")
+ sql should include(table)
+ sql should include("timestamp_offset < ?")
+ sql should include("CONCAT(persistence_id, '-', seq_nr) NOT IN")
+ (sql should not).include("NOT CONCAT")
+ sql should endWith("NOT IN (?, ?, ?)")
+ }
+
+ "include the correct number of placeholders" in {
+ for (n <- 1 to 5) {
+ val sql =
MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table, n)
+ val expectedPlaceholders = Seq.fill(n)("?").mkString(", ")
+ sql should include(s"NOT IN ($expectedPlaceholders)")
+ }
+ }
+
+ "include the table name in the generated SQL" in {
+ val customTable = "my_schema.my_projection_offset"
+ val sql =
MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(customTable, 2)
+ sql should startWith(s"DELETE FROM $customTable")
+ }
+ }
+}
diff --git
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index 0276314..0fab868 100644
---
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -22,7 +22,8 @@ import
pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
import pekko.persistence.query.Sequence
import pekko.persistence.query.TimeBasedUUID
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.projection.MergeableOffset
import pekko.projection.ProjectionId
import pekko.projection.internal.ManagementState
@@ -43,6 +44,7 @@ class R2dbcOffsetStoreSpec
private val clock = TestClock.nowMillis()
private val settings = R2dbcProjectionSettings(testKit.system)
+ private implicit val dialect: Dialect = settings.dialect
private def createOffsetStore(projectionId: ProjectionId) =
R2dbcOffsetStore.fromConfig(projectionId, None, system, settings,
r2dbcExecutor, clock)
diff --git
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
index 2aa118d..44e663f 100644
---
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
+++
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -33,7 +33,7 @@ import
pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.projection.HandlerRecoveryStrategy
import pekko.projection.OffsetVerification
@@ -97,12 +97,17 @@ object R2dbcProjectionSpec {
object TestRepository {
val table = "projection_spec_model"
- val createTableSql: String =
- s"""|CREATE table IF NOT EXISTS "$table" (
+ def createTableSql(dialect: Dialect): String = {
+ val quotedTable = dialect match {
+ case Dialect.MySQL => table
+ case _ => s""""$table""""
+ }
+ s"""|CREATE table IF NOT EXISTS $quotedTable (
| id VARCHAR(255) NOT NULL,
| concatenated VARCHAR(255) NOT NULL,
| PRIMARY KEY(id)
|);""".stripMargin
+ }
}
final case class TestRepository(session: R2dbcSession, settings:
R2dbcProjectionSettings)(
@@ -110,6 +115,7 @@ object R2dbcProjectionSpec {
import TestRepository.table
private val logger = LoggerFactory.getLogger(this.getClass)
+ private implicit val dialect: Dialect = settings.dialect
def concatToText(id: String, payload: String): Future[Done] = {
val savedStrOpt = findById(id)
@@ -200,7 +206,7 @@ class R2dbcProjectionSpec
super.beforeAll()
Await.result(r2dbcExecutor.executeDdl("beforeAll createTable") { conn =>
- conn.createStatement(TestRepository.createTableSql)
+ conn.createStatement(TestRepository.createTableSql(settings.dialect))
}, 10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete
from ${TestRepository.table}")),
diff --git
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 0451967..349a726 100644
---
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -162,7 +162,7 @@ class R2dbcTimestampOffsetProjectionSpec
super.beforeAll()
Await.result(r2dbcExecutor.executeDdl("beforeAll createTable") { conn =>
- conn.createStatement(TestRepository.createTableSql)
+ conn.createStatement(TestRepository.createTableSql(settings.dialect))
}, 10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete
from ${TestRepository.table}")),
diff --git
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 7f7c2e5..e7f14a0 100644
---
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -230,7 +230,7 @@ private[projection] class R2dbcOffsetStore(
protected lazy val timestampSql: String = "transaction_timestamp()"
// FIXME include projectionId in all log messages
- private val logger = LoggerFactory.getLogger(this.getClass)
+ protected val logger = LoggerFactory.getLogger(this.getClass)
private val persistenceExt = Persistence(system)
@@ -240,7 +240,7 @@ private[projection] class R2dbcOffsetStore(
import offsetSerialization.fromStorageRepresentation
import offsetSerialization.toStorageRepresentation
- private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
+ protected val timestampOffsetTable: String =
settings.timestampOffsetTableWithSchema
protected val offsetTable: String = settings.offsetTableWithSchema
protected val managementTable: String = settings.managementTableWithSchema
@@ -250,7 +250,7 @@ private[projection] class R2dbcOffsetStore(
SELECT slice, persistence_id, seq_nr, timestamp_offset
FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name
= ?"""
- private val insertTimestampOffsetSql: String = sql"""
+ protected val insertTimestampOffsetSql: String = sql"""
INSERT INTO $timestampOffsetTable
(projection_name, projection_key, slice, persistence_id, seq_nr,
timestamp_offset, timestamp_consumed)
VALUES (?,?,?,?,?,?, $timestampSql)"""
@@ -559,25 +559,25 @@ private[projection] class R2dbcOffsetStore(
}
}
- private def insertTimestampOffsetInTx(conn: Connection, records:
immutable.IndexedSeq[Record]): Future[Long] = {
- def bindRecord(stmt: Statement, record: Record): Statement = {
- val slice = persistenceExt.sliceForPersistenceId(record.pid)
- val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
- val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
- if (slice < minSlice || slice > maxSlice)
- throw new IllegalArgumentException(
- s"This offset store [$projectionId] manages slices " +
- s"[$minSlice - $maxSlice] but received slice [$slice] for
persistenceId [${record.pid}]")
-
- stmt
- .bind(0, projectionId.name)
- .bind(1, projectionId.key)
- .bind(2, slice)
- .bind(3, record.pid)
- .bind(4, record.seqNr)
- .bind(5, record.timestamp)
- }
+ protected def bindTimestampOffsetRecord(stmt: Statement, record: Record):
Statement = {
+ val slice = persistenceExt.sliceForPersistenceId(record.pid)
+ val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
+ val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
+ if (slice < minSlice || slice > maxSlice)
+ throw new IllegalArgumentException(
+ s"This offset store [$projectionId] manages slices " +
+ s"[$minSlice - $maxSlice] but received slice [$slice] for
persistenceId [${record.pid}]")
+
+ stmt
+ .bind(0, projectionId.name)
+ .bind(1, projectionId.key)
+ .bind(2, slice)
+ .bind(3, record.pid)
+ .bind(4, record.seqNr)
+ .bind(5, record.timestamp)
+ }
+ protected def insertTimestampOffsetInTx(conn: Connection, records:
immutable.IndexedSeq[Record]): Future[Long] = {
require(records.nonEmpty)
logger.trace2("saving timestamp offset [{}], {}", records.last.timestamp,
records)
@@ -585,14 +585,14 @@ private[projection] class R2dbcOffsetStore(
val statement = conn.createStatement(insertTimestampOffsetSql)
if (records.size == 1) {
- val boundStatement = bindRecord(statement, records.head)
+ val boundStatement = bindTimestampOffsetRecord(statement, records.head)
R2dbcExecutor.updateOneInTx(boundStatement)
} else {
// TODO Try Batch without bind parameters for better performance. Risk
of sql injection for these parameters is low.
val boundStatement =
records.foldLeft(statement) { (stmt, rec) =>
stmt.add()
- bindRecord(stmt, rec)
+ bindTimestampOffsetRecord(stmt, rec)
}
R2dbcExecutor.updateBatchInTx(boundStatement)
}
@@ -885,15 +885,7 @@ private[projection] class R2dbcOffsetStore(
s"${record.pid}-${record.seqNr}"
}.toArray
- val result = r2dbcExecutor.updateOne("delete old timestamp offset") {
conn =>
- conn
- .createStatement(deleteOldTimestampOffsetSql)
- .bind(0, minSlice)
- .bind(1, maxSlice)
- .bind(2, projectionId.name)
- .bind(3, until)
- .bind(4, notInLatestBySlice)
- }
+ val result = executeDeleteOldTimestampOffsets(minSlice, maxSlice,
until, notInLatestBySlice)
result.failed.foreach { exc =>
idle.set(false) // try again next tick
@@ -917,6 +909,22 @@ private[projection] class R2dbcOffsetStore(
}
}
+ protected def executeDeleteOldTimestampOffsets(
+ minSlice: Int,
+ maxSlice: Int,
+ until: Instant,
+ notInLatestBySlice: Array[String]): Future[Long] = {
+ r2dbcExecutor.updateOne("delete old timestamp offset") { conn =>
+ conn
+ .createStatement(deleteOldTimestampOffsetSql)
+ .bind(0, minSlice)
+ .bind(1, maxSlice)
+ .bind(2, projectionId.name)
+ .bind(3, until)
+ .bind(4, notInLatestBySlice)
+ }
+ }
+
/**
* Resetting an offset. Deletes newer offsets. Used from
ProjectionManagement. Doesn't update in-memory state because
* the projection is supposed to be stopped/started for this operation.
@@ -1057,9 +1065,9 @@ private[projection] class R2dbcOffsetStore(
.bind(3, Instant.now(clock).toEpochMilli)
}
.flatMap {
- case i if i == 1 => Future.successful(Done)
- case _ =>
- Future.failed(new RuntimeException(s"Failed to update management
table for $projectionId"))
+ case i if i >= 1 => Future.successful(Done)
+ case i =>
+ Future.failed(new RuntimeException(s"Failed to update management
table for $projectionId, row count: $i"))
}
}
diff --git
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
index fe988c0..af70295 100644
---
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
+++
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
@@ -20,6 +20,10 @@
package org.apache.pekko.projection.r2dbc.internal.mysql
import java.time.Clock
+import java.time.Instant
+
+import scala.collection.immutable
+import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.typed.ActorSystem
@@ -30,6 +34,29 @@ import pekko.projection.BySlicesSourceProvider
import pekko.projection.ProjectionId
import pekko.projection.r2dbc.R2dbcProjectionSettings
import pekko.projection.r2dbc.internal.R2dbcOffsetStore
+import io.r2dbc.spi.Connection
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] object MySQLR2dbcOffsetStore {
+
+ /**
+ * Builds the SQL string for deleting old timestamp offsets. When
`notInCount` is 0 the plain
+ * deletion query (no exclusion list) is returned. When `notInCount > 0` the
query uses
+ * `CONCAT(persistence_id, '-', seq_nr) NOT IN (?, …)` — note the placement
of `NOT IN` rather
+ * than `NOT CONCAT(…) IN`, which is invalid SQL.
+ */
+ private[projection] def buildDeleteOldTimestampOffsetsSql(tableName: String,
notInCount: Int): String = {
+ if (notInCount == 0) {
+ s"DELETE FROM $tableName WHERE slice BETWEEN ? AND ? AND projection_name
= ? AND timestamp_offset < ?"
+ } else {
+ val placeholders = Seq.fill(notInCount)("?").mkString(", ")
+ s"DELETE FROM $tableName WHERE slice BETWEEN ? AND ? AND projection_name
= ? AND timestamp_offset < ? AND CONCAT(persistence_id, '-', seq_nr) NOT IN
($placeholders)"
+ }
+ }
+}
/**
* INTERNAL API
@@ -63,4 +90,70 @@ private[projection] class MySQLR2dbcOffsetStore(
ON DUPLICATE KEY UPDATE
paused = excluded.paused,
last_updated = excluded.last_updated"""
+
+ /**
+ * MySQL's r2dbc driver validates that all parameters are bound before
`add()` is called
+ * on a batch statement, unlike PostgreSQL's driver. We therefore bind the
first record
+ * before folding over the remaining records with `add()`.
+ */
+ override protected def insertTimestampOffsetInTx(
+ conn: Connection,
+ records: immutable.IndexedSeq[R2dbcOffsetStore.Record]): Future[Long] = {
+ require(records.nonEmpty)
+
+ logger.trace("saving timestamp offset [{}], {}", records.last.timestamp,
records)
+
+ val statement = conn.createStatement(insertTimestampOffsetSql)
+
+ if (records.size == 1) {
+ val boundStatement = bindTimestampOffsetRecord(statement, records.head)
+ R2dbcExecutor.updateOneInTx(boundStatement)
+ } else {
+ // Bind the first record before calling add() for the rest; MySQL
validates all parameters
+ // are bound on the current batch row before accepting add().
+ val boundStatement =
+ records.tail.foldLeft(bindTimestampOffsetRecord(statement,
records.head)) { (stmt, rec) =>
+ stmt.add()
+ bindTimestampOffsetRecord(stmt, rec)
+ }
+ R2dbcExecutor.updateBatchInTx(boundStatement)
+ }
+ }
+
+ /**
+ * MySQL does not support `= ANY (?)` with an array parameter or the `||`
string concatenation
+ * operator. This override builds the DELETE SQL dynamically using
`CONCAT()` and
+ * `NOT IN (?, ?, ...)` with one placeholder per exclusion entry.
+ */
+ override protected def executeDeleteOldTimestampOffsets(
+ minSlice: Int,
+ maxSlice: Int,
+ until: Instant,
+ notInLatestBySlice: Array[String]): Future[Long] = {
+ r2dbcExecutor.updateOne("delete old timestamp offset") { conn =>
+ val stmt = if (notInLatestBySlice.isEmpty) {
+ conn
+ .createStatement(
+
MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(timestampOffsetTable,
0))
+ .bind(0, minSlice)
+ .bind(1, maxSlice)
+ .bind(2, projectionId.name)
+ .bind(3, until)
+ } else {
+ val s = conn
+ .createStatement(
+ MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(
+ timestampOffsetTable,
+ notInLatestBySlice.length))
+ .bind(0, minSlice)
+ .bind(1, maxSlice)
+ .bind(2, projectionId.name)
+ .bind(3, until)
+ notInLatestBySlice.zipWithIndex.foldLeft(s) { case (st, (value, idx))
=>
+ st.bind(4 + idx, value)
+ }
+ }
+ stmt
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]