This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
commit fa1351b2139d83ae8efa36de888bcd95fd271971
Author: scala-steward-asf[bot]
<147768647+scala-steward-asf[bot]@users.noreply.github.com>
AuthorDate: Sun Jun 16 00:05:16 2024 +0000
Reformat with scalafmt 3.8.2
Executed command: scalafmt --non-interactive
---
.../org/apache/pekko/pattern/BackoffSupervisorSpec.scala | 9 +++------
.../org/apache/pekko/pattern/CircuitBreakerMTSpec.scala | 3 ++-
.../scala/org/apache/pekko/routing/TailChoppingSpec.scala | 3 ++-
.../pekko/actor/typed/delivery/DurableProducerQueue.scala | 3 ++-
.../typed/delivery/internal/ProducerControllerImpl.scala | 3 ++-
.../main/scala/org/apache/pekko/dispatch/Mailboxes.scala | 3 ++-
.../src/main/scala/org/apache/pekko/event/EventStream.scala | 3 ++-
.../org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala | 3 ++-
.../pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala | 13 +++----------
.../delivery/internal/ShardingProducerControllerImpl.scala | 3 ++-
.../apache/pekko/cluster/sharding/ShardCoordinator.scala | 3 ++-
.../cluster/ddata/typed/internal/ReplicatorBehavior.scala | 3 ++-
.../main/scala/org/apache/pekko/cluster/ClusterDaemon.scala | 3 ++-
.../scaladsl/PersistenceTestKitDurableStateStore.scala | 6 ++++--
.../org/apache/pekko/persistence/journal/ReplayFilter.scala | 3 ++-
project/SbtMultiJvmPlugin.scala | 3 ++-
project/StreamOperatorsIndexGenerator.scala | 8 ++------
.../src/main/scala/org/apache/pekko/remote/Endpoint.scala | 3 ++-
.../pekko/stream/impl/LinearTraversalBuilderSpec.scala | 3 +--
.../scala/org/apache/pekko/stream/impl/io/TcpStages.scala | 3 ++-
.../main/scala/org/apache/pekko/stream/javadsl/Source.scala | 3 ++-
.../scala/org/apache/pekko/stream/scaladsl/Source.scala | 3 ++-
22 files changed, 47 insertions(+), 43 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala
index 92ac422e39..96df572af5 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/pattern/BackoffSupervisorSpec.scala
@@ -273,12 +273,9 @@ class BackoffSupervisorSpec extends PekkoSpec with
ImplicitSender with Eventuall
val delayTable =
Table(
("restartCount", "minBackoff", "maxBackoff", "randomFactor",
"expectedResult"),
- (0, 0.minutes, 0.minutes, 0d, 0.minutes),
- (0, 5.minutes, 7.minutes, 0d, 5.minutes),
- (2, 5.seconds, 7.seconds, 0d, 7.seconds),
- (2, 5.seconds, 7.days, 0d, 20.seconds),
- (29, 5.minutes, 10.minutes, 0d, 10.minutes),
- (29, 10000.days, 10000.days, 0d, 10000.days),
+ (0, 0.minutes, 0.minutes, 0d, 0.minutes), (0, 5.minutes, 7.minutes,
0d, 5.minutes),
+ (2, 5.seconds, 7.seconds, 0d, 7.seconds), (2, 5.seconds, 7.days, 0d,
20.seconds),
+ (29, 5.minutes, 10.minutes, 0d, 10.minutes), (29, 10000.days,
10000.days, 0d, 10000.days),
(Int.MaxValue, 10000.days, 10000.days, 0d, 10000.days))
forAll(delayTable) {
(
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala
index 1d82aa5a2f..f1e83caad4 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/pattern/CircuitBreakerMTSpec.scala
@@ -32,7 +32,8 @@ class CircuitBreakerMTSpec extends PekkoSpec {
def openBreaker(breaker: CircuitBreaker): Unit = {
// returns true if the breaker is open
def failingCall(): Boolean =
- Await.result(breaker.withCircuitBreaker(Future.failed(new
RuntimeException("FAIL"))).recover {
+ Await.result(
+ breaker.withCircuitBreaker(Future.failed(new
RuntimeException("FAIL"))).recover {
case _: CircuitBreakerOpenException => true
case _ => false
}, remainingOrDefault)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala
index e22321f09e..c9d996a6b0 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/routing/TailChoppingSpec.scala
@@ -26,7 +26,8 @@ import pekko.testkit._
object TailChoppingSpec {
def newActor(id: Int, sleepTime: Duration)(implicit system: ActorSystem) =
- system.actorOf(Props(new Actor {
+ system.actorOf(
+ Props(new Actor {
var times: Int = _
def receive = {
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala
index 713b52e282..cd69940cd8 100644
---
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala
+++
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/DurableProducerQueue.scala
@@ -174,7 +174,8 @@ object DurableProducerQueue {
override def equals(obj: Any): Boolean = {
obj match {
case other: MessageSent[_] =>
- seqNr == other.seqNr && message == other.message && ack == other.ack
&& confirmationQualifier == other.confirmationQualifier && timestampMillis ==
other.timestampMillis
+ seqNr == other.seqNr && message == other.message && ack == other.ack
&& confirmationQualifier == other
+ .confirmationQualifier && timestampMillis == other.timestampMillis
case _ => false
}
}
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
index 6358740f15..aab18c1e1a 100644
---
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
+++
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
@@ -711,7 +711,8 @@ private class ProducerControllerImpl[A: ClassTag](
}
def receiveSendChunk(): Behavior[InternalCommand] = {
- if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <=
s.requestedSeqNr && s.storeMessageSentInProgress == 0) {
+ if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <=
s.requestedSeqNr && s
+ .storeMessageSentInProgress == 0) {
if (traceEnabled)
context.log.trace("Send next chunk seqNr [{}].",
s.remainingChunks.head.seqNr)
if (durableQueue.isEmpty) {
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala
index 9a7aa11671..428cba3c18 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailboxes.scala
@@ -194,7 +194,8 @@ private[pekko] class Mailboxes(
if (deploy.mailbox != Deploy.NoMailboxGiven) {
verifyRequirements(lookup(deploy.mailbox))
- } else if (deploy.dispatcher != Deploy.NoDispatcherGiven &&
deploy.dispatcher != Deploy.DispatcherSameAsParent && hasMailboxType) {
+ } else if (deploy.dispatcher != Deploy.NoDispatcherGiven &&
deploy.dispatcher != Deploy
+ .DispatcherSameAsParent && hasMailboxType) {
verifyRequirements(lookup(dispatcherConfig.getString("id")))
} else if (hasRequiredType(actorClass)) {
try verifyRequirements(lookupByQueueType(getRequiredType(actorClass)))
diff --git a/actor/src/main/scala/org/apache/pekko/event/EventStream.scala
b/actor/src/main/scala/org/apache/pekko/event/EventStream.scala
index 93c6326351..90ace42e0e 100644
--- a/actor/src/main/scala/org/apache/pekko/event/EventStream.scala
+++ b/actor/src/main/scala/org/apache/pekko/event/EventStream.scala
@@ -103,7 +103,8 @@ class EventStream(sys: ActorSystem, private val debug:
Boolean) extends LoggingB
Logging.Debug(
simpleName(this),
this.getClass,
- "initialized unsubscriber to: " + unsubscriber + ",
registering " + subscribers.size + " initial subscribers with it"))
+ "initialized unsubscriber to: " + unsubscriber + ",
registering " + subscribers
+ .size + " initial subscribers with it"))
subscribers.foreach(registerWithUnsubscriber)
true
} else {
diff --git
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
index 2086700d38..853980be88 100644
---
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
+++
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
@@ -252,7 +252,8 @@ private[pekko] object AsyncDnsResolver {
"""^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$""".r
private val ipv6Address =
-
"""^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:
[...]
+
"""^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:
[...]
+ .r
private[pekko] def isIpv4Address(name: String): Boolean =
ipv4Address.findAllMatchIn(name).nonEmpty
diff --git
a/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala
b/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala
index 29aa48dee7..f9532a2e7a 100644
---
a/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala
+++
b/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/ClusterMetricsExtensionSpec.scala
@@ -70,16 +70,9 @@ class ClusterMetricsExtensionSpec
val history = metricsView.metricsHistory.reverse.map { _.head }
val expected = List(
- (0.700, 0.000, 0.000),
- (0.700, 0.018, 0.007),
- (0.700, 0.051, 0.020),
- (0.700, 0.096, 0.038),
- (0.700, 0.151, 0.060),
- (0.700, 0.214, 0.085),
- (0.700, 0.266, 0.106),
- (0.700, 0.309, 0.123),
- (0.700, 0.343, 0.137),
- (0.700, 0.372, 0.148))
+ (0.700, 0.000, 0.000), (0.700, 0.018, 0.007), (0.700, 0.051, 0.020),
(0.700, 0.096, 0.038),
+ (0.700, 0.151, 0.060), (0.700, 0.214, 0.085), (0.700, 0.266, 0.106),
(0.700, 0.309, 0.123),
+ (0.700, 0.343, 0.137), (0.700, 0.372, 0.148))
expected.size should ===(sampleCount)
diff --git
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala
index c8805b008d..d30b1df016 100644
---
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala
+++
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala
@@ -504,7 +504,8 @@ private class ShardingProducerControllerImpl[A: ClassTag](
s.out.flatMap {
case (outKey: OutKey, outState) =>
val idleDurationMillis = (now - outState.usedNanoTime) / 1000 /
1000
- if (outState.unconfirmed.isEmpty && outState.buffered.isEmpty &&
idleDurationMillis >= settings.cleanupUnusedAfter.toMillis) {
+ if (outState.unconfirmed.isEmpty && outState.buffered.isEmpty &&
idleDurationMillis >= settings
+ .cleanupUnusedAfter.toMillis) {
context.log.debug("Cleanup unused [{}], because it was idle for
[{} ms]", outKey, idleDurationMillis)
context.stop(outState.producerController)
Some(outKey)
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
index 9457346711..f687553dd1 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
@@ -1651,7 +1651,8 @@ private[pekko] class DDataShardCoordinator(
updateStateRetries += 1
val template =
- s"$typeName: The ShardCoordinator was unable to update a distributed
state within 'updating-state-timeout':
${stateWriteConsistency.timeout.toMillis} millis (${if (terminating)
"terminating"
+ s"$typeName: The ShardCoordinator was unable to update a distributed
state within 'updating-state-timeout': ${stateWriteConsistency
+ .timeout.toMillis} millis (${if (terminating) "terminating"
else "retrying"}). Attempt $updateStateRetries. " +
s"Perhaps the ShardRegion has not started on all active nodes yet?
event=$evt"
diff --git
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
index 06b670ec68..8f4bdfe895 100644
---
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
+++
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
@@ -58,7 +58,8 @@ import pekko.util.Timeout
def withState(
subscribeAdapters: Map[
- ActorRef[JReplicator.SubscribeResponse[ReplicatedData]],
ActorRef[dd.Replicator.SubscribeResponse[
+ ActorRef[JReplicator.SubscribeResponse[ReplicatedData]],
+ ActorRef[dd.Replicator.SubscribeResponse[
ReplicatedData]]]): Behavior[SReplicator.Command] = {
def stopSubscribeAdapter(
diff --git
a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index e7f4fca5dd..34320067be 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -882,7 +882,8 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
def leaving(address: Address): Unit = {
// only try to update if the node is available (in the member ring)
latestGossip.members.find(_.address == address).foreach { existingMember =>
- if (existingMember.status == Joining || existingMember.status ==
WeaklyUp || existingMember.status == Up || existingMember.status ==
PreparingForShutdown || existingMember.status == ReadyForShutdown) {
+ if (existingMember.status == Joining || existingMember.status ==
WeaklyUp || existingMember
+ .status == Up || existingMember.status == PreparingForShutdown ||
existingMember.status == ReadyForShutdown) {
// mark node as LEAVING
val newMembers = latestGossip.members - existingMember +
existingMember.copy(status = Leaving)
val newGossip = latestGossip.copy(members = newMembers)
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
index 143cefd448..9af65831d5 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala
@@ -122,7 +122,8 @@ class PersistenceTestKitDurableStateStore[A](val system:
ExtendedActorSystem)
override def currentChanges(tag: String, offset: Offset):
Source[DurableStateChange[A], pekko.NotUsed] =
this.synchronized {
val currentGlobalOffset = lastGlobalOffset.get()
- changes(tag, offset).takeWhile(_.offset match {
+ changes(tag, offset).takeWhile(
+ _.offset match {
case Sequence(fromOffset) =>
fromOffset < currentGlobalOffset
case offset =>
@@ -137,7 +138,8 @@ class PersistenceTestKitDurableStateStore[A](val system:
ExtendedActorSystem)
offset: Offset): Source[DurableStateChange[A], NotUsed] =
this.synchronized {
val currentGlobalOffset = lastGlobalOffset.get()
- changesBySlices(entityType, minSlice, maxSlice,
offset).takeWhile(_.offset match {
+ changesBySlices(entityType, minSlice, maxSlice, offset).takeWhile(
+ _.offset match {
case Sequence(fromOffset) =>
fromOffset < currentGlobalOffset
case offset =>
diff --git
a/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala
b/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala
index 7eeefd7df1..2ed0ac960e 100644
---
a/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala
+++
b/persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala
@@ -138,7 +138,8 @@ private[pekko] class ReplayFilter(
if (msg.persistent.sequenceNr >= seqNo) {
val errMsg =
s"Invalid replayed event
[sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}]
from a new writer. " +
- s"An older writer already sent an event
[sequenceNr=${msg.persistent.sequenceNr},
writerUUID=${msg.persistent.writerUuid}] whose sequence number was equal or
greater for the same persistenceId [${r.persistent.persistenceId}]. " +
+ s"An older writer already sent an event
[sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent
+ .writerUuid}] whose sequence number was equal or greater
for the same persistenceId [${r.persistent.persistenceId}]. " +
"Perhaps, the new writer journaled the event out of sequence,
or duplicate persistenceId for different entities?"
logIssue(errMsg)
mode match {
diff --git a/project/SbtMultiJvmPlugin.scala b/project/SbtMultiJvmPlugin.scala
index ffd2febc96..7f270ae0c0 100644
--- a/project/SbtMultiJvmPlugin.scala
+++ b/project/SbtMultiJvmPlugin.scala
@@ -248,7 +248,8 @@ object MultiJvmPlugin extends AutoPlugin {
.foreach(classpathFile =>
IO.copyFile(classpathFile, new File(multiRunCopiedClassDir,
classpathFile.getName), true))
val cp =
- directoryBasedClasspathEntries.absString + File.pathSeparator +
multiRunCopiedClassDir.getAbsolutePath + File.separator + "*"
+ directoryBasedClasspathEntries.absString + File.pathSeparator +
multiRunCopiedClassDir.getAbsolutePath + File
+ .separator + "*"
(testClass: String) => { Seq("-cp", cp, runner, "-s", testClass) ++
options }
}
diff --git a/project/StreamOperatorsIndexGenerator.scala
b/project/StreamOperatorsIndexGenerator.scala
index e2aada1769..6def2eef02 100644
--- a/project/StreamOperatorsIndexGenerator.scala
+++ b/project/StreamOperatorsIndexGenerator.scala
@@ -187,12 +187,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
.map(_.replaceAll("Mat$", ""))
.map(method => (element, method))
} ++ List(
- (noElement, "Partition"),
- (noElement, "MergeSequence"),
- (noElement, "Broadcast"),
- (noElement, "Balance"),
- (noElement, "Unzip"),
- (noElement, "UnzipWith"))
+ (noElement, "Partition"), (noElement, "MergeSequence"), (noElement,
"Broadcast"), (noElement, "Balance"),
+ (noElement, "Unzip"), (noElement, "UnzipWith"))
val sourceAndFlow =
defs.collect { case ("Source", method) => method
}.intersect(defs.collect { case ("Flow", method) => method })
diff --git a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala
b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala
index ffcbef77ca..81f3c06d44 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala
@@ -912,7 +912,8 @@ private[remote] class EndpointWriter(
if (pduSize > transport.maximumPayloadBytes) {
val reasonText =
- s"Discarding oversized payload sent to ${s.recipient}: max
allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded
${s.message.getClass} was ${pdu.size} bytes."
+ s"Discarding oversized payload sent to ${s.recipient}: max
allowed size ${transport
+ .maximumPayloadBytes} bytes, actual size of encoded
${s.message.getClass} was ${pdu.size} bytes."
log.error(
new OversizedPayloadException(reasonText),
"Transient association error (association remains live)")
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala
index e252cc0144..cccb0868d9 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/LinearTraversalBuilderSpec.scala
@@ -817,8 +817,7 @@ class LinearTraversalBuilderSpec extends PekkoSpec {
mat.islandAssignments should ===(
List(
- (sink, Attributes.none, TestDefaultIsland),
- (flow2, Attributes.none, TestDefaultIsland),
+ (sink, Attributes.none, TestDefaultIsland), (flow2, Attributes.none,
TestDefaultIsland),
(flow1, Attributes.name("island2"), TestIsland2),
(source, Attributes.name("island2") and Attributes.name("island1"),
TestIsland1)))
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
index d9cfeabfa0..2054d5392f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
@@ -50,7 +50,8 @@ import pekko.util.ByteString
val halfClose: Boolean,
val idleTimeout: Duration,
val bindShutdownTimeout: FiniteDuration)
- extends
GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection],
Future[
+ extends
GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection],
+ Future[
StreamTcp.ServerBinding]] {
import ConnectionSourceStage._
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 10637adac0..d3fbdfdfb6 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -546,7 +546,8 @@ object Source {
*/
@deprecated("Use variant accepting completion and failure matchers", "Akka
2.6.0")
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy):
Source[T, ActorRef] =
- new Source(scaladsl.Source.actorRef({
+ new Source(scaladsl.Source.actorRef(
+ {
case pekko.actor.Status.Success(s: CompletionStrategy) => s
case pekko.actor.Status.Success(_) =>
CompletionStrategy.Draining
case pekko.actor.Status.Success =>
CompletionStrategy.Draining
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 623bd2a2fb..a39a15df59 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -710,7 +710,8 @@ object Source {
*/
@deprecated("Use variant accepting completion and failure matchers instead",
"Akka 2.6.0")
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy):
Source[T, ActorRef] =
- actorRef({
+ actorRef(
+ {
case pekko.actor.Status.Success(s: CompletionStrategy) => s
case pekko.actor.Status.Success(_) =>
CompletionStrategy.Draining
case pekko.actor.Status.Success =>
CompletionStrategy.Draining
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]