This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 5419e7a fix: Use explicit microsecond timestamp resolution (#373)
5419e7a is described below
commit 5419e7a5a7fe1c288e70e39e74c95d4ca167dd22
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 09:57:41 2026 +0100
fix: Use explicit microsecond timestamp resolution (#373)
* fix: Use explicit microsecond timestamp resolution (port of
akka/akka-persistence-r2dbc#324)
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/dc2cec70-bd9c-428c-a89c-8e7e8eb22750
Co-authored-by: pjfanning <[email protected]>
* fix: Truncate timestamps to microsecond resolution to match Postgres
precision
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/dc2cec70-bd9c-428c-a89c-8e7e8eb22750
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../persistence/r2dbc/internal/BySliceQuery.scala | 5 ++--
.../r2dbc/internal/InstantFactory.scala | 33 ++++++++++++++++++++++
.../persistence/r2dbc/journal/R2dbcJournal.scala | 3 +-
.../r2dbc/query/scaladsl/QueryDao.scala | 3 +-
.../r2dbc/state/scaladsl/DurableStateDao.scala | 3 +-
.../r2dbc/internal/BySliceQueryBucketsSpec.scala | 2 +-
.../query/EventsBySliceBacktrackingSpec.scala | 5 ++--
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 5 ++--
.../r2dbc/EventSourcedEndToEndSpec.scala | 2 +-
.../projection/r2dbc/R2dbcOffsetStoreSpec.scala | 2 +-
.../r2dbc/R2dbcOffsetStoreStateSpec.scala | 8 +++---
.../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala | 6 ++--
.../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 18 ++++++------
.../apache/pekko/projection/r2dbc/TestClock.scala | 29 +++++++++++--------
14 files changed, 84 insertions(+), 40 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index dc5515d..fca0486 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -31,6 +31,7 @@ import pekko.persistence.r2dbc.BufferSize
import pekko.persistence.r2dbc.BySliceQuerySettings
import pekko.persistence.r2dbc.RefreshInterval
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Source
import org.slf4j.Logger
@@ -107,7 +108,7 @@ import org.slf4j.Logger
import Buckets.Count
import Buckets.EpochSeconds
- val createdAt: Instant = Instant.now()
+ val createdAt: Instant = InstantFactory.now()
def findTimeForLimit(from: Instant, atLeastCounts: Int): Option[Instant] =
{
val fromEpochSeconds = from.toEpochMilli / 1000
@@ -461,7 +462,7 @@ import org.slf4j.Logger
// Don't run this too frequently
if ((state.buckets.isEmpty ||
JDuration
- .between(state.buckets.createdAt, Instant.now())
+ .between(state.buckets.createdAt, InstantFactory.now())
.compareTo(eventBucketCountInterval) > 0) &&
// For Durable State we always refresh the bucket counts at the
interval. For Event Sourced we know
// that they don't change because events are append only.
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/InstantFactory.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/InstantFactory.scala
new file mode 100644
index 0000000..b8d0b3a
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/InstantFactory.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object InstantFactory {
+
+ /**
+ * Current time truncated to microseconds. The reason for using microseconds
is that Postgres timestamps has the
+ * resolution of microseconds but some OS/JDK (Linux/JDK17) has Instant
resolution of nanoseconds.
+ */
+ def now(): Instant =
+ Instant.now().truncatedTo(ChronoUnit.MICROS)
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index 9b79519..a95b0b5 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -33,6 +33,7 @@ import pekko.persistence.PersistentRepr
import pekko.persistence.journal.AsyncWriteJournal
import pekko.persistence.journal.Tagged
import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
@@ -107,7 +108,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]):
Future[immutable.Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = {
- val timestamp = if (journalSettings.useAppTimestamp) Instant.now() else
JournalDao.EmptyDbTimestamp
+ val timestamp = if (journalSettings.useAppTimestamp)
InstantFactory.now() else JournalDao.EmptyDbTimestamp
val serialized: Try[Seq[SerializedJournalRow]] = Try {
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 11b207f..49d70b6 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -31,6 +31,7 @@ import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
+import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
@@ -242,7 +243,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
limit: Int): Future[Seq[Bucket]] = {
val toTimestamp = {
- val now = Instant.now() // not important to use database time
+ val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 2eda936..2b988de 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -30,6 +30,7 @@ import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
@@ -531,7 +532,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
limit: Int): Future[Seq[Bucket]] = {
val toTimestamp = {
- val now = Instant.now() // not important to use database time
+ val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
index 47a8642..6fa43c4 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.wordspec.AnyWordSpec
class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers
{
- private val startTime = Instant.now()
+ private val startTime = InstantFactory.now()
private val firstBucketStartTime = startTime.plusSeconds(60)
private val firstBucketStartEpochSeconds = firstBucketStartTime.toEpochMilli
/ 1000
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index 3b244b5..b0dd655 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -30,6 +30,7 @@ import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.serialization.SerializationExtension
@@ -98,7 +99,7 @@ class EventsBySliceBacktrackingSpec
val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
// don't let behind-current-time be a reason for not finding events
- val startTime = Instant.now().minusSeconds(10 * 60)
+ val startTime = InstantFactory.now().minusSeconds(10 * 60)
writeEvent(slice1, pid1, 1L, startTime, "e1-1")
writeEvent(slice1, pid1, 2L, startTime.plusMillis(1), "e1-2")
@@ -182,7 +183,7 @@ class EventsBySliceBacktrackingSpec
val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
// don't let behind-current-time be a reason for not finding events
- val startTime = Instant.now().minusSeconds(10 * 60)
+ val startTime = InstantFactory.now().minusSeconds(10 * 60)
writeEvent(slice1, pid1, 1L, startTime, "e1-1")
writeEvent(slice1, pid1, 2L, startTime.plusMillis(2), "e1-2")
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 553446b..edbb96f 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -36,6 +36,7 @@ import
pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
@@ -90,9 +91,9 @@ class EventsBySlicePubSubSpec
}
private def createEnvelope(pid: PersistenceId, seqNr: Long, evt: String):
EventEnvelope[String] = {
- val now = Instant.now()
+ val now = InstantFactory.now()
EventEnvelope(
- TimestampOffset(Instant.now, Map(pid.id -> seqNr)),
+ TimestampOffset(now, Map(pid.id -> seqNr)),
pid.id,
seqNr,
evt,
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index b68c9e7..aeddf55 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -300,7 +300,7 @@ class EventSourcedEndToEndSpec
val pid2 = nextPid(entityType)
val pid3 = nextPid(entityType)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
val oldTime =
startTime.minus(projectionSettings.timeWindow).minusSeconds(60)
writeEvent(pid1, 1L, startTime, "e1-1")
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index b957eb7..ccd63ff 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -41,7 +41,7 @@ class R2dbcOffsetStoreSpec
override def typedSystem: ActorSystem[_] = system
// test clock for testing of the `last_updated` Instant
- private val clock = new TestClock
+ private val clock = TestClock.nowMillis()
private val settings = R2dbcProjectionSettings(testKit.system)
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
index 73f859f..318475d 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala
@@ -26,7 +26,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with
TestSuite with Matchers
"R2dbcOffsetStore.State" should {
"add records and keep track of pids and latest offset" in {
- val t0 = Instant.now()
+ val t0 = TestClock.nowMillis().instant()
val state1 = State.empty
.add(Vector(Record("p1", 1, t0), Record("p1", 2, t0.plusMillis(1)),
Record("p1", 3, t0.plusMillis(2))))
state1.byPid("p1").seqNr shouldBe 3L
@@ -53,7 +53,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with
TestSuite with Matchers
// reproducer of issue #173
"include highest seqNr in seen of latestOffset" in {
- val t0 = Instant.now()
+ val t0 = TestClock.nowMillis().instant()
val records =
Vector(Record("p4", 9, t0), Record("p2", 2, t0), Record("p3", 5, t0),
Record("p2", 1, t0), Record("p1", 3, t0))
val state = State(records)
@@ -63,7 +63,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with
TestSuite with Matchers
}
"evict old" in {
- val t0 = Instant.now()
+ val t0 = TestClock.nowMillis().instant()
val state1 = State.empty
.add(
Vector(
@@ -97,7 +97,7 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with
TestSuite with Matchers
}
"find duplicate" in {
- val t0 = Instant.now()
+ val t0 = TestClock.nowMillis().instant()
val state =
State(Vector(Record("p1", 1, t0), Record("p2", 2, t0.plusMillis(1)),
Record("p3", 3, t0.plusMillis(2))))
state.isDuplicate(Record("p1", 1, t0)) shouldBe true
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index dc130b3..00048b7 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.projection.r2dbc
import java.time.Instant
-import java.time.temporal.ChronoUnit
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
@@ -252,7 +251,7 @@ class R2dbcTimestampOffsetProjectionSpec
}
- private val clock = new TestClock
+ private val clock = TestClock.nowMicros()
def tick(): TestClock = {
clock.tick(JDuration.ofMillis(1))
clock
@@ -289,8 +288,7 @@ class R2dbcTimestampOffsetProjectionSpec
}
def now(): Instant = {
- // supported databases do not store more than 6 fractional digits
- Instant.now().truncatedTo(ChronoUnit.MICROS)
+ TestClock.nowMicros().instant()
}
def createEnvelopesWithDuplicates(pid1: Pid, pid2: Pid):
Vector[EventEnvelope[String]] = {
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 0708d74..0d76ebb 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -70,7 +70,7 @@ class R2dbcTimestampOffsetStoreSpec
override def typedSystem: ActorSystem[_] = system
- private val clock = new TestClock
+ private val clock = TestClock.nowMicros()
def tick(): Unit = clock.tick(JDuration.ofMillis(1))
private val log = LoggerFactory.getLogger(getClass)
@@ -357,10 +357,10 @@ class R2dbcTimestampOffsetStoreSpec
"accept known sequence numbers and reject unknown" in {
val projectionId = genRandomProjectionId()
- val eventTimestampQueryClock = new TestClock
+ val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId,
eventTimestampQueryClock = eventTimestampQueryClock)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L,
"p3" -> 5L))
offsetStore.saveOffset(offset1).futureValue
@@ -437,7 +437,7 @@ class R2dbcTimestampOffsetStoreSpec
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
val envelope1 = createEnvelope("p1", 1L, startTime.plusMillis(1), "e1-1")
val envelope2 = createEnvelope("p1", 2L, startTime.plusMillis(2), "e1-2")
@@ -474,7 +474,7 @@ class R2dbcTimestampOffsetStoreSpec
"filter accepted" in {
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
val offsetStore = createOffsetStore(projectionId)
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L,
"p3" -> 5L))
@@ -499,7 +499,7 @@ class R2dbcTimestampOffsetStoreSpec
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L,
"p3" -> 5L))
offsetStore.saveOffset(offset1).futureValue
@@ -565,7 +565,7 @@ class R2dbcTimestampOffsetStoreSpec
import evictSettings._
val offsetStore = createOffsetStore(projectionId, evictSettings)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)
offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" ->
1L))).futureValue
@@ -608,7 +608,7 @@ class R2dbcTimestampOffsetStoreSpec
import deleteSettings._
val offsetStore = createOffsetStore(projectionId, deleteSettings)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)
offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" ->
1L))).futureValue
@@ -639,7 +639,7 @@ class R2dbcTimestampOffsetStoreSpec
import deleteSettings._
val offsetStore = createOffsetStore(projectionId, deleteSettings)
- val startTime = Instant.now()
+ val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)
offsetStore.saveOffset(TimestampOffset(startTime, Map("p1" ->
1L))).futureValue
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
index 6c01374..209a3bc 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestClock.scala
@@ -18,15 +18,25 @@ import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset
+import java.time.temporal.ChronoUnit
+import java.time.temporal.TemporalUnit
import org.apache.pekko.annotation.InternalApi
+object TestClock {
+ def nowMillis(): TestClock = new TestClock(ChronoUnit.MILLIS)
+ def nowMicros(): TestClock = new TestClock(ChronoUnit.MICROS)
+}
+
/**
- * INTERNAL API
+ * Clock for testing purpose, which is truncated to a resolution (milliseconds
or microseconds).
+ *
+ * The reason for truncating to the resolution is that Postgres timestamps
have the resolution of microseconds but some
+ * OS/JDK (Linux/JDK17) has Instant resolution of nanoseconds.
*/
-@InternalApi private[projection] class TestClock extends Clock {
+@InternalApi private[projection] class TestClock(resolution: TemporalUnit)
extends Clock {
- @volatile private var _instant = roundToMillis(Instant.now())
+ @volatile private var _instant = Instant.now().truncatedTo(resolution)
override def getZone: ZoneId = ZoneOffset.UTC
@@ -37,18 +47,15 @@ import org.apache.pekko.annotation.InternalApi
_instant
def setInstant(newInstant: Instant): Unit =
- _instant = roundToMillis(newInstant)
+ _instant = newInstant.truncatedTo(resolution)
+ /**
+ * Increase the clock with this duration (truncated to the resolution)
+ */
def tick(duration: Duration): Instant = {
- val newInstant = roundToMillis(_instant.plus(duration))
+ val newInstant = _instant.plus(duration).truncatedTo(resolution)
_instant = newInstant
newInstant
}
- private def roundToMillis(i: Instant): Instant = {
- // algo taken from java.time.Clock.tick
- val epochMilli = i.toEpochMilli
- Instant.ofEpochMilli(epochMilli - Math.floorMod(epochMilli, 1L))
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]