This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new bd1bc287ff fix: keep only materialization change in region coordinator
bd1bc287ff is described below
commit bd1bc287ff71163bf8b1ee7da7fe7d6151cd4180
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 01:05:16 2026 -0700
fix: keep only materialization change in region coordinator
---
AGENTS.md | 76 ++++++++++++++++++++++
.../scheduling/RegionExecutionCoordinator.scala | 55 +++-------------
2 files changed, 86 insertions(+), 45 deletions(-)
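
Note on the `RegionExecutionCoordinator.scala` change below: the commit removes `terminateWorkersWithRetry`, which re-invoked `terminateWorkers` after a 200 ms delay whenever termination failed, with no cap on the number of attempts. The following is a minimal sketch of that retry-with-fixed-delay pattern, written against the standard library's `scala.concurrent` rather than `com.twitter.util`. The names `RetrySketch`, `withRetry`, and `sleep` are hypothetical and do not appear in the repository.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical sketch; not the Texera implementation.
object RetrySketch {
  // Daemon scheduler so a pending retry never keeps the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  })

  // Completes the returned future after `delay` without blocking a thread.
  private def sleep(delay: FiniteDuration): Future[Unit] = {
    val p = Promise[Unit]()
    scheduler.schedule(
      new Runnable { def run(): Unit = p.success(()) },
      delay.toMillis,
      TimeUnit.MILLISECONDS
    )
    p.future
  }

  // Runs `op(attempt)`; on a non-fatal failure waits `delay` and tries again.
  // Like the removed helper, this sketch retries without an upper bound.
  def withRetry[A](delay: FiniteDuration, attempt: Int = 1)(
      op: Int => Future[A]
  )(implicit ec: ExecutionContext): Future[A] =
    op(attempt).recoverWith { case NonFatal(_) =>
      sleep(delay).flatMap(_ => withRetry(delay, attempt + 1)(op))
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    // Simulated operation that fails on the first two attempts.
    val result = withRetry(200.millis) { attempt =>
      if (attempt < 3) Future.failed[Int](new RuntimeException(s"attempt $attempt failed"))
      else Future.successful(attempt)
    }
    println(s"succeeded on attempt ${Await.result(result, 5.seconds)}")
  }
}
```

In this sketch the simulated operation succeeds on its third attempt. After this commit the coordinator no longer waits on such a loop: it sets the phase to `Completed` and then calls `terminateWorkers` once.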
diff --git a/AGENTS.md b/AGENTS.md
new file mode 100644
index 0000000000..4038ee1733
--- /dev/null
+++ b/AGENTS.md
@@ -0,0 +1,76 @@
+# AGENTS.md (Texera)
+
+Quick orientation for agents working on Apache Texera (Incubating). Pair this file with the root `README.md` and the developer wiki linked from it.
+
+## Big picture (modules + service boundaries)
+- Repo is an **sbt multi-project Scala backend** (Scala 2.13.12, JDK-based) plus an **Angular frontend** under `frontend/`.
+- Backend services (Dropwizard/Jersey) live at the top level and share code under `common/`. The sbt module graph is defined in `build.sbt`; note that sbt project names do not always match folder names (e.g., `amber/` is the sbt project `WorkflowExecutionService`).
+- Shared libraries (`common/`):
+ - `common/dao` (DAO), `common/config` (Config), `common/auth` (Auth, JWT setup), `common/workflow-core` (WorkflowCore), `common/workflow-operator` (WorkflowOperator, operator definitions and descriptors), `common/pybuilder` (PyBuilder — a `pyb"..."` macro DSL for composing Python code for Python operators).
+- Services (top-level folders):
+ - `amber/` — main web application + workflow execution engine ("Amber" actor-based dataflow runtime). Serves the Angular GUI, REST API, and the collaboration WebSocket.
+ - `workflow-compiling-service/` — compiles workflow JSON into executable plans.
+ - `file-service/` — datasets/files (works with LakeFS / Iceberg catalogs, see `sql/`).
+ - `config-service/` — runtime configuration.
+ - `access-control-service/` — ACL + the AI assistant chat/completion endpoints.
+ - `computing-unit-managing-service/` — lifecycle of compute units (master/worker pods, scaling).
+ - `pyright-language-service/` — Pyright-backed language server for Python UDF editing.
+- Python runtime companion: `amber/src/main/python/` (`pytexera`, `pyamber`, `core`, `proto`, `texera_run_python_worker.py`) — used by Python workers spawned by the Amber engine.
+- A secondary, source-only copy of the operator library lives at `core/workflow-operator/src/...` (legacy / build artifact source; prefer `common/workflow-operator` for new code).
+- Build note: `build.sbt` injects ASF licensing files (`LICENSE`, `NOTICE`, `DISCLAIMER-WIP`) into `META-INF/` of every JAR via `asfLicensingSettings`.
+
+## Service port map (default config)
+| Service | App port | Admin port | Source config |
+| --- | --- | --- | --- |
+| amber (`TexeraWebApplication`) | 8080 | 8081 | `amber/src/main/resources/web-config.yml` |
+| workflow-compiling-service | 9090 | — | `workflow-compiling-service/src/main/resources/workflow-compiling-service-config.yaml` |
+| file-service | 9092 | — | `file-service/src/main/resources/file-service-web-config.yaml` |
+| config-service | 9094 | — | `config-service/src/main/resources/config-service-web-config.yaml` |
+| access-control-service (AI assistant, models) | 9096 | — | `access-control-service/src/main/resources/access-control-service-web-config.yaml` |
+| computing-unit-managing-service | 8888 | 8082 | `computing-unit-managing-service/src/main/resources/computing-unit-managing-service-config.yaml` |
+| WebSocket (collaboration, `/wsapi`) | 8085 | — | served by `amber` |
+| y-websocket (shared editing `/rtc`) | 1234 | — | `bin/shared-editing-server.sh`, `bin/y-websocket-server/` |
+
+Frontend dev proxy routing (`frontend/proxy.config.json`) mirrors this split — e.g., `/api/compile` → 9090, `/api/dataset` → 9092, `/api/config/**` → 9094, `/api/models` and `/api/chat/completion` → 9096, `/api/computing-unit` → 8888, everything else `/api` → 8080.
+
+## Runtime flow & cross-component communication
+- **REST base path:** every service mounts Jersey at `/api/*` (`environment.jersey.setUrlPattern("/api/*")` in each `Application.run`).
+- **Web GUI serving:** `amber` serves Angular static output via `FileAssetsBundle("../../frontend/dist", "/", "index.html")` and redirects 404s to `/` so Angular client-side routing works (`TexeraWebApplication.scala`).
+- **WebSockets:** collaboration is wired through Dropwizard `WebsocketBundle(classOf[CollaborationResource])`; the Jetty WS idle timeout is explicitly set to 1 hour via `WebSocketUpgradeFilter` in `TexeraWebApplication.run(...)`.
+- **Database init pattern:** services call `SqlServer.initConnection(StorageConfig.jdbcUrl, ...)` during startup (see `TexeraWebApplication.scala`, `WorkflowCompilingService.scala`, etc.). DDL lives in `sql/texera_ddl.sql`; Iceberg / LakeFS / Lakekeeper bootstrap SQL is also under `sql/`.
+- **Auth:** JWT auth is installed via `setupJwtAuth(environment)` in `amber`, plus `AuthValueFactoryProvider.Binder[SessionUser]` and `RolesAllowedDynamicFeature`. Resources under `.../resource/auth/` (`AuthResource`, `GoogleAuthResource`) own login; `AuthResource.createAdminUser()` runs at startup.
+- **Request logging filter:** every service adds a Jetty request-log filter that logs through SLF4J logger `org.eclipse.jetty.server.RequestLog` (level controlled by env var `TEXERA_SERVICE_LOG_LEVEL`). Note the servlet-API split: `amber` currently uses `javax.servlet.*` while `workflow-compiling-service` (and other newer services) use `jakarta.servlet.*`. There is a TODO to consolidate onto `common/auth`'s `RequestLoggingFilter.register()` once `amber` upgrades to Dropwizard 4.x.
+- **Config loading:** every service uses `SubstitutingSourceProvider` + `EnvironmentVariableSubstitutor(false)` so YAML configs support `${ENV_VAR}` expansion.
+
+## Where to make changes (project-specific conventions)
+- **New backend endpoint:** create a Jersey `*Resource` under `<service>/src/main/scala/.../resource/` and **register it in that service's `Application.run(...)`** via `environment.jersey.register(classOf[YourResource])`. `amber`'s `TexeraWebApplication.run(...)` already registers a long list — `AuthResource`, `WorkflowResource`, `DashboardResource`, `ProjectResource`, `HubResource`, `GmailResource`, `AIAssistantResource`, etc. Follow that pattern; don't rely on classpath scanning.
+- **Shared backend logic** belongs in `common/*` (honor the dependency graph in `build.sbt`): `common/dao` for DB, `common/config` for config, `common/auth` for auth, `common/workflow-core` + `common/workflow-operator` for dataflow model/operators, `common/pybuilder` for Python code generation.
+- **New operators:** add to `common/workflow-operator/...`; Python-backed operators typically use `common/pybuilder` and interact with the Python worker under `amber/src/main/python/`.
+- **Frontend code** is isolated under `frontend/src/` (Angular, yarn-managed). `amber` only serves the built output from `frontend/dist`. The app modules live in `frontend/src/app/{common,dashboard,hub,workspace}`.
+- When adding Jackson-touched types, respect the per-service Jackson `dependencyOverrides` already in `build.sbt` — different services pin different versions for Dropwizard 3 vs 4 compatibility.
+
+## Critical developer workflows
+- **Build everything:** `bin/build.sh` runs `bin/build-services.sh` (which runs `sbt clean dist` and unzips the `target/universal/*.zip` artifacts into per-service `target/` dirs) and then `bin/frontend.sh` (`yarn install && yarn run build` in `frontend/`).
+- **Run locally after build:**
+ - `bin/server.sh` — starts `amber` from `amber/target/texera-*/bin/texera-web-application`.
+ - `bin/workflow-compiling-service.sh`, `bin/file-service.sh`, `bin/config-service.sh`, `bin/computing-unit-managing-service.sh`, `bin/workflow-computing-unit.sh` — start the other services from their unzipped `target/` dirs.
+ - `bin/frontend-dev.sh` — frontend dev server with the proxy config above.
+ - `bin/shared-editing-server.sh` — y-websocket server for `/rtc` collaboration.
+ - `bin/python-language-service.sh` / `bin/pylsp/` — Python language service.
+- **Docker images:** Dockerfiles in `bin/*.dockerfile` **must be built from the repo root** as context (see `bin/README.md`). Example: `docker build -f bin/texera-web-application.dockerfile -t your-repo/texera-web-application:test .`. Helpers: `bin/build-images.sh`, `bin/merge-image-tags.sh`.
+- **Deployment references:** single-node Docker Compose at `bin/single-node/docker-compose.yml` (+ `nginx.conf`, `examples/`); Kubernetes Helm chart at `bin/k8s/` (`Chart.yaml`, `values.yaml`, `values-development.yaml`, `templates/`).
+- **Formatting/lint:** `bin/fix-format.sh`; Scalafmt config at `.scalafmt.conf`, Scalafix at `.scalafix.conf`.
+- **Proto codegen:** `bin/python-proto-gen.sh`, `bin/frontend-proto-gen.sh`.
+- **Service entrypoints** typically call `new <Service>().run("server", <path-to-yaml>)`. The YAML path either resolves via `TEXERA_HOME` (e.g., `WorkflowCompilingService`) or via `Utils.amberHomePath` (e.g., `TexeraWebApplication`).
+
+## Map of the code (high-signal entrypoints)
+- sbt module graph and ASF licensing task: `build.sbt`
+- `amber` web app + REST registration + GUI serving + WebSocket + JWT: `amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala`
+- Amber dataflow engine: `amber/src/main/scala/org/apache/texera/amber/engine/architecture/{controller,worker,pythonworker,scheduling,messaginglayer,sendsemantics,deploysemantics,logreplay,common}/`
+- Compilation service: `workflow-compiling-service/src/main/scala/org/apache/texera/service/WorkflowCompilingService.scala`
+- Other service entrypoints: `*/src/main/scala/org/apache/texera/service/*Service.scala` (`FileService`, `ConfigService`, `AccessControlService`, `ComputingUnitManagingService`)
+- Dashboard/user/hub/admin/AI resources: `amber/src/main/scala/org/apache/texera/web/resource/` (top-level + `auth/`, `aiassistant/`, `dashboard/{admin,hub,user}/`)
+- Python worker + pytexera SDK: `amber/src/main/python/`
+- Deployment artifacts: `bin/*.dockerfile`, `bin/*.sh`, `bin/single-node/`, `bin/k8s/`
+- SQL DDL and catalog bootstrap: `sql/texera_ddl.sql`, `sql/texera_lakefs.sql`, `sql/texera_lakekeeper.sql`, `sql/iceberg_postgres_catalog.sql`, `sql/updates/`
+- Root docs: `README.md` (links to developer wiki), `CONTRIBUTING.md`, `SECURITY.md`, `DISCLAIMER-WIP`.
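
Note on the port map in the AGENTS.md hunk above: the frontend dev proxy split it describes can be sketched as a first-match prefix lookup. This is only an illustration under stated assumptions. `ProxyRouting` and `portFor` are hypothetical names, plain prefix matching stands in for the proxy's actual glob rules (so `/api/config/**` is approximated by the prefix `/api/config/`), and the ports are the defaults listed in the table.

```scala
// Hypothetical sketch; not taken from frontend/proxy.config.json.
object ProxyRouting {
  // Ordered most-specific first so the catch-all `/api` entry is tried last.
  private val routes: Seq[(String, Int)] = Seq(
    "/api/compile" -> 9090,
    "/api/dataset" -> 9092,
    "/api/config/" -> 9094,
    "/api/models" -> 9096,
    "/api/chat/completion" -> 9096,
    "/api/computing-unit" -> 8888,
    "/api" -> 8080
  )

  /** Returns the backend port for a request path, or None if the path is not proxied. */
  def portFor(path: String): Option[Int] =
    routes.collectFirst { case (prefix, port) if path.startsWith(prefix) => port }

  def main(args: Array[String]): Unit = {
    // Example paths are made up for illustration.
    Seq("/api/compile", "/api/dataset/list", "/api/workflow/1", "/index.html")
      .foreach(p => println(s"$p -> ${portFor(p)}"))
  }
}
```

Because the lookup takes the first matching prefix, the order of `routes` matters: moving the `/api` entry to the top would send every API request to port 8080.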
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 5be5d942e5..6600721e93 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer}
+import com.twitter.util.{Future, Return, Throw}
import org.apache.texera.amber.core.state.State
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
@@ -62,7 +62,7 @@ import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.{Duration => ScalaDuration}
+import scala.concurrent.duration.Duration
/**
* The executor of a region.
@@ -110,14 +110,10 @@ class RegionExecutionCoordinator(
private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference(
Unexecuted
)
- private val terminationFutureRef: AtomicReference[Future[Unit]] = new AtomicReference(null)
- private val killRetryTimer: Timer = new JavaTimer(true)
- private val killRetryDelay: TwitterDuration = TwitterDuration.fromMilliseconds(200)
/**
* Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the
- * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and
- * all workers in this region are terminated.
+ * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the ports of this region are completed.
*
* Additionally, this method will also terminate all the workers of this region:
*
@@ -140,22 +136,12 @@ class RegionExecutionCoordinator(
return Future.Unit
}
- val existingTerminationFuture = terminationFutureRef.get
- if (existingTerminationFuture != null) {
- existingTerminationFuture
- } else {
- val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ =>
- // Set this coordinator's status to be completed so that subsequent regions can be started by
- // WorkflowExecutionCoordinator.
- setPhase(Completed)
- Future.Unit
- }
- if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
- terminationFuture
- } else {
- terminationFutureRef.get
- }
- }
+ // Set this coordinator's status to be completed so that subsequent regions can be started by
+ // WorkflowExecutionCoordinator.
+ setPhase(Completed)
+
+ // Terminate all the workers in this region.
+ terminateWorkers(regionExecution)
}
private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -182,7 +168,7 @@ class RegionExecutionCoordinator(
val actorRef = actorRefService.getActorRef(workerId)
// Remove the actorRef so that no other actors can find the worker and send messages.
actorRefService.removeActorRef(workerId)
- gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter()
+ gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
}
}.toSeq
@@ -206,29 +192,8 @@ class RegionExecutionCoordinator(
}
}
- private def terminateWorkersWithRetry(
- regionExecution: RegionExecution,
- attempt: Int = 1
- ): Future[Unit] = {
- terminateWorkers(regionExecution).rescue { case err =>
- logger.warn(
- s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.",
- err
- )
- Future
- .sleep(killRetryDelay)(killRetryTimer)
- .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
- }
- }
-
def isCompleted: Boolean = currentPhaseRef.get == Completed
- /**
- * Returns the region termination future if termination has been initiated.
- * This is only set by `tryCompleteRegionExecution()`.
- */
- def getTerminationFutureOpt: Option[Future[Unit]] = Option(terminationFutureRef.get)
-
/**
* This will sync and transition the region execution phase from one to another depending on its current phase:
*