This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fb66747ab chore(amber): remove dead tuple-level breakpoint test and exception class (#4504)
2fb66747ab is described below

commit 2fb66747abd97589e8ee9c68bb02f6fcd84417bf
Author: Yicong Huang <[email protected]>
AuthorDate: Sat Apr 25 18:10:33 2026 -0700

    chore(amber): remove dead tuple-level breakpoint test and exception class (#4504)
    
    ### What changes were proposed in this PR?
    
    Two related deletions, all leftovers from the old Scala tuple-level
    breakpoint mechanism:
    
    1. **`amber/src/test/scala/.../breakpoint/ExceptionBreakpointSpec.scala`**
       — all 7 test cases commented out since #941 (2020-12-21). Subsequent
       commits only renamed packages.
    2. **`amber/src/main/scala/.../common/amberexception/BreakpointException.scala`**
       — sole class in its package, zero references anywhere in `amber/` or
       `common/`.
    
    The old breakpoint system's other classes (`LocalBreakpoint`,
    `FaultedTuple`, `SkipTuple` (engine class, not `SkipTupleRequest`),
    `ModifyTuple`, `ExceptionBreakpoint`, `ReportGlobalBreakpointTriggered`)
    had already been removed from the codebase — `grep` confirmed zero
    remaining references.
    
    This is unrelated to the modern Python UDF debugger (Udon), which is
    covered by frontend specs.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4503
    
    ### How was this PR tested?
    
    Pure deletion. The test file had no active test cases, and
    `BreakpointException` had no callers (verified via `grep` across
    `amber/src` and `common/`). No behavior changes.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
---
 .../amberexception/BreakpointException.scala       |  24 --
 .../breakpoint/ExceptionBreakpointSpec.scala       | 298 ---------------------
 2 files changed, 322 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/amberexception/BreakpointException.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/amberexception/BreakpointException.scala
deleted file mode 100644
index 647a5248ec..0000000000
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/amberexception/BreakpointException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.engine.common.amberexception
-
-import org.apache.texera.amber.core.WorkflowRuntimeException
-
-class BreakpointException extends WorkflowRuntimeException("breakpoint triggered") {}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/breakpoint/ExceptionBreakpointSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/breakpoint/ExceptionBreakpointSpec.scala
deleted file mode 100644
index 45f0d1bcb8..0000000000
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/breakpoint/ExceptionBreakpointSpec.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.engine.architecture.breakpoint
-
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.event.LoggingAdapter
-import org.apache.pekko.testkit.{ImplicitSender, TestKit}
-import org.apache.pekko.util.Timeout
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.flatspec.AnyFlatSpecLike
-
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.duration._
-
-class ExceptionBreakpointSpec
-    extends TestKit(ActorSystem("PrincipalSpec"))
-    with ImplicitSender
-    with AnyFlatSpecLike
-    with BeforeAndAfterAll {
-
-  implicit val timeout: Timeout = Timeout(5.seconds)
-  implicit val executionContext: ExecutionContextExecutor = system.dispatcher
-  implicit val log: LoggingAdapter = system.log
-
-  //  private val logicalPlan1 =
-  //    """{
-  //      |"operators":[
-  //      
|{"tableName":"D:\\small_input.csv","operatorId":"Scan","operatorType":"LocalScanSource","delimiter":","},
-  //      
|{"attributeName":0,"keyword":"asia","operatorId":"KeywordSearch1","operatorType":"KeywordMatcher"},
-  //      |{"operatorId":"Sink","operatorType":"Sink"}],
-  //      |"links":[
-  //      |{"origin":"Scan","destination":"KeywordSearch1"},
-  //      |{"origin":"KeywordSearch1","destination":"Sink"}]
-  //      |}""".stripMargin
-  //
-  //  private val logicalPlan2 =
-  //    """{
-  //      |"operators":[
-  //      
|{"limit":10000,"delay":0,"operatorId":"Gen","operatorType":"Generate"},
-  //      |{"operatorId":"Count","operatorType":"Aggregation"},
-  //      |{"operatorId":"Sink","operatorType":"Sink"}],
-  //      |"links":[
-  //      |{"origin":"Gen","destination":"Count"},
-  //      |{"origin":"Count","destination":"Sink"}]
-  //      |}""".stripMargin
-  //
-  //  val workflowTag = WorkflowTag("sample")
-  //  var index = 0
-  //  val opTag: () => OperatorIdentifier = () => {
-  //    index += 1; OperatorIdentifier(workflowTag, index.toString)
-  //  }
-  //  val layerTag: () => LayerTag = () => { index += 1; LayerTag(opTag(), 
index.toString) }
-  //  val workerTag: () => WorkerTag = () => { index += 1; 
WorkerTag(layerTag(), index) }
-  //  val linkTag: () => LinkTag = () => { LinkTag(layerTag(), layerTag(), 0) }
-  //
-  //  def resultValidation(expectedTupleCount: Int, idleTime: Duration = 
2.seconds): Unit = {
-  //    var counter = 0
-  //    var receivedEnd = false
-  //    receiveWhile(5.minutes, idleTime) {
-  //      case DataMessage(seq, payload) => counter += payload.length
-  //      case EndSending(seq)           => receivedEnd = true
-  //      case msg                       =>
-  //    }
-  //    assert(counter == expectedTupleCount)
-  //    assert(receivedEnd)
-  //  }
-  //
-  //  override def beforeAll: Unit = {
-  //    system.actorOf(Props[SingleNodeListener], "cluster-info")
-  //  }
-  //
-  //  override def afterAll: Unit = {
-  //    TestKit.shutdownActorSystem(system)
-  //  }
-  //
-  //  "A workflow" should "be able to detect faulted tuples and trigger 
exception breakpoint in the workflow1, then skip them" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan1))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(30.seconds, 10.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //          AdvancedMessageSending.blockingAskWithRetry(i._1._1, 
SkipTuple(i._1._2), 5)
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-  //
-  //  "A workflow" should "be able to detect faulted tuples and trigger 
exception breakpoint in the workflow1, then modify them" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan1))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(30.seconds, 10.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //          val fixed = new FaultedTuple(
-  //            ITuple("Asia", "Rwanda", "1", "0", "0", "0", "0", "0", "0", 
"12", "12", "120", "12"),
-  //            i._1._2.id,
-  //            i._1._2.isInput
-  //          )
-  //          AdvancedMessageSending.blockingAskWithRetry(i._1._1, 
ModifyTuple(fixed), 5)
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-  //
-  //  "A workflow" should "be able to trigger conditional breakpoint in the 
workflow2, then resume them" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! PassBreakpointTo(
-  //      "Gen",
-  //      new ConditionalGlobalBreakpoint("ConditionalBreakpoint", x => 
x.getInt(0) % 1000 == 0)
-  //    )
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(30.seconds, 10.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //          AdvancedMessageSending.blockingAskWithRetry(i._1._1, 
ResumeTuple(i._1._2), 5)
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-  //
-  //  "A workflow" should "be able to trigger conditional breakpoint in the 
workflow2, then skip them" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! PassBreakpointTo(
-  //      "Gen",
-  //      new ConditionalGlobalBreakpoint("ConditionalBreakpoint", x => 
x.getInt(0) % 1000 == 0)
-  //    )
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(30.seconds, 10.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //          AdvancedMessageSending.blockingAskWithRetry(i._1._1, 
SkipTuple(i._1._2), 5)
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-  //
-  //  "A workflow" should "be able to trigger count breakpoint in the 
workflow2, then resume it" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! PassBreakpointTo("Gen", new 
CountGlobalBreakpoint("CountBreakpoint", 500))
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(30.seconds, 10.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-  //
-  //  "A workflow" should "be able to trigger conditional breakpoint in the 
workflow2, then resume it" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan2))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! PassBreakpointTo(
-  //      "Gen",
-  //      new ConditionalGlobalBreakpoint("ConditionalBreakpoint", x => 
x.getInt(0) % 1000 == 0)
-  //    )
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(30.seconds, 10.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-  //
-  //  "A workflow" should "be able to trigger count breakpoint in the 
workflow1, then resume it" in {
-  //    val parent = TestProbe()
-  //    val controller = parent.childActorOf(CONTROLLER.props(logicalPlan1))
-  //    controller ! AckedControllerInitialization
-  //    parent.expectMsg(30.seconds, ReportState(ControllerState.Ready))
-  //    controller ! PassBreakpointTo("KeywordSearch1", new 
CountGlobalBreakpoint("CountBreakpoint", 3))
-  //    controller ! Start
-  //    parent.expectMsg(ReportState(ControllerState.Running))
-  //    var isCompleted = false
-  //    parent.receiveWhile(3000.seconds, 1000.seconds) {
-  //      case ReportGlobalBreakpointTriggered(bp, opID) =>
-  //        for (i <- bp) {
-  //          log.info(
-  //            (if (i._1._2.isInput) "[IN]" else "[OUT]") + i._1._2.tuple + " 
ERRORS: [" + i._2
-  //              .mkString(",") + "]"
-  //          )
-  //        }
-  //        controller ! Resume
-  //      case ReportState(ControllerState.Paused) =>
-  //      case ReportState(ControllerState.Completed) =>
-  //        isCompleted = true
-  //      case _ =>
-  //    }
-  //    assert(isCompleted)
-  //    parent.ref ! PoisonPill
-  //  }
-
-}

Reply via email to