This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 719e9eb  Core implementation for MySQL support (#175)
719e9eb is described below

commit 719e9eb6dd878409dc56036472d6d408030e08ba
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Fri Nov 15 19:23:09 2024 +0200

    Core implementation for MySQL support (#175)
    
    * Core implementation for MySQL support
    
    * Add documentation for mysql-specific configuration
    
    * Addressing various PR comments
    
    * Fix license headers
    
    * Remove spurious projection dependency on mysql
---
 .github/workflows/build-test.yml                   |  55 ++++++++++
 core/src/main/resources/reference.conf             |   2 +-
 .../r2dbc/ConnectionFactoryProvider.scala          |  28 ++++--
 .../pekko/persistence/r2dbc/R2dbcSettings.scala    |   4 +
 .../persistence/r2dbc/internal/R2dbcExecutor.scala |   3 +-
 .../pekko/persistence/r2dbc/internal/Sql.scala     |  30 +++++-
 .../persistence/r2dbc/journal/JournalDao.scala     |  52 +++++++---
 .../persistence/r2dbc/journal/R2dbcJournal.scala   |   6 +-
 .../r2dbc/journal/mysql/MySQLJournalDao.scala      |  70 +++++++++++++
 .../r2dbc/query/scaladsl/QueryDao.scala            |  36 +++++--
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |  11 +-
 .../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala |  93 +++++++++++++++++
 .../r2dbc/snapshot/R2dbcSnapshotStore.scala        |   6 +-
 .../persistence/r2dbc/snapshot/SnapshotDao.scala   |  31 ++++--
 .../r2dbc/snapshot/mysql/MySQLSnapshotDao.scala    |  52 ++++++++++
 .../r2dbc/state/scaladsl/DurableStateDao.scala     |  41 ++++++--
 .../state/scaladsl/R2dbcDurableStateStore.scala    |  15 +--
 .../scaladsl/mysql/MySQLDurableStateDao.scala      |  92 +++++++++++++++++
 .../pekko/persistence/r2dbc/TestConfig.scala       |  15 +++
 .../r2dbc/journal/PersistTagsSpec.scala            |  11 ++
 .../query/EventsBySliceBacktrackingSpec.scala      |   5 +-
 ddl-scripts/create_tables_mysql.sql                | 112 +++++++++++++++++++++
 ddl-scripts/drop_tables_mysql.sql                  |  23 +++++
 docker/docker-compose-mysql.yml                    |  30 ++++++
 docs/src/main/paradox/connection-config.md         |   3 +
 docs/src/test/resources/application-mysql.conf     |  32 ++++++
 project/Dependencies.scala                         |   2 +
 27 files changed, 780 insertions(+), 80 deletions(-)

diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index a4696d6..6e7d7cd 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -132,6 +132,61 @@ jobs:
       - name: test
         run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte 
-Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test
 
+  test-mysql:
+    name: Run tests with MySQL
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
+        JAVA_VERSION: [ 11, 17, 21 ]
+        # only compiling on JDK 8, because certain tests depend on the higher 
timestamp precision added in JDK 9
+        include:
+          - JAVA_VERSION: 8
+            SCALA_VERSION: 2.12
+            COMPILE_ONLY: true
+          - JAVA_VERSION: 8
+            SCALA_VERSION: 2.13
+            COMPILE_ONLY: true
+          - JAVA_VERSION: 8
+            SCALA_VERSION: 3.3
+            COMPILE_ONLY: true
+    if: github.repository == 'apache/pekko-persistence-r2dbc'
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number 
}}/merge:scratch
+          git checkout scratch
+
+      - name: Setup Java ${{ matrix.JAVA_VERSION }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: temurin
+          java-version: ${{ matrix.JAVA_VERSION }}
+
+      - name: Install sbt
+        uses: sbt/setup-sbt@v1
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@v6
+
+      - name: Enable jvm-opts
+        run: cp .jvmopts-ci .jvmopts
+
+      - name: Start DB
+        run: |-
+          docker compose -f docker/docker-compose-mysql.yml up -d --wait
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
+
+      - name: test
+        run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ 
matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 
'test' }}
+
   test-docs:
     name: Docs
     runs-on: ubuntu-latest
diff --git a/core/src/main/resources/reference.conf 
b/core/src/main/resources/reference.conf
index 4e205ab..1419a80 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -101,7 +101,7 @@ pekko.persistence.r2dbc {
 // #connection-settings
 pekko.persistence.r2dbc {
 
-  # postgres or yugabyte
+  # postgres, yugabyte or mysql
   dialect = "postgres"
 
   # set this to your database schema if applicable, empty by default
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
index c8af66b..612e765 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
@@ -19,24 +19,25 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.util.{ Failure, Success }
-
 import com.typesafe.config.Config
+import io.r2dbc.pool.ConnectionPool
+import io.r2dbc.pool.ConnectionPoolConfiguration
+import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
+import io.r2dbc.postgresql.client.SSLMode
+import io.r2dbc.spi.ConnectionFactories
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.ConnectionFactoryOptions
+import io.r2dbc.spi.Option
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.CoordinatedShutdown
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.Extension
 import pekko.actor.typed.ExtensionId
-import pekko.persistence.r2dbc.ConnectionFactoryProvider.{ 
ConnectionFactoryOptionsCustomizer, NoopCustomizer }
+import 
pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
+import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.util.ccompat.JavaConverters._
-import io.r2dbc.pool.ConnectionPool
-import io.r2dbc.pool.ConnectionPoolConfiguration
-import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
-import io.r2dbc.postgresql.client.SSLMode
-import io.r2dbc.spi.ConnectionFactories
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.ConnectionFactoryOptions
 
 object ConnectionFactoryProvider extends 
ExtensionId[ConnectionFactoryProvider] {
   def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new 
ConnectionFactoryProvider(system)
@@ -149,6 +150,12 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) 
extends Extension {
         builder.option(PostgresqlConnectionFactoryProvider.SSL_ROOT_CERT, 
settings.sslRootCert)
     }
 
+    if (settings.driver == "mysql") {
+      // Either `connectionTimeZone = SERVER` or 
`forceConnectionTimeZoneToSession = true` needs to be set for timezones to work 
correctly,
+      // likely caused by a bug in 
https://github.com/asyncer-io/r2dbc-mysql/pull/240.
+      builder.option(Option.valueOf("connectionTimeZone"), "SERVER")
+    }
+
     ConnectionFactories.get(customizer(builder, config).build())
   }
 
@@ -158,7 +165,8 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) 
extends Extension {
     val connectionFactory = createConnectionFactory(settings, customizer, 
config)
 
     val evictionInterval = {
-      import settings.{ maxIdleTime, maxLifeTime }
+      import settings.maxIdleTime
+      import settings.maxLifeTime
       if (maxIdleTime <= Duration.Zero && maxLifeTime <= Duration.Zero) {
         JDuration.ZERO
       } else if (maxIdleTime <= Duration.Zero) {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 49e4860..dd8856f 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -56,6 +56,7 @@ final class R2dbcSettings(config: Config) {
   val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
     case "yugabyte" => Dialect.Yugabyte
     case "postgres" => Dialect.Postgres
+    case "mysql"    => Dialect.MySQL
     case other =>
       throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported 
dialects are [yugabyte, postgres].")
   }
@@ -92,6 +93,9 @@ sealed trait Dialect
 object Dialect {
   case object Postgres extends Dialect
   case object Yugabyte extends Dialect
+
+  /** @since 1.1.0 */
+  case object MySQL extends Dialect
 }
 
 /**
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
index b757efe..894b8fa 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
@@ -325,7 +325,8 @@ class R2dbcExecutor(val connectionFactory: 
ConnectionFactory, log: Logger, logDb
           connection.close().asFutureDone().map { _ =>
             val durationMicros = durationInMicros(startTime)
             if (durationMicros >= logDbCallsExceedingMicros)
-              log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, 
r.toString, durationMicros: java.lang.Long)
+              log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, 
Option(r).map(_.toString).orNull,
+                durationMicros: java.lang.Long)
             r
           }
         }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
index f1884f4..affe56f 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
@@ -14,8 +14,10 @@
 package org.apache.pekko.persistence.r2dbc.internal
 
 import scala.annotation.varargs
-
-import org.apache.pekko.annotation.InternalStableApi
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.annotation.InternalStableApi
+import pekko.persistence.r2dbc.Dialect
 
 /**
  * INTERNAL API: Utility to format SQL strings. Replaces `?` with numbered 
`\$1`, `\$2` for bind parameters. Trims
@@ -24,6 +26,21 @@ import org.apache.pekko.annotation.InternalStableApi
 @InternalStableApi
 object Sql {
 
+  /**
+   * INTERNAL API
+   */
+  @InternalApi
+  private[r2dbc] implicit class DialectOps(dialect: Dialect) {
+    def replaceParameters(sql: String): String = {
+      dialect match {
+        case Dialect.Postgres | Dialect.Yugabyte =>
+          fillInParameterNumbers(sql)
+        case Dialect.MySQL =>
+          sql
+      }
+    }
+  }
+
   /**
    * Scala string interpolation with `sql` prefix. Replaces `?` with numbered 
`\$1`, `\$2` for bind parameters. Trims
    * whitespace, including line breaks. Standard string interpolation 
arguments `$` can be used.
@@ -33,6 +50,15 @@ object Sql {
       fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))
   }
 
+  /**
+   * INTERNAL API
+   */
+  @InternalApi
+  private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) 
extends AnyVal {
+    def sql(args: Any*)(implicit dialect: Dialect): String =
+      dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
+  }
+
   /**
    * Java API: Replaces `?` with numbered `\$1`, `\$2` for bind parameters. 
Trims whitespace, including line breaks. The
    * arguments are used like in [[java.lang.String.format]].
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 8ca0198..4434cfd 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -17,20 +17,22 @@ import java.time.Instant
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.dispatch.ExecutionContexts
 import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.typed.PersistenceId
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
-import io.r2dbc.spi.Statement
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -71,6 +73,19 @@ private[r2dbc] object JournalDao {
     }
   }
 
+  def fromConfig(
+      journalSettings: R2dbcSettings,
+      sharedConfigPath: String
+  )(implicit system: ActorSystem[_], ec: ExecutionContext): JournalDao = {
+    val connectionFactory =
+      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath 
+ ".connection-factory")
+    journalSettings.dialect match {
+      case Dialect.Postgres | Dialect.Yugabyte =>
+        new JournalDao(journalSettings, connectionFactory)
+      case Dialect.MySQL =>
+        new MySQLJournalDao(journalSettings, connectionFactory)
+    }
+  }
 }
 
 /**
@@ -86,13 +101,16 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
   import JournalDao.SerializedJournalRow
   import JournalDao.log
 
+  implicit protected val dialect: Dialect = journalSettings.dialect
+  protected lazy val timestampSql: String = "transaction_timestamp()"
+
   private val persistenceExt = Persistence(system)
 
   private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, 
journalSettings.logDbCallsExceeding)(ec, system)
 
-  private val journalTable = journalSettings.journalTableWithSchema
+  protected val journalTable: String = journalSettings.journalTableWithSchema
 
-  private val (insertEventWithParameterTimestampSql, 
insertEventWithTransactionTimestampSql) = {
+  protected val (insertEventWithParameterTimestampSql: String, 
insertEventWithTransactionTimestampSql: String) = {
     val baseSql =
       s"INSERT INTO $journalTable " +
       "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, 
event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, 
meta_ser_manifest, meta_payload, db_timestamp) " +
@@ -132,7 +150,7 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
   private val insertDeleteMarkerSql = sql"""
     INSERT INTO $journalTable
     (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, 
adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
-    VALUES (?, ?, ?, ?, transaction_timestamp(), ?, ?, ?, ?, ?, ?)"""
+    VALUES (?, ?, ?, ?, $timestampSql, ?, ?, ?, ?, ?, ?)"""
 
   /**
    * All events must be for the same persistenceId.
@@ -217,12 +235,18 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
         result.foreach { _ =>
           log.debug("Wrote [{}] events for persistenceId [{}]", 1, 
events.head.persistenceId)
         }
-      result
+      if (useTimestampFromDb) {
+        result
+      } else {
+        result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+      }
     } else {
       val result = r2dbcExecutor.updateInBatchReturning(s"batch insert 
[$persistenceId], [$totalEvents] events")(
         connection =>
-          events.foldLeft(connection.createStatement(insertSql)) { (stmt, 
write) =>
-            stmt.add()
+          events.zipWithIndex.foldLeft(connection.createStatement(insertSql)) 
{ case (stmt, (write, idx)) =>
+            if (idx != 0) {
+              stmt.add()
+            }
             bind(stmt, write)
           },
         row => row.get(0, classOf[Instant]))
@@ -230,7 +254,11 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
         result.foreach { _ =>
           log.debug("Wrote [{}] events for persistenceId [{}]", 1, 
events.head.persistenceId)
         }
-      result.map(_.head)(ExecutionContexts.parasitic)
+      if (useTimestampFromDb) {
+        result.map(_.head)(ExecutionContexts.parasitic)
+      } else {
+        result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+      }
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index eb111cf..d141ebd 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -36,7 +36,6 @@ import pekko.persistence.PersistentRepr
 import pekko.persistence.journal.AsyncWriteJournal
 import pekko.persistence.journal.Tagged
 import pekko.persistence.query.PersistenceQuery
-import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.R2dbcSettings
 import pekko.persistence.r2dbc.internal.PubSub
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
@@ -97,10 +96,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
   private val serialization: Serialization = 
SerializationExtension(context.system)
   private val journalSettings = 
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
 
-  private val journalDao =
-    new JournalDao(
-      journalSettings,
-      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath 
+ ".connection-factory"))
+  private val journalDao = JournalDao.fromConfig(journalSettings, 
sharedConfigPath)
   private val query = 
PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath + 
".query")
 
   private val pubSub: Option[PubSub] =
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
new file mode 100644
index 0000000..9adba88
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal.mysql
+
+import scala.concurrent.ExecutionContext
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.JournalDao
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object MySQLJournalDao {
+  def settingRequirements(journalSettings: R2dbcSettings): Unit = {
+    // Application timestamps are used because MySQL does not have 
transaction_timestamp like Postgres. In future releases
+    // it could be emulated, but the benefits are questionable - 
no matter where the timestamps are generated,
+    // the risk of clock skew remains.
+    require(journalSettings.useAppTimestamp,
+      "use-app-timestamp config must be on for MySQL support")
+    // Supporting non-monotonically increasing timestamps by incrementing the 
timestamp within the insert queries based on
+    // the latest row in the database seems to cause deadlocks when running tests 
like PersistTimestampSpec. Possibly this could
+    // be fixed.
+    require(journalSettings.dbTimestampMonotonicIncreasing,
+      "db-timestamp-monotonic-increasing config must be on for MySQL support")
+    // Also, the missing RETURNING implementation makes grabbing the timestamp 
generated by the database less efficient - this
+    // applies to both of the requirements above.
+  }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] class MySQLJournalDao(
+    journalSettings: R2dbcSettings,
+    connectionFactory: ConnectionFactory)(
+    implicit ec: ExecutionContext, system: ActorSystem[_]
+) extends JournalDao(journalSettings, connectionFactory) {
+  MySQLJournalDao.settingRequirements(journalSettings)
+
+  override lazy val timestampSql: String = "NOW(6)"
+
+  override val insertEventWithParameterTimestampSql: String =
+    sql"INSERT INTO $journalTable " +
+    "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, 
event_ser_id, event_ser_manifest, " +
+    "event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, 
db_timestamp) " +
+    s"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 0fff879..46e59b0 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -19,28 +19,42 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-
+import io.r2dbc.spi.ConnectionFactory
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
-import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
+import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
 import pekko.stream.scaladsl.Source
-import io.r2dbc.spi.ConnectionFactory
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 object QueryDao {
   val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])
+
+  def fromConfig(
+      journalSettings: R2dbcSettings,
+      sharedConfigPath: String
+  )(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = {
+    val connectionFactory =
+      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath 
+ ".connection-factory")
+    journalSettings.dialect match {
+      case Dialect.Postgres | Dialect.Yugabyte =>
+        new QueryDao(journalSettings, connectionFactory)
+      case Dialect.MySQL =>
+        new MySQLQueryDao(journalSettings, connectionFactory)
+    }
+  }
 }
 
 /**
@@ -54,12 +68,15 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
   import JournalDao.readMetadata
   import QueryDao.log
 
-  private val journalTable = settings.journalTableWithSchema
+  implicit protected val dialect: Dialect = settings.dialect
+  protected lazy val statementTimestampSql: String = "statement_timestamp()"
+
+  protected val journalTable = settings.journalTableWithSchema
 
   private val currentDbTimestampSql =
     "SELECT transaction_timestamp() AS db_timestamp"
 
-  private def eventsBySlicesRangeSql(
+  protected def eventsBySlicesRangeSql(
       toDbTimestampParam: Boolean,
       behindCurrentTime: FiniteDuration,
       backtracking: Boolean,
@@ -96,10 +113,11 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
     settings.dialect match {
       case Dialect.Yugabyte => s"slice BETWEEN $minSlice AND $maxSlice"
       case Dialect.Postgres => s"slice in (${(minSlice to 
maxSlice).mkString(",")})"
+      case unhandled        => throw new IllegalArgumentException(s"Unable to 
handle dialect [$unhandled]")
     }
   }
 
-  private def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+  protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
     sql"""
       SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) 
AS count
       FROM $journalTable
@@ -116,12 +134,12 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
     WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
 
   private val selectOneEventSql = sql"""
-    SELECT slice, entity_type, db_timestamp, statement_timestamp() AS 
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, 
meta_ser_id, meta_ser_manifest, meta_payload
+    SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS 
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, 
meta_ser_id, meta_ser_manifest, meta_payload
     FROM $journalTable
     WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
 
   private val selectEventsSql = sql"""
-    SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload
+    SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload
     from $journalTable
     WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
     AND deleted = false
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 91ea310..4280f11 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -19,10 +19,11 @@ import scala.collection.immutable
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
-
+import com.typesafe.config.Config
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.pubsub.Topic
 import pekko.actor.typed.scaladsl.adapter._
 import pekko.annotation.InternalApi
@@ -48,7 +49,6 @@ import pekko.serialization.SerializationExtension
 import pekko.stream.OverflowStrategy
 import pekko.stream.scaladsl.Flow
 import pekko.stream.scaladsl.Source
-import com.typesafe.config.Config
 import org.slf4j.LoggerFactory
 
 object R2dbcReadJournal {
@@ -76,14 +76,13 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   private val sharedConfigPath = cfgPath.replaceAll("""\.query$""", "")
   private val settings = 
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
 
-  private val typedSystem = system.toTyped
+  private implicit val typedSystem: ActorSystem[_] = system.toTyped
   import typedSystem.executionContext
   private val serialization = SerializationExtension(system)
   private val persistenceExt = Persistence(system)
   private val connectionFactory = ConnectionFactoryProvider(typedSystem)
     .connectionFactoryFor(sharedConfigPath + ".connection-factory")
-  private val queryDao =
-    new QueryDao(settings, connectionFactory)(typedSystem.executionContext, 
typedSystem)
+  private val queryDao = QueryDao.fromConfig(settings, sharedConfigPath)
 
   private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]] 
= {
     val createEnvelope: (TimestampOffset, SerializedJournalRow) => 
EventEnvelope[Any] = (offset, row) => {
@@ -108,7 +107,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   private def bySlice[Event]: BySliceQuery[SerializedJournalRow, 
EventEnvelope[Event]] =
     _bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow, 
EventEnvelope[Event]]]
 
-  private val journalDao = new JournalDao(settings, 
connectionFactory)(typedSystem.executionContext, typedSystem)
+  private val journalDao = JournalDao.fromConfig(settings, sharedConfigPath)
 
   def extractEntityTypeFromPersistenceId(persistenceId: String): String =
     PersistenceId.extractEntityType(persistenceId)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
new file mode 100644
index 0000000..8cbc5b6
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.query.scaladsl.mysql
+
+import java.time.Instant
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.FiniteDuration
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.query.scaladsl.QueryDao
+
+/**
+ * INTERNAL API
+ *
+ * MySQL variant of [[QueryDao]]. It swaps the timestamp expression and the two slice based
+ * queries for MySQL syntax (`NOW(6)`, `DATE_SUB`, `UNIX_TIMESTAMP`) and resolves the current
+ * timestamp from the application clock. Everything that is not overridden here is inherited
+ * from [[QueryDao]].
+ */
+@InternalApi
+private[r2dbc] class MySQLQueryDao(
+    journalSettings: R2dbcSettings,
+    connectionFactory: ConnectionFactory
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends QueryDao(journalSettings, connectionFactory) {
+
+  /** SQL expression used wherever a query needs the database's current time. */
+  override lazy val statementTimestampSql: String = "NOW(6)"
+
+  /**
+   * Builds the query that reads journal rows of one entity type within a slice range, ordered by
+   * `db_timestamp, seq_nr`.
+   *
+   * Bind parameters, in order: entity type, lower `db_timestamp` bound (inclusive), upper
+   * `db_timestamp` bound (inclusive, only present when `toDbTimestampParam` is true), row limit.
+   *
+   * @param toDbTimestampParam whether to add the `db_timestamp <= ?` upper bound
+   * @param behindCurrentTime when greater than zero, only rows older than "now minus this duration" are returned
+   * @param backtracking when true the serialized event and metadata columns are left out of the select list
+   * @param minSlice lower slice bound (inclusive), rendered inline into the SQL
+   * @param maxSlice upper slice bound (inclusive), rendered inline into the SQL
+   */
+  override def eventsBySlicesRangeSql(
+      toDbTimestampParam: Boolean,
+      behindCurrentTime: FiniteDuration,
+      backtracking: Boolean,
+      minSlice: Int,
+      maxSlice: Int): String = {
+
+    def toDbTimestampParamCondition =
+      if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
+
+    // The interval is rendered in microseconds so that sub-millisecond durations are not lost.
+    def behindCurrentTimeIntervalCondition =
+      if (behindCurrentTime > Duration.Zero)
+        s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)"
+      else ""
+
+    // Backtracking queries only need the row identity and timestamps, not the payload columns.
+    val selectColumns = {
+      if (backtracking)
+        s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp "
+      else
+        s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
+    }
+
+    sql"""
+      $selectColumns
+      FROM $journalTable
+      WHERE entity_type = ?
+      AND slice BETWEEN $minSlice AND $maxSlice
+      AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
+      AND deleted = false
+      ORDER BY db_timestamp, seq_nr
+      LIMIT ?"""
+  }
+
+  /**
+   * Builds the query that counts non-deleted journal rows per 10 second bucket of `db_timestamp`.
+   *
+   * Bind parameters, in order: entity type, lower timestamp bound, upper timestamp bound
+   * (both inclusive), maximum number of buckets.
+   */
+  override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+    // DIV is MySQL's integer division. The `/` operator returns a DECIMAL quotient, which would
+    // make GROUP BY produce one bucket per second instead of one per 10 seconds.
+    sql"""
+      SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) DIV 10 AS bucket, count(*) AS count
+      FROM $journalTable
+      WHERE entity_type = ?
+      AND slice BETWEEN $minSlice AND $maxSlice
+      AND db_timestamp >= ? AND db_timestamp <= ?
+      AND deleted = false
+      GROUP BY bucket ORDER BY bucket LIMIT ?
+      """
+  }
+
+  /** Returns the application's wall-clock time (`Instant.now()`); no database round trip is made. */
+  override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now())
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
index 9402583..7d637ad 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.scaladsl.adapter._
 import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata, 
SnapshotSelectionCriteria }
-import pekko.persistence.r2dbc.{ ConnectionFactoryProvider, R2dbcSettings }
+import pekko.persistence.r2dbc.R2dbcSettings
 import pekko.persistence.snapshot.SnapshotStore
 import pekko.serialization.{ Serialization, SerializationExtension }
 import com.typesafe.config.Config
@@ -59,9 +59,7 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, 
cfgPath: String) exte
   private val dao = {
     val sharedConfigPath = cfgPath.replaceAll("""\.snapshot$""", "")
     val settings = 
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
-    new SnapshotDao(
-      settings,
-      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath 
+ ".connection-factory"))
+    SnapshotDao.fromConfig(settings, sharedConfigPath)
   }
 
   def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): 
Future[Option[SelectedSnapshot]] =
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 16968b3..043ff6e 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -15,19 +15,21 @@ package org.apache.pekko.persistence.r2dbc.snapshot
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.dispatch.ExecutionContexts
 import pekko.persistence.Persistence
 import pekko.persistence.SnapshotSelectionCriteria
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
 import pekko.persistence.typed.PersistenceId
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -66,6 +68,19 @@ private[r2dbc] object SnapshotDao {
               row.get("meta_ser_manifest", classOf[String])))
       })
 
+  /**
+   * Creates the snapshot DAO that matches the configured dialect.
+   *
+   * The connection factory is looked up under `sharedConfigPath + ".connection-factory"`.
+   * Postgres and Yugabyte share the default [[SnapshotDao]]; MySQL gets [[MySQLSnapshotDao]].
+   */
+  def fromConfig(
+      journalSettings: R2dbcSettings,
+      sharedConfigPath: String
+  )(implicit system: ActorSystem[_], ec: ExecutionContext): SnapshotDao = {
+    val factoryPath = sharedConfigPath + ".connection-factory"
+    val connectionFactory = ConnectionFactoryProvider(system).connectionFactoryFor(factoryPath)
+    journalSettings.dialect match {
+      case Dialect.MySQL                       => new MySQLSnapshotDao(journalSettings, connectionFactory)
+      case Dialect.Postgres | Dialect.Yugabyte => new SnapshotDao(journalSettings, connectionFactory)
+    }
+  }
 }
 
 /**
@@ -74,16 +89,18 @@ private[r2dbc] object SnapshotDao {
  * Class for doing db interaction outside of an actor to avoid mistakes in 
future callbacks
  */
 @InternalApi
-private[r2dbc] final class SnapshotDao(settings: R2dbcSettings, 
connectionFactory: ConnectionFactory)(implicit
+private[r2dbc] class SnapshotDao(settings: R2dbcSettings, connectionFactory: 
ConnectionFactory)(implicit
     ec: ExecutionContext,
     system: ActorSystem[_]) {
   import SnapshotDao._
 
-  private val snapshotTable = settings.snapshotsTableWithSchema
+  implicit protected val dialect: Dialect = settings.dialect
+
+  protected val snapshotTable: String = settings.snapshotsTableWithSchema
   private val persistenceExt = Persistence(system)
   private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, 
settings.logDbCallsExceeding)(ec, system)
 
-  private val upsertSql = sql"""
+  protected val upsertSql = sql"""
     INSERT INTO $snapshotTable
     (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, 
ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest)
     VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
new file mode 100644
index 0000000..e725168
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.snapshot.mysql
+
+import scala.concurrent.ExecutionContext
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.snapshot.SnapshotDao
+
+/**
+ * INTERNAL API
+ *
+ * MySQL variant of [[SnapshotDao]]. The only difference is the upsert statement, which uses
+ * MySQL's `ON DUPLICATE KEY UPDATE` syntax; all other behaviour is inherited.
+ */
+@InternalApi
+private[r2dbc] class MySQLSnapshotDao(
+    settings: R2dbcSettings, connectionFactory: ConnectionFactory
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends 
SnapshotDao(settings, connectionFactory) {
+
+  // Inserts a snapshot row or, when the key already exists, overwrites the existing row.
+  // The inserted values are aliased as `excluded` so the UPDATE part can refer to them.
+  // `slice`, `entity_type` and `persistence_id` are not touched by the UPDATE part.
+  override val upsertSql = sql"""
+    INSERT INTO $snapshotTable
+    (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, 
ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest)
+    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS excluded
+    ON DUPLICATE KEY UPDATE
+      seq_nr = excluded.seq_nr,
+      write_timestamp = excluded.write_timestamp,
+      snapshot = excluded.snapshot,
+      ser_id = excluded.ser_id,
+      ser_manifest = excluded.ser_manifest,
+      meta_payload = excluded.meta_payload,
+      meta_ser_id = excluded.meta_ser_id,
+      meta_ser_manifest = excluded.meta_ser_manifest"""
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 03cdc88..80ef95b 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -19,7 +19,9 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.R2dbcDataIntegrityViolationException
+import io.r2dbc.spi.Statement
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
@@ -27,18 +29,17 @@ import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.dispatch.ExecutionContexts
 import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
 import pekko.persistence.typed.PersistenceId
 import pekko.stream.scaladsl.Source
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.R2dbcDataIntegrityViolationException
-import io.r2dbc.spi.Statement
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -61,6 +62,20 @@ import org.slf4j.LoggerFactory
       extends BySliceQuery.SerializedRow {
     override def seqNr: Long = revision
   }
+
+  /**
+   * Creates the durable state DAO that matches the configured dialect.
+   *
+   * The connection factory is looked up under `sharedConfigPath + ".connection-factory"`.
+   * Postgres and Yugabyte share the default [[DurableStateDao]]; MySQL gets [[MySQLDurableStateDao]].
+   */
+  def fromConfig(
+      journalSettings: R2dbcSettings,
+      sharedConfigPath: String
+  )(implicit system: ActorSystem[_], ec: ExecutionContext): DurableStateDao = {
+    val factoryPath = sharedConfigPath + ".connection-factory"
+    val connectionFactory = ConnectionFactoryProvider(system).connectionFactoryFor(factoryPath)
+    journalSettings.dialect match {
+      case Dialect.MySQL                       => new MySQLDurableStateDao(journalSettings, connectionFactory)
+      case Dialect.Postgres | Dialect.Yugabyte => new DurableStateDao(journalSettings, connectionFactory)
+    }
+  }
 }
 
 /**
@@ -75,16 +90,19 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
     extends BySliceQuery.Dao[DurableStateDao.SerializedStateRow] {
   import DurableStateDao._
 
+  implicit protected val dialect: Dialect = settings.dialect
+  protected lazy val transactionTimestampSql: String = 
"transaction_timestamp()"
+
   private val persistenceExt = Persistence(system)
   private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, 
settings.logDbCallsExceeding)(ec, system)
 
-  private val stateTable = settings.durableStateTableWithSchema
+  protected val stateTable = settings.durableStateTableWithSchema
 
   private val selectStateSql: String = sql"""
     SELECT revision, state_ser_id, state_ser_manifest, state_payload, 
db_timestamp
     FROM $stateTable WHERE persistence_id = ?"""
 
-  private def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+  protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
     sql"""
      SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) 
AS count
      FROM $stateTable
@@ -99,20 +117,21 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
     settings.dialect match {
       case Dialect.Yugabyte => s"slice BETWEEN $minSlice AND $maxSlice"
       case Dialect.Postgres => s"slice in (${(minSlice to 
maxSlice).mkString(",")})"
+      case unhandled        => throw new IllegalArgumentException(s"Unable to 
handle dialect [$unhandled]")
     }
   }
 
   private val insertStateSql: String = sql"""
     INSERT INTO $stateTable
     (slice, entity_type, persistence_id, revision, state_ser_id, 
state_ser_manifest, state_payload, tags, db_timestamp)
-    VALUES (?, ?, ?, ?, ?, ?, ?, ?, transaction_timestamp())"""
+    VALUES (?, ?, ?, ?, ?, ?, ?, ?, $transactionTimestampSql)"""
 
   private val updateStateSql: String = {
     val timestamp =
       if (settings.dbTimestampMonotonicIncreasing)
-        "transaction_timestamp()"
+        s"$transactionTimestampSql"
       else
-        "GREATEST(transaction_timestamp(), " +
+        s"GREATEST($transactionTimestampSql, " +
         s"(SELECT db_timestamp + '1 microsecond'::interval FROM $stateTable 
WHERE persistence_id = ? AND revision = ?))"
 
     val revisionCondition =
@@ -141,7 +160,7 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
   private val allPersistenceIdsAfterSql =
     sql"SELECT persistence_id from $stateTable WHERE persistence_id > ? ORDER 
BY persistence_id LIMIT ?"
 
-  private def stateBySlicesRangeSql(
+  protected def stateBySlicesRangeSql(
       maxDbTimestampParam: Boolean,
       behindCurrentTime: FiniteDuration,
       backtracking: Boolean,
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index 546f954..4d7de65 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -16,11 +16,12 @@ package org.apache.pekko.persistence.r2dbc.state.scaladsl
 import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-
+import com.typesafe.config.Config
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
 import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.scaladsl.adapter._
 import pekko.dispatch.ExecutionContexts
 import pekko.persistence.Persistence
@@ -30,7 +31,6 @@ import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.UpdatedDurableState
 import 
pekko.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
 import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
-import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.R2dbcSettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.ContinuousQuery
@@ -40,7 +40,6 @@ import pekko.persistence.state.scaladsl.GetObjectResult
 import pekko.serialization.SerializationExtension
 import pekko.serialization.Serializers
 import pekko.stream.scaladsl.Source
-import com.typesafe.config.Config
 import org.slf4j.LoggerFactory
 
 object R2dbcDurableStateStore {
@@ -59,15 +58,11 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg
   private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "")
   private val settings = 
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
 
-  private val typedSystem = system.toTyped
+  private implicit val typedSystem: ActorSystem[_] = system.toTyped
+  implicit val ec: ExecutionContext = system.dispatcher
   private val serialization = SerializationExtension(system)
   private val persistenceExt = Persistence(system)
-  private val stateDao =
-    new DurableStateDao(
-      settings,
-      
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(sharedConfigPath + 
".connection-factory"))(
-      typedSystem.executionContext,
-      typedSystem)
+  private val stateDao = DurableStateDao.fromConfig(settings, sharedConfigPath)
 
   private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] 
= {
     val createEnvelope: (TimestampOffset, SerializedStateRow) => 
DurableStateChange[A] = (offset, row) => {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
new file mode 100644
index 0000000..385dc21
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.state.scaladsl.mysql
+
+import java.time.Instant
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.FiniteDuration
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
+import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
+
+/**
+ * INTERNAL API
+ *
+ * MySQL variant of [[DurableStateDao]]. It swaps the timestamp expression and the two slice
+ * based queries for MySQL syntax and resolves the current timestamp from the application clock.
+ * Everything that is not overridden here is inherited from [[DurableStateDao]].
+ */
+@InternalApi
+private[r2dbc] class MySQLDurableStateDao(
+    settings: R2dbcSettings,
+    connectionFactory: ConnectionFactory
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao(settings, connectionFactory) {
+  // Runs once at construction time. NOTE(review): the check itself lives in MySQLJournalDao;
+  // presumably it rejects settings the MySQL dialect cannot honour - confirm there.
+  MySQLJournalDao.settingRequirements(settings)
+
+  /** SQL expression used wherever a statement needs the database's current time. */
+  override lazy val transactionTimestampSql: String = "NOW(6)"
+
+  /**
+   * Builds the query that counts state rows per 10 second bucket of `db_timestamp`.
+   *
+   * Bind parameters, in order: entity type, lower timestamp bound, upper timestamp bound
+   * (both inclusive), maximum number of buckets.
+   */
+  override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+    // DIV is MySQL's integer division. The `/` operator returns a DECIMAL quotient, which would
+    // make GROUP BY produce one bucket per second instead of one per 10 seconds.
+    sql"""
+     SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) DIV 10 AS bucket, count(*) AS count
+     FROM $stateTable
+     WHERE entity_type = ?
+     AND slice BETWEEN $minSlice AND $maxSlice
+     AND db_timestamp >= ? AND db_timestamp <= ?
+     GROUP BY bucket ORDER BY bucket LIMIT ?
+     """
+  }
+
+  /**
+   * Builds the query that reads state rows of one entity type within a slice range, ordered by
+   * `db_timestamp, revision`.
+   *
+   * Bind parameters, in order: entity type, lower `db_timestamp` bound (inclusive), upper
+   * `db_timestamp` bound (exclusive, only present when `maxDbTimestampParam` is true), row limit.
+   *
+   * @param maxDbTimestampParam whether to add the `db_timestamp < ?` upper bound
+   * @param behindCurrentTime when greater than zero, only rows older than "now minus this duration" are returned
+   * @param backtracking when true the serialized state columns are left out of the select list
+   * @param minSlice lower slice bound (inclusive), rendered inline into the SQL
+   * @param maxSlice upper slice bound (inclusive), rendered inline into the SQL
+   */
+  override def stateBySlicesRangeSql(
+      maxDbTimestampParam: Boolean,
+      behindCurrentTime: FiniteDuration,
+      backtracking: Boolean,
+      minSlice: Int,
+      maxSlice: Int): String = {
+
+    def maxDbTimestampParamCondition =
+      if (maxDbTimestampParam) "AND db_timestamp < ?" else ""
+
+    // The interval is rendered in microseconds so that sub-millisecond durations are not lost.
+    def behindCurrentTimeIntervalCondition =
+      if (behindCurrentTime > Duration.Zero)
+        s"AND db_timestamp < DATE_SUB($transactionTimestampSql, INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)"
+      else ""
+
+    // Backtracking queries only need the row identity and timestamps, not the payload columns.
+    val selectColumns =
+      if (backtracking)
+        s"SELECT persistence_id, revision, db_timestamp, $transactionTimestampSql AS read_db_timestamp "
+      else
+        s"SELECT persistence_id, revision, db_timestamp, $transactionTimestampSql AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload "
+
+    sql"""
+      $selectColumns
+      FROM $stateTable
+      WHERE entity_type = ?
+      AND slice BETWEEN $minSlice AND $maxSlice
+      AND db_timestamp >= ? $maxDbTimestampParamCondition $behindCurrentTimeIntervalCondition
+      ORDER BY db_timestamp, revision
+      LIMIT ?"""
+  }
+
+  /** Returns the application's wall-clock time (`Instant.now()`); no database round trip is made. */
+  override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now())
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
index e54fd8b..161262e 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
@@ -44,6 +44,21 @@ object TestConfig {
             database = "yugabyte"
           }
           """)
+      case "mysql" =>
+        ConfigFactory.parseString("""
+          pekko.persistence.r2dbc{
+            connection-factory {
+              driver = "mysql"
+              host = "localhost"
+              port = 3306
+              user = "root"
+              password = "root"
+              database = "mysql"
+            }
+            db-timestamp-monotonic-increasing = on
+            use-app-timestamp = on
+          }
+          """)
     }
 
     // using load here so that connection-factory can be overridden
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index 7a490ed..d0c6fcc 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -40,7 +40,18 @@ class PersistTagsSpec
 
   case class Row(pid: String, seqNr: Long, tags: Set[String])
 
+  private lazy val dialect = 
system.settings.config.getString("pekko.persistence.r2dbc.dialect")
+
+  private lazy val testEnabled: Boolean = {
+    // tags are not implemented for MySQL
+    dialect != "mysql"
+  }
+
   "Persist tags" should {
+    if (!testEnabled) {
+      info(s"PersistTagsSpec not enabled for $dialect")
+      pending
+    }
 
     "be the same for events stored in same transaction" in {
       val numberOfEntities = 9
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index a88010f..d980798 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.persistence.r2dbc.query
 import java.time.Instant
 
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@@ -24,11 +23,12 @@ import pekko.actor.typed.ActorSystem
 import pekko.persistence.query.NoOffset
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.serialization.SerializationExtension
@@ -55,6 +55,7 @@ class EventsBySliceBacktrackingSpec
   // to be able to store events with specific timestamps
   private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, 
timestamp: Instant, event: String): Unit = {
     log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, 
seqNr: java.lang.Long, event, timestamp)
+    implicit val dialect: Dialect = settings.dialect
     val insertEventSql = sql"""
       INSERT INTO ${settings.journalTableWithSchema}
       (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, 
adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
diff --git a/ddl-scripts/create_tables_mysql.sql 
b/ddl-scripts/create_tables_mysql.sql
new file mode 100644
index 0000000..67554e7
--- /dev/null
+++ b/ddl-scripts/create_tables_mysql.sql
@@ -0,0 +1,112 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Event journal: one row per persisted event, identified by (persistence_id, seq_nr).
+-- `deleted` is a soft-delete flag; the slice based queries only read rows with deleted = false.
+-- NOTE(review): MySQL BLOB columns are presumably capped at 64 KiB; confirm that event and
+-- metadata payloads stay below that, or widen the columns (MEDIUMBLOB/LONGBLOB).
+CREATE TABLE IF NOT EXISTS event_journal(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  db_timestamp TIMESTAMP(6) NOT NULL,
+
+  event_ser_id INTEGER NOT NULL,
+  event_ser_manifest VARCHAR(255) NOT NULL,
+  event_payload BLOB NOT NULL,
+
+  deleted BOOLEAN DEFAULT FALSE NOT NULL,
+  writer VARCHAR(255) NOT NULL,
+  adapter_manifest VARCHAR(255),
+  tags TEXT, -- FIXME no array type, is this the best option?
+
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BLOB,
+
+  PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+-- NOTE(review): unlike the CREATE TABLE statements this has no IF NOT EXISTS guard, so
+-- re-running the script will presumably fail here - confirm.
+CREATE INDEX event_journal_slice_idx ON event_journal(slice, entity_type, 
db_timestamp, seq_nr);
+
+-- Snapshot store: at most one row per persistence_id (the primary key), so storing a new
+-- snapshot replaces the previous one through the upsert statement.
+-- NOTE(review): MySQL BLOB columns are presumably capped at 64 KiB; confirm that snapshots
+-- stay below that, or widen the columns (MEDIUMBLOB/LONGBLOB).
+CREATE TABLE IF NOT EXISTS snapshot(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  write_timestamp BIGINT NOT NULL,
+  ser_id INTEGER NOT NULL,
+  ser_manifest VARCHAR(255) NOT NULL,
+  snapshot BLOB NOT NULL,
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BLOB,
+
+  PRIMARY KEY(persistence_id)
+);
+
+-- Durable state store: serialized state per persistence_id together with its revision.
+-- NOTE(review): the primary key includes `revision`, so the schema itself does not limit a
+-- persistence_id to a single row - confirm the DAO updates rows in place.
+-- NOTE(review): MySQL BLOB columns are presumably capped at 64 KiB; confirm state payload sizes.
+CREATE TABLE IF NOT EXISTS durable_state (
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  revision BIGINT NOT NULL,
+  db_timestamp TIMESTAMP(6) NOT NULL,
+
+  state_ser_id INTEGER NOT NULL,
+  state_ser_manifest VARCHAR(255),
+  state_payload BLOB NOT NULL,
+  tags TEXT, -- FIXME no array type, is this the best option?
+
+  PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+-- NOTE(review): no IF NOT EXISTS guard, so re-running the script will presumably fail here - confirm.
+CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, 
db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created.
+-- One row per (projection_name, projection_key).
+CREATE TABLE IF NOT EXISTS projection_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  current_offset VARCHAR(255) NOT NULL,
+  manifest VARCHAR(32) NOT NULL,
+  mergeable BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+-- Rows are keyed by (slice, projection_name, timestamp_offset, persistence_id, seq_nr);
+-- projection_key is stored but is not part of the primary key.
+CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  slice INT NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  -- timestamp_offset is the db_timestamp of the original event
+  timestamp_offset TIMESTAMP(6) NOT NULL,
+  -- timestamp_consumed is when the offset was stored
+  -- the consumer lag is timestamp_consumed - timestamp_offset
+  timestamp_consumed TIMESTAMP(6) NOT NULL,
+  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+-- Projection management state: one row per (projection_name, projection_key) holding a
+-- `paused` flag and a `last_updated` value.
+-- NOTE(review): `last_updated` is a BIGINT, presumably epoch based - confirm the unit with the writer.
+CREATE TABLE IF NOT EXISTS projection_management (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  paused BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
diff --git a/ddl-scripts/drop_tables_mysql.sql 
b/ddl-scripts/drop_tables_mysql.sql
new file mode 100644
index 0000000..ecef573
--- /dev/null
+++ b/ddl-scripts/drop_tables_mysql.sql
@@ -0,0 +1,23 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Drops every table created by create_tables_mysql.sql. Each statement uses IF EXISTS,
+-- so the script can be run even when some or all of the tables are missing.
+DROP TABLE IF EXISTS event_journal;
+DROP TABLE IF EXISTS snapshot;
+DROP TABLE IF EXISTS durable_state;
+DROP TABLE IF EXISTS projection_offset_store;
+DROP TABLE IF EXISTS projection_timestamp_offset_store;
+DROP TABLE IF EXISTS projection_management;
diff --git a/docker/docker-compose-mysql.yml b/docker/docker-compose-mysql.yml
new file mode 100644
index 0000000..3a7c069
--- /dev/null
+++ b/docker/docker-compose-mysql.yml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+  mysql-db:
+    image: mysql:9.1.0
+    container_name: docker-mysql-db-1
+    ports:
+      - 3306:3306
+    environment:
+      MYSQL_ROOT_PASSWORD: root
+    healthcheck:
+      test: [ "CMD", "mysqladmin", "--password=root", "ping", "-h", "127.0.0.1" ]
+      interval: 1s
+      timeout: 1s
+      retries: 60
diff --git a/docs/src/main/paradox/connection-config.md b/docs/src/main/paradox/connection-config.md
index 8f6dc82..c96a6b6 100644
--- a/docs/src/main/paradox/connection-config.md
+++ b/docs/src/main/paradox/connection-config.md
@@ -9,6 +9,9 @@ Postgres:
 Yugabyte:
 : @@snip [application.conf](/docs/src/test/resources/application-yugabyte.conf) { #connection-settings }
 
+MySQL:
+: @@snip [application.conf](/docs/src/test/resources/application-mysql.conf) { #connection-settings }
+
 ## Reference configuration 
 
 The following can be overridden in your `application.conf` for the connection settings:
diff --git a/docs/src/test/resources/application-mysql.conf b/docs/src/test/resources/application-mysql.conf
new file mode 100644
index 0000000..e645b4a
--- /dev/null
+++ b/docs/src/test/resources/application-mysql.conf
@@ -0,0 +1,32 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
+pekko.persistence.snapshot-store.plugin = "pekko.persistence.r2dbc.snapshot"
+pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
+
+// #connection-settings
+pekko.persistence.r2dbc {
+  dialect = "mysql"
+  connection-factory {
+    driver = "mysql"
+    host = "localhost"
+    host = ${?DB_HOST}
+    port = 3306
+    database = "mysql"
+    database = ${?DB_NAME}
+    user = "root"
+    user = ${?DB_USER}
+    password = "root"
+    password = ${?DB_PASSWORD}
+
+    db-timestamp-monotonic-increasing = on
+    use-app-timestamp = on
+
+    # ssl {
+    #   enabled = on
+    #   mode = "VERIFY_CA"
+    #   root-cert = "/path/db_root.crt"
+    # }
+  }
+}
+// #connection-settings
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index c01be9b..b0818d4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -36,6 +36,7 @@ object Dependencies {
     val r2dbcPool = "io.r2dbc" % "r2dbc-pool" % "1.0.2.RELEASE"
     val r2dbcPostgres = Seq(
       "org.postgresql" % "r2dbc-postgresql" % "1.0.7.RELEASE")
+    val r2dbcMysql = "io.asyncer" % "r2dbc-mysql" % "1.3.0"
   }
 
   object TestDeps {
@@ -75,6 +76,7 @@ object Dependencies {
     pekkoPersistenceQuery,
     r2dbcSpi,
     r2dbcPool,
+    r2dbcMysql % "provided,test",
     TestDeps.pekkoPersistenceTck,
     TestDeps.pekkoStreamTestkit,
     TestDeps.pekkoActorTestkitTyped,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to