This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new c2a969af47 Port akka-core#31777: O(1) mandatory attribute lookup via
map index (#2758)
c2a969af47 is described below
commit c2a969af478e08676a36d70ea5df1f185ee384ac
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 20 19:32:37 2026 +0100
Port akka-core#31777: O(1) mandatory attribute lookup via map index (#2758)
* Initial plan
* Port akka-core PR #31777: optimize mandatory attribute lookup with O(1)
map
Co-authored-by: pjfanning <[email protected]>
* Fix unapply return type to not use unnecessary tuple wrapper
Co-authored-by: pjfanning <[email protected]>
* mima
* Update Attributes.scala
* Update optimized-mandatory-attributes.excludes
---------
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../optimized-mandatory-attributes.excludes | 23 ++++
.../scala/org/apache/pekko/stream/Attributes.scala | 131 +++++++++++++++++----
2 files changed, 128 insertions(+), 26 deletions(-)
diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/optimized-mandatory-attributes.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/optimized-mandatory-attributes.excludes
new file mode 100644
index 0000000000..b38fc19c8d
--- /dev/null
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/optimized-mandatory-attributes.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Optimized mandatory attributes
+ProblemFilters.exclude[FinalClassProblem]("org.apache.pekko.stream.Attributes$NestedMaterializationCancellationPolicy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Attributes.fromProduct")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.Attributes.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Attributes._1")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.stream.Attributes$")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
index 28a94b9ca7..838eedc52b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
@@ -17,6 +17,7 @@ import java.net.URLEncoder
import java.time.Duration
import java.util.Optional
+import scala.annotation.nowarn
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._
@@ -31,7 +32,7 @@ import pekko.annotation.InternalApi
import pekko.event.Logging
import pekko.japi.function
import pekko.stream.impl.TraversalBuilder
-import pekko.util.{ ByteString, OptionVal }
+import pekko.util.ByteString
import pekko.util.LineNumbers
/**
@@ -45,11 +46,36 @@ import pekko.util.LineNumbers
*
* Operators should in general not access the `attributeList` but instead use
`get` to get the expected
* value of an attribute.
+ *
+ * Constructor is internal API, use factories in companion to create instances.
*/
-final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
+final class Attributes private[pekko] (
+ val attributeList: List[Attributes.Attribute],
+ private val mandatoryAttributes: Map[Class[AnyRef], Attributes.MandatoryAttribute])
+ extends scala.Product
+ with scala.Serializable
+ with scala.Equals {
import Attributes._
+ // for binary compatibility (used to be a case class)
+ @deprecated("Use factories on companion object instead", since = "2.0.0")
+ @nowarn("msg=deprecated")
+ def this(attributeList: List[Attributes.Attribute] = Nil) =
+ this(
+ attributeList,
+ attributeList.reverseIterator
+ .foldLeft(Map.newBuilder[Class[AnyRef], Attributes.MandatoryAttribute]) {
+ case (builder, attribute) =>
+ attribute match {
+ case m: Attributes.MandatoryAttribute =>
+ builder += (m.getClass.asInstanceOf[Class[AnyRef]] -> m)
+ builder
+ case _ => builder
+ }
+ }
+ .result())
+
/**
* Note that this must only be used during traversal building and not during
materialization
* as it will then always return true because of the defaults from the
ActorMaterializerSettings
@@ -119,6 +145,8 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
/**
* Scala API: Get the most specific of one of the mandatory attributes.
Mandatory attributes are guaranteed
* to always be among the attributes when the attributes are coming from a
materialization.
+ *
+ * Note: looks for the exact mandatory attribute class, hierarchies of the same mandatory attribute not supported
*/
def mandatoryAttribute[T <: MandatoryAttribute: ClassTag]: T = {
val c = classTag[T].runtimeClass.asInstanceOf[Class[T]]
@@ -129,20 +157,16 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
* Java API: Get the most specific of one of the mandatory attributes.
Mandatory attributes are guaranteed
* to always be among the attributes when the attributes are coming from a
materialization.
*
+ * Note: looks for the exact mandatory attribute class, hierarchies of the same mandatory attribute not supported
+ *
* @param c A class that is a subtype of [[MandatoryAttribute]]
*/
def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = {
- @tailrec
- def find(list: List[Attribute]): OptionVal[Attribute] = list match {
- case Nil => OptionVal.None
- case head :: tail =>
- if (c.isInstance(head)) OptionVal.Some(head)
- else find(tail)
- }
-
- find(attributeList) match {
- case OptionVal.Some(t) => t.asInstanceOf[T]
- case _ => throw new IllegalStateException(s"Mandatory attribute [$c] not found")
+ try {
+ mandatoryAttributes(c.asInstanceOf[Class[AnyRef]]).asInstanceOf[T]
+ } catch {
+ case _: NoSuchElementException =>
+ throw new IllegalStateException(s"Mandatory attribute [$c] not found")
}
}
@@ -153,16 +177,30 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
def and(other: Attributes): Attributes = {
if (attributeList.isEmpty) other
else if (other.attributeList.isEmpty) this
- else if (other.attributeList.tail.isEmpty) Attributes(other.attributeList.head :: attributeList)
- else Attributes(other.attributeList ::: attributeList)
+ else if (other.attributeList.tail.isEmpty) {
+ // note the inverted order for attributes vs mandatory values here
+ val newAttributes = other.attributeList.head :: attributeList
+ val newMandatory = this.mandatoryAttributes ++ other.mandatoryAttributes
+ new Attributes(newAttributes, newMandatory)
+ } else {
+ val newAttributes = other.attributeList ::: attributeList
+ val newMandatory = this.mandatoryAttributes ++ other.mandatoryAttributes
+ new Attributes(newAttributes, newMandatory)
+ }
}
/**
* Adds given attribute. Added attribute is considered more specific than
* already existing attributes of the same type.
*/
- def and(other: Attribute): Attributes =
- Attributes(other :: attributeList)
+ def and(other: Attribute): Attributes = {
+ other match {
+ case m: MandatoryAttribute =>
+ new Attributes(other :: attributeList, mandatoryAttributes + (m.getClass.asInstanceOf[Class[AnyRef]] -> m))
+ case regular =>
+ new Attributes(regular :: attributeList, mandatoryAttributes)
+ }
+ }
/**
* Extracts Name attributes and concatenates them.
@@ -298,6 +336,36 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
attributeList.reverseIterator.collectFirst { case attr if
c.isInstance(attr) => c.cast(attr) }
}
+ // for binary compatibility (used to be a case class)
+
+ @deprecated("Use explicit methods on Attributes to interact, not the ones provided by Product", "2.0.0")
+ override def productArity: Int = 1
+
+ @deprecated("Use explicit methods on Attributes to interact, not the ones provided by Product", "2.0.0")
+ override def productElement(n: Int): Any = n match {
+ case 0 => attributeList
+ case _ => throw new IllegalArgumentException()
+ }
+
+ @deprecated("Don't use copy on Attributes", "2.0.0")
+ @nowarn("msg=deprecated")
+ def copy(attributeList: List[Attribute] = attributeList): Attributes =
+ new Attributes(attributeList)
+
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[Attributes]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: Attributes =>
+ attributeList == that.attributeList &&
+ mandatoryAttributes == that.mandatoryAttributes
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(attributeList, mandatoryAttributes)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+
}
/**
@@ -315,6 +383,17 @@ object Attributes {
@DoNotInherit
sealed trait MandatoryAttribute extends Attribute
+ def apply(): Attributes = new Attributes(Nil, Map.empty)
+
+ @nowarn("msg=deprecated")
+ def apply(attributeList: List[Attribute] = Nil): Attributes = new Attributes(attributeList)
+
+ // for binary compatibility
+
+ @deprecated("Use explicit methods on Attributes to interact, not the synthetic case class ones", "2.0.0")
+ def unapply(attrs: Attributes): Option[List[Attribute]] =
+ Some(attrs.attributeList)
+
final case class Name(n: String) extends Attribute
/**
@@ -561,9 +640,12 @@ object Attributes {
* , otherwise these stages will immediately cancel without materializing
the nested flow.
*/
@ApiMayChange
- class NestedMaterializationCancellationPolicy private[NestedMaterializationCancellationPolicy] (
- val propagateToNestedMaterialization: Boolean)
- extends MandatoryAttribute
+ final class NestedMaterializationCancellationPolicy private[NestedMaterializationCancellationPolicy] (
+ val propagateToNestedMaterialization: Boolean,
+ name: String)
+ extends MandatoryAttribute {
+ override def toString: String = name
+ }
@ApiMayChange
object NestedMaterializationCancellationPolicy {
@@ -575,9 +657,7 @@ object Attributes {
* This applies to [[pekko.stream.scaladsl.FlowOps.flatMapPrefix]],
[[pekko.stream.scaladsl.Flow.futureFlow]] and derived operators.
*/
val EagerCancellation: NestedMaterializationCancellationPolicy =
- new NestedMaterializationCancellationPolicy(false) {
- override def toString: String = "EagerCancellation"
- }
+ new NestedMaterializationCancellationPolicy(false, "EagerCancellation")
/**
* A [[NestedMaterializationCancellationPolicy]] that configures graph
stages
@@ -585,9 +665,8 @@ object Attributes {
* nested flow materialization. Once the nested flow is materialized it
will be cancelled immediately.
* This applies to [[pekko.stream.scaladsl.FlowOps.flatMapPrefix]],
[[pekko.stream.scaladsl.Flow.futureFlow]] and derived operators.
*/
- val PropagateToNested: NestedMaterializationCancellationPolicy = new NestedMaterializationCancellationPolicy(true) {
- override def toString: String = "PropagateToNested"
- }
+ val PropagateToNested: NestedMaterializationCancellationPolicy =
+ new NestedMaterializationCancellationPolicy(true, "PropagateToNested")
/**
* Default [[NestedMaterializationCancellationPolicy]],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]