This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 5419e7a  fix: Use explicit microsecond timestamp resolution (#373)
5419e7a is described below

commit 5419e7a5a7fe1c288e70e39e74c95d4ca167dd22
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 09:57:41 2026 +0100

    fix: Use explicit microsecond timestamp resolution (#373)
    
    * fix: Use explicit microsecond timestamp resolution (port of akka/akka-persistence-r2dbc#324)
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/dc2cec70-bd9c-428c-a89c-8e7e8eb22750
    
    Co-authored-by: pjfanning <[email protected]>
    
    * fix: Truncate timestamps to microsecond resolution to match Postgres precision
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/dc2cec70-bd9c-428c-a89c-8e7e8eb22750
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../persistence/r2dbc/internal/BySliceQuery.scala  |  5 ++--
 .../r2dbc/internal/InstantFactory.scala            | 33 ++++++++++++++++++++++
 .../persistence/r2dbc/journal/R2dbcJournal.scala   |  3 +-
 .../r2dbc/query/scaladsl/QueryDao.scala            |  3 +-
 .../r2dbc/state/scaladsl/DurableStateDao.scala     |  3 +-
 .../r2dbc/internal/BySliceQueryBucketsSpec.scala   |  2 +-
 .../query/EventsBySliceBacktrackingSpec.scala      |  5 ++--
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      |  5 ++--
 .../r2dbc/EventSourcedEndToEndSpec.scala           |  2 +-
 .../projection/r2dbc/R2dbcOffsetStoreSpec.scala    |  2 +-
 .../r2dbc/R2dbcOffsetStoreStateSpec.scala          |  8 +++---
 .../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala |  6 ++--
 .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala      | 18 ++++++------
 .../apache/pekko/projection/r2dbc/TestClock.scala  | 29 +++++++++++--------
 14 files changed, 84 insertions(+), 40 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index dc5515d..fca0486 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -31,6 +31,7 @@ import pekko.persistence.r2dbc.BufferSize
 import pekko.persistence.r2dbc.BySliceQuerySettings
 import pekko.persistence.r2dbc.RefreshInterval
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.stream.scaladsl.Flow
 import pekko.stream.scaladsl.Source
 import org.slf4j.Logger
@@ -107,7 +108,7 @@ import org.slf4j.Logger
     import Buckets.Count
     import Buckets.EpochSeconds
 
-    val createdAt: Instant = Instant.now()
+    val createdAt: Instant = InstantFactory.now()
 
     def findTimeForLimit(from: Instant, atLeastCounts: Int): Option[Instant] = 
{
       val fromEpochSeconds = from.toEpochMilli / 1000
@@ -461,7 +462,7 @@ import org.slf4j.Logger
     // Don't run this too frequently
     if ((state.buckets.isEmpty ||
       JDuration
-        .between(state.buckets.createdAt, Instant.now())
+        .between(state.buckets.createdAt, InstantFactory.now())
         .compareTo(eventBucketCountInterval) > 0) &&
       // For Durable State we always refresh the bucket counts at the 
interval. For Event Sourced we know
       // that they don't change because events are append only.
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/InstantFactory.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/InstantFactory.scala
new file mode 100644
index 0000000..b8d0b3a
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/InstantFactory.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object InstantFactory {
+
+  /**
+   * Current time truncated to microseconds. The reason for using microseconds is that Postgres timestamps have the
+   * resolution of microseconds but some OS/JDK combinations (Linux/JDK17) have an Instant resolution of nanoseconds.
+   */
+  def now(): Instant =
+    Instant.now().truncatedTo(ChronoUnit.MICROS)
+
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index 9b79519..a95b0b5 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -33,6 +33,7 @@ import pekko.persistence.PersistentRepr
 import pekko.persistence.journal.AsyncWriteJournal
 import pekko.persistence.journal.Tagged
 import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.persistence.r2dbc.internal.PubSub
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
@@ -107,7 +108,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
 
   override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): 
Future[immutable.Seq[Try[Unit]]] = {
     def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = {
-      val timestamp = if (journalSettings.useAppTimestamp) Instant.now() else 
JournalDao.EmptyDbTimestamp
+      val timestamp = if (journalSettings.useAppTimestamp) 
InstantFactory.now() else JournalDao.EmptyDbTimestamp
       val serialized: Try[Seq[SerializedJournalRow]] = Try {
         atomicWrite.payload.map { pr =>
           val (event, tags) = pr.payload match {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 11b207f..49d70b6 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -31,6 +31,7 @@ import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
 import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
 import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
+import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
@@ -242,7 +243,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, 
connectionFactory: Co
       limit: Int): Future[Seq[Bucket]] = {
 
     val toTimestamp = {
-      val now = Instant.now() // not important to use database time
+      val now = InstantFactory.now() // not important to use database time
       if (fromTimestamp == Instant.EPOCH)
         now
       else {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 2eda936..2b988de 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -30,6 +30,7 @@ import pekko.persistence.r2dbc.StateSettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
@@ -531,7 +532,7 @@ private[r2dbc] class DurableStateDao(settings: 
StateSettings, connectionFactory:
       limit: Int): Future[Seq[Bucket]] = {
 
     val toTimestamp = {
-      val now = Instant.now() // not important to use database time
+      val now = InstantFactory.now() // not important to use database time
       if (fromTimestamp == Instant.EPOCH)
         now
       else {
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
index 47a8642..6fa43c4 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.wordspec.AnyWordSpec
 
 class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers 
{
 
-  private val startTime = Instant.now()
+  private val startTime = InstantFactory.now()
   private val firstBucketStartTime = startTime.plusSeconds(60)
   private val firstBucketStartEpochSeconds = firstBucketStartTime.toEpochMilli 
/ 1000
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index 3b244b5..b0dd655 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -30,6 +30,7 @@ import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.serialization.SerializationExtension
@@ -98,7 +99,7 @@ class EventsBySliceBacktrackingSpec
       val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
 
       // don't let behind-current-time be a reason for not finding events
-      val startTime = Instant.now().minusSeconds(10 * 60)
+      val startTime = InstantFactory.now().minusSeconds(10 * 60)
 
       writeEvent(slice1, pid1, 1L, startTime, "e1-1")
       writeEvent(slice1, pid1, 2L, startTime.plusMillis(1), "e1-2")
@@ -182,7 +183,7 @@ class EventsBySliceBacktrackingSpec
       val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
 
       // don't let behind-current-time be a reason for not finding events
-      val startTime = Instant.now().minusSeconds(10 * 60)
+      val startTime = InstantFactory.now().minusSeconds(10 * 60)
 
       writeEvent(slice1, pid1, 1L, startTime, "e1-1")
       writeEvent(slice1, pid1, 2L, startTime.plusMillis(2), "e1-2")
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 553446b..edbb96f 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -36,6 +36,7 @@ import 
pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.persistence.r2dbc.internal.PubSub
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
@@ -90,9 +91,9 @@ class EventsBySlicePubSubSpec
   }
 
   private def createEnvelope(pid: PersistenceId, seqNr: Long, evt: String): 
EventEnvelope[String] = {
-    val now = Instant.now()
+    val now = InstantFactory.now()
     EventEnvelope(
-      TimestampOffset(Instant.now, Map(pid.id -> seqNr)),
+      TimestampOffset(now, Map(pid.id -> seqNr)),
       pid.id,
       seqNr,
       evt,
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index b68c9e7..aeddf55 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -300,7 +300,7 @@ class EventSourcedEndToEndSpec
       val pid2 = nextPid(entityType)
       val pid3 = nextPid(entityType)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       val oldTime = 
startTime.minus(projectionSettings.timeWindow).minusSeconds(60)
       writeEvent(pid1, 1L, startTime, "e1-1")
 
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index b957eb7..ccd63ff 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -41,7 +41,7 @@ class R2dbcOffsetStoreSpec
   override def typedSystem: ActorSystem[_] = system
 
   // test clock for testing of the `last_updated` Instant
-  private val clock = new TestClock
+  private val clock = TestClock.nowMillis()
 
   private val settings = R2dbcProjectionSettings(testKit.system)
 
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
index 73f859f..318475d 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
@@ -26,7 +26,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with 
TestSuite with Matchers
 
   "R2dbcOffsetStore.State" should {
     "add records and keep track of pids and latest offset" in {
-      val t0 = Instant.now()
+      val t0 = TestClock.nowMillis().instant()
       val state1 = State.empty
         .add(Vector(Record("p1", 1, t0), Record("p1", 2, t0.plusMillis(1)), 
Record("p1", 3, t0.plusMillis(2))))
       state1.byPid("p1").seqNr shouldBe 3L
@@ -53,7 +53,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with 
TestSuite with Matchers
 
     // reproducer of issue #173
     "include highest seqNr in seen of latestOffset" in {
-      val t0 = Instant.now()
+      val t0 = TestClock.nowMillis().instant()
       val records =
         Vector(Record("p4", 9, t0), Record("p2", 2, t0), Record("p3", 5, t0), 
Record("p2", 1, t0), Record("p1", 3, t0))
       val state = State(records)
@@ -63,7 +63,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with 
TestSuite with Matchers
     }
 
     "evict old" in {
-      val t0 = Instant.now()
+      val t0 = TestClock.nowMillis().instant()
       val state1 = State.empty
         .add(
           Vector(
@@ -97,7 +97,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with 
TestSuite with Matchers
     }
 
     "find duplicate" in {
-      val t0 = Instant.now()
+      val t0 = TestClock.nowMillis().instant()
       val state =
         State(Vector(Record("p1", 1, t0), Record("p2", 2, t0.plusMillis(1)), 
Record("p3", 3, t0.plusMillis(2))))
       state.isDuplicate(Record("p1", 1, t0)) shouldBe true
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index dc130b3..00048b7 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -14,7 +14,6 @@
 package org.apache.pekko.projection.r2dbc
 
 import java.time.Instant
-import java.time.temporal.ChronoUnit
 import java.time.{ Duration => JDuration }
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
@@ -252,7 +251,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
   }
 
-  private val clock = new TestClock
+  private val clock = TestClock.nowMicros()
   def tick(): TestClock = {
     clock.tick(JDuration.ofMillis(1))
     clock
@@ -289,8 +288,7 @@ class R2dbcTimestampOffsetProjectionSpec
   }
 
   def now(): Instant = {
-    // supported databases do not store more than 6 fractional digits
-    Instant.now().truncatedTo(ChronoUnit.MICROS)
+    TestClock.nowMicros().instant()
   }
 
   def createEnvelopesWithDuplicates(pid1: Pid, pid2: Pid): 
Vector[EventEnvelope[String]] = {
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 0708d74..0d76ebb 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -70,7 +70,7 @@ class R2dbcTimestampOffsetStoreSpec
 
   override def typedSystem: ActorSystem[_] = system
 
-  private val clock = new TestClock
+  private val clock = TestClock.nowMicros()
   def tick(): Unit = clock.tick(JDuration.ofMillis(1))
 
   private val log = LoggerFactory.getLogger(getClass)
@@ -357,10 +357,10 @@ class R2dbcTimestampOffsetStoreSpec
 
     "accept known sequence numbers and reject unknown" in {
       val projectionId = genRandomProjectionId()
-      val eventTimestampQueryClock = new TestClock
+      val eventTimestampQueryClock = TestClock.nowMicros()
       val offsetStore = createOffsetStore(projectionId, 
eventTimestampQueryClock = eventTimestampQueryClock)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, 
"p3" -> 5L))
       offsetStore.saveOffset(offset1).futureValue
 
@@ -437,7 +437,7 @@ class R2dbcTimestampOffsetStoreSpec
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
 
       val envelope1 = createEnvelope("p1", 1L, startTime.plusMillis(1), "e1-1")
       val envelope2 = createEnvelope("p1", 2L, startTime.plusMillis(2), "e1-2")
@@ -474,7 +474,7 @@ class R2dbcTimestampOffsetStoreSpec
 
     "filter accepted" in {
       val projectionId = genRandomProjectionId()
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       val offsetStore = createOffsetStore(projectionId)
 
       val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, 
"p3" -> 5L))
@@ -499,7 +499,7 @@ class R2dbcTimestampOffsetStoreSpec
       val projectionId = genRandomProjectionId()
       val offsetStore = createOffsetStore(projectionId)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, 
"p3" -> 5L))
       offsetStore.saveOffset(offset1).futureValue
 
@@ -565,7 +565,7 @@ class R2dbcTimestampOffsetStoreSpec
       import evictSettings._
       val offsetStore = createOffsetStore(projectionId, evictSettings)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       log.debug("Start time [{}]", startTime)
 
       offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" -> 
1L))).futureValue
@@ -608,7 +608,7 @@ class R2dbcTimestampOffsetStoreSpec
       import deleteSettings._
       val offsetStore = createOffsetStore(projectionId, deleteSettings)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       log.debug("Start time [{}]", startTime)
 
       offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" -> 
1L))).futureValue
@@ -639,7 +639,7 @@ class R2dbcTimestampOffsetStoreSpec
       import deleteSettings._
       val offsetStore = createOffsetStore(projectionId, deleteSettings)
 
-      val startTime = Instant.now()
+      val startTime = TestClock.nowMicros().instant()
       log.debug("Start time [{}]", startTime)
 
       offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" -> 
1L))).futureValue
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
index 6c01374..209a3bc 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
@@ -18,15 +18,25 @@ import java.time.Duration
 import java.time.Instant
 import java.time.ZoneId
 import java.time.ZoneOffset
+import java.time.temporal.ChronoUnit
+import java.time.temporal.TemporalUnit
 
 import org.apache.pekko.annotation.InternalApi
 
+object TestClock {
+  def nowMillis(): TestClock = new TestClock(ChronoUnit.MILLIS)
+  def nowMicros(): TestClock = new TestClock(ChronoUnit.MICROS)
+}
+
 /**
- * INTERNAL API
+ * Clock for testing purposes, which is truncated to a resolution (milliseconds or microseconds).
+ *
+ * The reason for truncating to the resolution is that Postgres timestamps have the resolution of microseconds but some
+ * OS/JDK combinations (Linux/JDK17) have an Instant resolution of nanoseconds.
  */
-@InternalApi private[projection] class TestClock extends Clock {
+@InternalApi private[projection] class TestClock(resolution: TemporalUnit) 
extends Clock {
 
-  @volatile private var _instant = roundToMillis(Instant.now())
+  @volatile private var _instant = Instant.now().truncatedTo(resolution)
 
   override def getZone: ZoneId = ZoneOffset.UTC
 
@@ -37,18 +47,15 @@ import org.apache.pekko.annotation.InternalApi
     _instant
 
   def setInstant(newInstant: Instant): Unit =
-    _instant = roundToMillis(newInstant)
+    _instant = newInstant.truncatedTo(resolution)
 
+  /**
+   * Increase the clock with this duration (truncated to the resolution)
+   */
   def tick(duration: Duration): Instant = {
-    val newInstant = roundToMillis(_instant.plus(duration))
+    val newInstant = _instant.plus(duration).truncatedTo(resolution)
     _instant = newInstant
     newInstant
   }
 
-  private def roundToMillis(i: Instant): Instant = {
-    // algo taken from java.time.Clock.tick
-    val epochMilli = i.toEpochMilli
-    Instant.ofEpochMilli(epochMilli - Math.floorMod(epochMilli, 1L))
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to