This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-dynamodb.git
The following commit(s) were added to refs/heads/main by this push:
new f9a424d more changes to support scala 3
f9a424d is described below
commit f9a424d5c206797459a37d2380baec81a5fa31ab
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Apr 26 08:14:08 2023 +0200
more changes to support scala 3
---
.../persistence/dynamodb/journal/DynamoDBHelper.scala | 2 --
.../persistence/dynamodb/journal/DynamoDBJournal.scala | 2 +-
.../dynamodb/journal/DynamoDBJournalRequests.scala | 6 ++++--
.../persistence/dynamodb/journal/DynamoDBRecovery.scala | 16 +++++-----------
.../persistence/dynamodb/journal/DynamoDBUtils.scala | 6 +++---
.../dynamodb/journal/FailureReportingSpec.scala | 4 ++--
.../dynamodb/journal/WriteThroughputBench.scala | 5 +++--
.../persistence/dynamodb/snapshot/DynamoDBUtils.scala | 9 ++++-----
8 files changed, 22 insertions(+), 28 deletions(-)
diff --git
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala
index 53e68a6..5c90b1c 100644
---
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala
+++
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala
@@ -27,8 +27,6 @@ import java.util.{ concurrent => juc }
import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
-import scala.reflect.ClassTag
-import scala.util.control.NoStackTrace
case class LatencyReport(nanos: Long, retries: Int)
private class RetryStateHolder(var retries: Int = 10, var backoff:
FiniteDuration = 1.millis)
diff --git
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournal.scala
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournal.scala
index 3f9eacc..459a81b 100644
---
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournal.scala
+++
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournal.scala
@@ -109,7 +109,7 @@ class DynamoDBJournal(config: Config)
private val opQueue: JMap[String, Future[Done]] = new JHMap
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]):
Future[immutable.Seq[Try[Unit]]] = {
- val p = Promise[Done]
+ val p = Promise[Done]()
val pid = messages.head.persistenceId
opQueue.put(pid, p.future)
diff --git
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
index bbfcc3b..b55a203 100644
---
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
+++
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
@@ -24,6 +24,7 @@ import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import org.apache.pekko.Done
import org.apache.pekko.actor.ExtendedActorSystem
+import org.apache.pekko.dispatch.ExecutionContexts
import org.apache.pekko.pattern.after
import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
import org.apache.pekko.persistence.dynamodb._
@@ -46,12 +47,13 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
// optimize the common case
if (writes.size == 1) {
writeMessages(writes.head)
- .map(bubbleUpFailures(_) ::
Nil)(org.apache.pekko.dispatch.ExecutionContexts.sameThreadExecutionContext)
+ .map(bubbleUpFailures(_) ::
Nil)(ExecutionContexts.sameThreadExecutionContext)
} else {
def rec(todo: List[AtomicWrite], acc: List[Try[Unit]]):
Future[List[Try[Unit]]] =
todo match {
case write :: remainder =>
- writeMessages(write).flatMap(result => rec(remainder,
bubbleUpFailures(result) :: acc))
+ writeMessages(write)
+ .flatMap(result => rec(remainder, bubbleUpFailures(result) ::
acc))
case Nil => Future.successful(acc.reverse)
}
rec(writes.toList, Nil)
diff --git
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala
index 6d77f5d..adad104 100644
---
a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala
+++
b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala
@@ -14,15 +14,10 @@
package org.apache.pekko.persistence.dynamodb.journal
import org.apache.pekko.NotUsed
-import org.apache.pekko.actor.{ ActorSystem, ExtendedActorSystem }
+import org.apache.pekko.actor.ExtendedActorSystem
+import org.apache.pekko.dispatch.MessageDispatcher
import org.apache.pekko.persistence.PersistentRepr
import org.apache.pekko.persistence.dynamodb._
-import org.apache.pekko.persistence.dynamodb.{
- ActorSystemProvider,
- DynamoProvider,
- LoggingProvider,
- MaterializerProvider
-}
import org.apache.pekko.serialization.{ AsyncSerializer, Serialization }
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl._
@@ -32,8 +27,7 @@ import com.amazonaws.services.dynamodbv2.model._
import java.util.function.Consumer
import java.util.{ ArrayList, Collections, Map => JMap }
import scala.collection.immutable
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.Future
import scala.jdk.CollectionConverters._
object DynamoDBRecovery {
@@ -215,7 +209,7 @@ trait DynamoDBRecovery extends AsyncReplayMessages {
import DynamoDBRecovery._
import journalSettings._
- implicit lazy val replayDispatcher =
system.dispatchers.lookup(ReplayDispatcher)
+ implicit lazy val replayDispatcher: MessageDispatcher =
system.dispatchers.lookup(ReplayDispatcher)
override def asyncReplayMessages(persistenceId: String, fromSequenceNr:
Long, toSequenceNr: Long, max: Long)(
replayCallback: (PersistentRepr) => Unit): Future[Unit] = {
@@ -463,7 +457,7 @@ trait DynamoDBRecovery extends AsyncReplayMessages {
Future.successful(deserializedEvent)
}
- fut.map { event: AnyRef =>
+ fut.map { (event: AnyRef) =>
PersistentRepr(
event,
sequenceNr = sN,
diff --git
a/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBUtils.scala
b/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBUtils.scala
index c801e25..217769d 100644
---
a/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBUtils.scala
+++
b/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBUtils.scala
@@ -29,15 +29,15 @@ trait DynamoDBUtils extends JournalSettingsProvider with
DynamoProvider {
val system: ActorSystem
import system.dispatcher
- override lazy val journalSettings = {
+ override val journalSettings = {
val c = system.settings.config
val config = c.getConfig(c.getString("pekko.persistence.journal.plugin"))
new DynamoDBJournalConfig(config)
}
- override lazy val dynamo: DynamoDBHelper = dynamoClient(system,
journalSettings)
+ override val dynamo: DynamoDBHelper = dynamoClient(system, journalSettings)
- implicit val timeout = Timeout(5.seconds)
+ implicit val timeout: Timeout = Timeout(5.seconds)
def ensureJournalTableExists(read: Long = 10L, write: Long = 10L): Unit = {
val create =
diff --git
a/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/FailureReportingSpec.scala
b/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/FailureReportingSpec.scala
index a788b64..1f143ef 100644
---
a/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/FailureReportingSpec.scala
+++
b/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/FailureReportingSpec.scala
@@ -41,7 +41,7 @@ class FailureReportingSpec
with DynamoDBUtils
with IntegSpec {
- implicit val patience = PatienceConfig(5.seconds)
+ implicit val patience: PatienceConfig = PatienceConfig(5.seconds)
override val persistenceId = "FailureReportingSpec"
@@ -78,7 +78,7 @@ class FailureReportingSpec
val config = ConfigFactory
.parseString("my-dynamodb-journal.journal-table=ThisTableDoesNotExist")
.withFallback(ConfigFactory.load())
- implicit val system = ActorSystem("FailureReportingSpec-test1", config)
+ implicit val system: ActorSystem =
ActorSystem("FailureReportingSpec-test1", config)
try EventFilter[ResourceNotFoundException](pattern =
".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
Persistence(system).journalFor("")
}
diff --git
a/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/WriteThroughputBench.scala
b/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/WriteThroughputBench.scala
index 6e44d83..1e4cd94 100644
---
a/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/WriteThroughputBench.scala
+++
b/src/test/scala/org/apache/pekko/persistence/dynamodb/journal/WriteThroughputBench.scala
@@ -126,8 +126,9 @@ writer-dispatcher {
""").resolve)
.withFallback(ConfigFactory.load())
- implicit val system = ActorSystem("WriteThroughputBench", config)
- implicit val materializer =
ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1))
+ implicit val system: ActorSystem = ActorSystem("WriteThroughputBench",
config)
+ implicit val materializer: ActorMaterializer =
+ ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1))
/*
* You will want to make sure that the table is deployed with the proper
values for
diff --git
a/src/test/scala/org/apache/pekko/persistence/dynamodb/snapshot/DynamoDBUtils.scala
b/src/test/scala/org/apache/pekko/persistence/dynamodb/snapshot/DynamoDBUtils.scala
index ce5b5b9..d44207f 100644
---
a/src/test/scala/org/apache/pekko/persistence/dynamodb/snapshot/DynamoDBUtils.scala
+++
b/src/test/scala/org/apache/pekko/persistence/dynamodb/snapshot/DynamoDBUtils.scala
@@ -22,15 +22,14 @@ import com.amazonaws.services.dynamodbv2.model._
import java.util.UUID
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
-import org.scalatest.Suite
+import scala.concurrent.{ Await, ExecutionContext, Future }
trait DynamoDBUtils {
def system: ActorSystem
- implicit val executionContext = system.dispatcher
+ implicit val executionContext: ExecutionContext = system.dispatcher
- lazy val settings: DynamoDBSnapshotConfig = {
+ val settings: DynamoDBSnapshotConfig = {
val c = system.settings.config
val config =
c.getConfig(c.getString("pekko.persistence.snapshot-store.plugin"))
new DynamoDBSnapshotConfig(config)
@@ -39,7 +38,7 @@ trait DynamoDBUtils {
lazy val client: DynamoDBHelper = dynamoClient(system, settings)
- implicit val timeout = Timeout(5.seconds)
+ implicit val timeout: Timeout = Timeout(5.seconds)
import com.amazonaws.services.dynamodbv2.model.{ KeySchemaElement, KeyType }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]