This is an automated email from the ASF dual-hosted git repository.
roiocam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new b0e9886439 chore: avoid the double evaluation of entityId in
ClusterSharding (#1304)
b0e9886439 is described below
commit b0e9886439bf216dace9e2979ea820521ddd2a63
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Wed Jun 5 23:23:33 2024 +0800
chore: avoid the double evaluation of entityId in ClusterSharding (#1304)
* chore: avoid the double evaluation of entityId in ClusterSharding
* new cacheable partial function
* optimized for review
* fix the right type
---
.../typed/internal/ClusterShardingImpl.scala | 15 ++++++++---
.../pekko/cluster/sharding/ClusterSharding.scala | 31 +++++++++++++++-------
2 files changed, 33 insertions(+), 13 deletions(-)
diff --git
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala
index 026611912c..525718d227 100644
---
a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala
+++
b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/internal/ClusterShardingImpl.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
+import scala.runtime.AbstractPartialFunction
import org.apache.pekko
import pekko.actor.ActorRefProvider
@@ -172,10 +173,16 @@ import pekko.util.JavaDurationConverters._
allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = {
val extractorAdapter = new ExtractorAdapter(extractor)
- val extractEntityId: ShardRegion.ExtractEntityId = {
- // TODO is it possible to avoid the double evaluation of entityId
- case message if extractorAdapter.entityId(message) != null =>
- (extractorAdapter.entityId(message),
extractorAdapter.unwrapMessage(message))
+ // !!!important is only applicable if you know that isDefinedAt(x) is
always called before apply(x) (with the same x)
+ val extractEntityId: ShardRegion.ExtractEntityId = new
AbstractPartialFunction[Any, (String, Any)] {
+ var cache: String = _
+
+ override def isDefinedAt(msg: Any): Boolean = {
+ cache = extractorAdapter.entityId(msg)
+ cache != null
+ }
+
+ override def apply(x: Any): (String, Any) = (cache,
extractorAdapter.unwrapMessage(x))
}
val extractShardId: ShardRegion.ExtractShardId = { message =>
extractorAdapter.entityId(message) match {
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
index 9278839da3..835d2fcfbb 100755
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.immutable
import scala.concurrent.Await
+import scala.runtime.AbstractPartialFunction
import scala.util.control.NonFatal
import org.apache.pekko
@@ -429,15 +430,26 @@ class ClusterSharding(system: ExtendedActorSystem)
extends Extension {
typeName,
_ => entityProps,
settings,
- extractEntityId = {
- case msg if messageExtractor.entityId(msg) ne null =>
- (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg))
- },
+ extractEntityId = extractEntityIdFromExtractor(messageExtractor),
extractShardId = msg => messageExtractor.shardId(msg),
allocationStrategy = allocationStrategy,
handOffStopMessage = handOffStopMessage)
}
+ // !!!important is only applicable if you know that isDefinedAt(x) is always
called before apply(x) (with the same x)
+ private def extractEntityIdFromExtractor(
+ messageExtractor: ShardRegion.MessageExtractor):
ShardRegion.ExtractEntityId =
+ new AbstractPartialFunction[Any, (String, Any)] {
+ var cache: String = _
+
+ override def isDefinedAt(msg: Any): Boolean = {
+ cache = messageExtractor.entityId(msg)
+ cache != null
+ }
+
+ override def apply(x: Any): (String, Any) = (cache,
messageExtractor.entityMessage(x))
+ }
+
/**
* Java/Scala API: Register a named entity type by defining the
[[pekko.actor.Props]] of the entity actor
* and functions to extract entity and shard identifier from messages. The
[[ShardRegion]] actor
@@ -612,11 +624,12 @@ class ClusterSharding(system: ExtendedActorSystem)
extends Extension {
dataCenter: Optional[String],
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
- startProxy(typeName, Option(role.orElse(null)),
Option(dataCenter.orElse(null)),
- extractEntityId = {
- case msg if messageExtractor.entityId(msg) ne null =>
- (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg))
- }, extractShardId = msg => messageExtractor.shardId(msg))
+ startProxy(
+ typeName,
+ Option(role.orElse(null)),
+ Option(dataCenter.orElse(null)),
+ extractEntityId = extractEntityIdFromExtractor(messageExtractor),
+ msg => messageExtractor.shardId(msg))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]