This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bff1b4e1f fix: align BroadcastPartition routing in LinkConfig with broadcast semantics (#5032)
5bff1b4e1f is described below

commit 5bff1b4e1f2bb4dddcf5febb19a8c00cafaef0e0
Author: Matthew B. <[email protected]>
AuthorDate: Tue May 12 19:53:31 2026 -0700

    fix: align BroadcastPartition routing in LinkConfig with broadcast semantics (#5032)
---
 .../scheduling/config/LinkConfig.scala             |  7 +++--
 .../scheduling/config/LinkConfigSpec.scala         | 31 +++++++++++++++-------
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala
index ef0117834c..ef92afef59 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala
@@ -72,10 +72,9 @@ case object LinkConfig {
       case BroadcastPartition() =>
         BroadcastPartitioning(
           dataTransferBatchSize,
-          fromWorkerIds.zip(toWorkerIds).map {
-            case (fromWorkerId, toWorkerId) =>
-              ChannelIdentity(fromWorkerId, toWorkerId, isControl = false)
-          }
+          fromWorkerIds.flatMap(fromId =>
+            toWorkerIds.map(toId => ChannelIdentity(fromId, toId, isControl = false))
+          )
         )
 
       case UnknownPartition() =>
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
index 8a5f2b8652..7dd8a3de08 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
@@ -144,13 +144,7 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
 
   // ----- BroadcastPartition -----
 
-  "BroadcastPartition" should "produce a BroadcastPartitioning whose channels follow zip pairing today (current behavior)" in {
-    // Pin: BroadcastPartition currently uses `fromWorkerIds.zip(toWorkerIds)`
-    // — the SAME 1:1 pairing as OneToOnePartition. ChannelConfig in the same
-    // package emits a full cross product for the BroadcastPartition arm,
-    // which matches broadcast semantics ("each sender targets every
-    // receiver"). The two helpers diverge today; pinning this so a fix that
-    // realigns the contract surfaces here. Filed as a Bug.
+  "BroadcastPartition" should "produce a BroadcastPartitioning with the full sender x receiver cross product" in {
     val out = LinkConfig.toPartitioning(
       List(w1, w2, w3),
       List(u1, u2, u3),
@@ -160,10 +154,20 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
     out shouldBe a[BroadcastPartitioning]
     val bp = out.asInstanceOf[BroadcastPartitioning]
     bp.batchSize shouldBe batch
-    endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"), ("w3", "u3"))
+    endpoints(bp.channels) shouldBe Seq(
+      ("w1", "u1"),
+      ("w1", "u2"),
+      ("w1", "u3"),
+      ("w2", "u1"),
+      ("w2", "u2"),
+      ("w2", "u3"),
+      ("w3", "u1"),
+      ("w3", "u2"),
+      ("w3", "u3")
+    )
   }
 
-  it should "silently truncate broadcast pairings when sides differ in length (current behavior)" in {
+  it should "emit the full cross product even when sender and receiver counts differ" in {
     val out = LinkConfig.toPartitioning(
       List(w1, w2, w3),
       List(u1, u2),
@@ -171,7 +175,14 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
       batch
     )
     val bp = out.asInstanceOf[BroadcastPartitioning]
-    endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"))
+    endpoints(bp.channels) shouldBe Seq(
+      ("w1", "u1"),
+      ("w1", "u2"),
+      ("w2", "u1"),
+      ("w2", "u2"),
+      ("w3", "u1"),
+      ("w3", "u2")
+    )
   }
 
   // ----- UnknownPartition -----

Reply via email to