This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5682-dfa0434e70c256feb37a8d38c1d0bc134e789fdd in repository https://gitbox.apache.org/repos/asf/texera.git
commit 3242ac9e3f29d5ceac95443dde39a3aa81bfe3dd
Author: Yicong Huang <[email protected]>
AuthorDate: Sun Jun 14 20:18:07 2026 -0700

test(amber): move state-mat e2e into amber-integration (#5682)

### What changes were proposed in this PR?

Two things:

**1. Move the state-materialization e2e tests into `amber-integration`.** The two e2e specs that previously ran in the deleted `pyamber-state-materialization-mac` diagnostic job now run under the existing `amber-integration` job, which already provisions postgres + iceberg catalog DB + MinIO + Lakekeeper and runs `pytest -m integration` as its last step. The test's catalog backend switches from sqlite `SqlCatalog` to the real postgres-backed `JdbcCatalog`, matching `test_iceberg_document.py:45`, so we exercise the prod catalog code path instead of an sqlite shim.

**2. Add macOS to the `amber-integration` matrix.** We want CI coverage that the integration stack actually runs on macOS (where most dev machines live), not just Linux. GitHub-hosted macOS runners have no Docker, so each docker-dependent step now branches on `$RUNNER_OS`: macOS provisions postgres / minio / lakekeeper natively (`brew install` + the upstream lakekeeper `aarch64-apple-darwin` tarball, published from v0.11.0 onward, same version as the Linux docker tag) while Linux keeps the existing docker path.

Protoc on macOS uses brew's arm64-native protobuf because protoc 3.19.4 has no arm64-mac build and running its x86_64 binary under Rosetta breaks the `python_betterproto` plugin (arch / site-packages split between Rosetta'd protoc and arm64 setup-python). For proto3 sources, the plugin output depends on `betterproto`, not protoc, so the protoc version drift on macOS is benign for codegen.

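As a rough illustration of the per-OS branching described above, the mapping below restates in Python which provisioning path each dependency takes. The workflow itself does this in shell inside each `run:` step; the function name `provisioning_plan` and the dictionary layout are hypothetical and exist only for this sketch.

```python
def provisioning_plan(runner_os: str) -> dict[str, str]:
    """Return how each dependency is provisioned for a given $RUNNER_OS value.

    Hypothetical helper for illustration only; the real logic lives in
    shell `if [ "$RUNNER_OS" = "Linux" ]` branches inside build.yml.
    """
    if runner_os == "Linux":
        return {
            "protoc": "pinned upstream release zip",
            "postgres": "docker",
            "minio": "docker",
            "lakekeeper": "docker",
        }
    if runner_os == "macOS":
        return {
            "protoc": "brew",
            "postgres": "brew",
            "minio": "brew",
            "lakekeeper": "aarch64-apple-darwin tarball",
        }
    # The workflow's shell uses a plain `else` for the non-Linux case; this
    # sketch is stricter and rejects anything outside the job matrix.
    raise ValueError(f"unsupported runner OS: {runner_os}")
```

The point of the shape is that both branches end in the same externally visible state (same ports, same credentials), so later steps do not need to know which path was taken.
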
| Before | After |
| --- | --- |
| `pyamber-state-materialization-mac` macOS diagnostic job (build.yml:742) | deleted |
| `SqlCatalog` (sqlite) injected in module fixture | real postgres-backed `JdbcCatalog`, matching `test_iceberg_document.py:45` |
| Test discovered by an explicit `pytest -sv <path>` from that job | `@pytest.mark.integration` + picked up by `amber-integration`'s `pytest -m integration` |
| `amber-integration` runs only on `ubuntu-22.04` | `amber-integration` runs on `[ubuntu-22.04, macos-latest]` with `$RUNNER_OS`-branched service provisioning |

`StorageConfig.initialize` is wrapped in a class-scoped autouse fixture (rather than called at module import time) so it co-exists with `test_iceberg_document.py`'s import-time `initialize` regardless of pytest collection order. All catalog + S3 credentials read the same `STORAGE_*` env vars the production code consumes (via storage.conf), with defaults that match storage.conf, so the test stays aligned with whichever identity the surrounding CI infra uses.

The unit-style `test_process_start_channel_persists_produce_state_on_start_output` that the deleted mac job also ran is untouched: it monkeypatches the output manager and is already picked up by the regular `pyamber` job's `pytest -m "not integration"` step.

### Any related issues, documentation, discussions?

Closes #5681.

### How was this PR tested?

Locally against the existing texera-dev infra (postgres on 5432 with `texera_iceberg_catalog` schema initialized via `sql/iceberg_postgres_catalog.sql`):

```
cd amber && pytest -m integration --junit-xml=/tmp/junit-integration.xml -sv
# 3 passed, 502 deselected -- test_state_written_by_output_manager_is_replayed_by_reader,
# test_state_table_persists_across_writer_close,
# test_rest_catalog_round_trip
```

And the regular pyamber suite still green:

```
cd amber && pytest -m "not integration" -q
# 502 passed, 3 deselected
```

In CI, the `amber-integration` job picks these tests up automatically because they're marked `@pytest.mark.integration`, on both `ubuntu-22.04` and `macos-latest`.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)

---------

Signed-off-by: Yicong Huang <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .github/workflows/build.yml                        | 218 ++++++++-----
 .../packaging/test_state_materialization_e2e.py    | 355 +++++++++++----------
 2 files changed, 316 insertions(+), 257 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index aa2822e187..14f2429b63 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -316,25 +316,19 @@ jobs:
     # license check stay in `amber`; this job is tests-only.
     if: ${{ inputs.run_amber_integration }}
     strategy:
+      # macOS provisions postgres / minio / lakekeeper natively because
+      # GitHub-hosted macOS runners have no Docker (and `services:`
+      # containers are Linux-only). Each docker-dependent step below
+      # branches on $RUNNER_OS inside its `run:` script: Linux keeps
+      # the docker image, macOS uses brew + the upstream
+      # aarch64-apple-darwin lakekeeper tarball.
matrix: - os: [ubuntu-22.04] + os: [ubuntu-22.04, macos-latest] java-version: [17] runs-on: ${{ matrix.os }} env: JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 - services: - postgres: - image: postgres - env: - POSTGRES_PASSWORD: postgres - ports: - - 5432:5432 - options: >- - --health-cmd="pg_isready -U postgres" - --health-interval=10s - --health-timeout=5s - --health-retries=5 steps: - name: Checkout uses: actions/checkout@v5 @@ -375,13 +369,59 @@ jobs: if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi - name: Install protoc - # Version pinned in bin/protoc-version.txt. + # Linux pins protoc to the version in bin/protoc-version.txt via + # the upstream release zip. macOS uses brew's arm64-native + # protobuf instead because protoc 3.19.4 has no arm64-mac build + # and running the x86_64 binary under Rosetta breaks + # protoc-gen-python_betterproto (the plugin's shebang resolves + # to arm64-only setup-python, and the resulting arch/site-pkg + # split surfaces as a silent "plugin failed status 1"). For + # proto3 sources the python_betterproto plugin's output depends + # on betterproto, not protoc, so the version drift is benign + # for python-proto-gen.sh — bin/python-proto-gen.sh derives the + # include dir from `command -v protoc`, so brew's /opt/homebrew + # layout is picked up automatically. 
run: | - PROTOC_VERSION=$(cat bin/protoc-version.txt) - curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" - sudo unzip -o /tmp/protoc.zip -d /usr/local - sudo chmod +x /usr/local/bin/protoc - sudo chmod -R a+rX /usr/local/include/google + if [ "$RUNNER_OS" = "Linux" ]; then + PROTOC_VERSION=$(cat bin/protoc-version.txt) + curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" + sudo unzip -o /tmp/protoc.zip -d /usr/local + sudo chmod +x /usr/local/bin/protoc + sudo chmod -R a+rX /usr/local/include + else + brew install protobuf + fi + - name: Start Postgres + # Replaces the job-level `services.postgres` container, which + # GitHub only supports on Linux runners. Both branches end with + # postgres listening on localhost:5432 with a `postgres` superuser + # (password 'postgres') so the psql steps below stay OS-agnostic. + # macOS uses brew's pg_hba `trust` for 127.0.0.1, which means the + # password is effectively ignored — same effective auth as the + # Linux docker image here. + run: | + if [ "$RUNNER_OS" = "Linux" ]; then + docker run -d --name postgres \ + -p 5432:5432 \ + -e POSTGRES_PASSWORD=postgres \ + postgres + for i in $(seq 1 30); do + docker exec postgres pg_isready -U postgres -h localhost -q && break + echo "Waiting for Postgres... (attempt $i)" + sleep 1 + done + docker exec postgres pg_isready -U postgres -h localhost -q + else + brew install postgresql@16 + brew services start postgresql@16 + for i in $(seq 1 30); do + pg_isready -h localhost -q && break + echo "Waiting for Postgres... 
(attempt $i)" + sleep 1 + done + createuser -h localhost -s postgres + psql -h localhost -d postgres -c "ALTER USER postgres WITH PASSWORD 'postgres';" + fi - name: Create Databases run: | psql -h localhost -U postgres -f sql/texera_ddl.sql @@ -400,43 +440,79 @@ jobs: env: PGPASSWORD: postgres - name: Start MinIO + # Linux uses the pinned docker image; macOS uses brew's native + # arm64 binary, backgrounded via nohup and logged to /tmp/minio.log + # for post-mortem if the curl health check below fails. The brew + # version drifts from the Linux pin, but the tests only touch + # S3-protocol surface that has been stable across releases. run: | - docker run -d --name minio --network host \ - -e MINIO_ROOT_USER=texera_minio \ - -e MINIO_ROOT_PASSWORD=password \ - minio/minio:RELEASE.2025-02-28T09-55-16Z server /data - for i in $(seq 1 3); do + if [ "$RUNNER_OS" = "Linux" ]; then + docker run -d --name minio --network host \ + -e MINIO_ROOT_USER=texera_minio \ + -e MINIO_ROOT_PASSWORD=password \ + minio/minio:RELEASE.2025-02-28T09-55-16Z server /data + else + brew install minio/stable/minio + mkdir -p /tmp/minio-data + MINIO_ROOT_USER=texera_minio MINIO_ROOT_PASSWORD=password \ + nohup minio server /tmp/minio-data > /tmp/minio.log 2>&1 & + fi + for i in $(seq 1 15); do curl -sf http://localhost:9000/minio/health/live && break echo "Waiting for MinIO... (attempt $i)" sleep 1 done + curl -sf http://localhost:9000/minio/health/live - name: Start Lakekeeper + # Linux uses the v0.11.0 docker image; macOS downloads the + # same-version aarch64-apple-darwin tarball that upstream + # publishes alongside the linux image (v0.11.0 onward). Both + # branches run `migrate` then `serve`, then poll the binary's + # built-in `healthcheck` subcommand until ready. Failure dumps + # the container logs (Linux) or /tmp/lakekeeper.log (macOS). 
env: + LAKEKEEPER_VERSION: v0.11.0 LAKEKEEPER__PG_DATABASE_URL_READ: postgres://postgres:postgres@localhost:5432/texera_lakekeeper LAKEKEEPER__PG_DATABASE_URL_WRITE: postgres://postgres:postgres@localhost:5432/texera_lakekeeper LAKEKEEPER__PG_ENCRYPTION_KEY: texera_key + LAKEKEEPER__METRICS_PORT: "9091" run: | - docker run --rm --network host \ - -e LAKEKEEPER__PG_DATABASE_URL_READ \ - -e LAKEKEEPER__PG_DATABASE_URL_WRITE \ - -e LAKEKEEPER__PG_ENCRYPTION_KEY \ - vakamo/lakekeeper:v0.11.0 migrate - docker run -d --name lakekeeper --network host \ - -e LAKEKEEPER__PG_DATABASE_URL_READ \ - -e LAKEKEEPER__PG_DATABASE_URL_WRITE \ - -e LAKEKEEPER__PG_ENCRYPTION_KEY \ - -e LAKEKEEPER__METRICS_PORT=9091 \ - vakamo/lakekeeper:v0.11.0 serve - for i in $(seq 1 3); do - docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break + if [ "$RUNNER_OS" = "Linux" ]; then + docker run --rm --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY \ + vakamo/lakekeeper:${LAKEKEEPER_VERSION} migrate + docker run -d --name lakekeeper --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY \ + -e LAKEKEEPER__METRICS_PORT \ + vakamo/lakekeeper:${LAKEKEEPER_VERSION} serve + healthcheck() { docker exec lakekeeper /home/nonroot/lakekeeper healthcheck; } + on_fail() { echo "Lakekeeper failed to start. 
Container logs:"; docker logs lakekeeper; } + else + curl -fsSL -o /tmp/lakekeeper.tar.gz \ + "https://github.com/lakekeeper/lakekeeper/releases/download/${LAKEKEEPER_VERSION}/lakekeeper-aarch64-apple-darwin.tar.gz" + mkdir -p /tmp/lakekeeper-bin + tar -xzf /tmp/lakekeeper.tar.gz -C /tmp/lakekeeper-bin + LAKEKEEPER_BIN=$(find /tmp/lakekeeper-bin -type f -perm -u+x -name lakekeeper | head -1) + if [ -z "$LAKEKEEPER_BIN" ]; then + echo "Could not find lakekeeper binary in tarball:" + find /tmp/lakekeeper-bin -type f + exit 1 + fi + "$LAKEKEEPER_BIN" migrate + nohup "$LAKEKEEPER_BIN" serve > /tmp/lakekeeper.log 2>&1 & + healthcheck() { "$LAKEKEEPER_BIN" healthcheck; } + on_fail() { echo "Lakekeeper failed to start. Log:"; cat /tmp/lakekeeper.log; } + fi + for i in $(seq 1 15); do + healthcheck && break echo "Waiting for Lakekeeper... (attempt $i)" sleep 1 done - docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || { - echo "Lakekeeper failed to start. Container logs:" - docker logs lakekeeper - exit 1 - } + healthcheck || { on_fail; exit 1; } - name: Initialize Lakekeeper warehouse # Pull defaults out of storage.conf so this step doesn't duplicate # values that already live in the runtime config. Each scalar in @@ -461,9 +537,18 @@ jobs: LAKEKEEPER_BASE=${REST_URI%/catalog} LAKEKEEPER_BASE=${LAKEKEEPER_BASE%/} - docker run --rm --network host --entrypoint sh minio/mc -c \ - "mc alias set minio $S3_ENDPOINT $S3_USERNAME $S3_PASSWORD && \ - mc mb --ignore-existing minio/$S3_BUCKET" + # bucket creation runs through `mc`; on Linux we keep the + # minio/mc image, on macOS we use the brew-installed native CLI + # since docker is unavailable. 
+ if [ "$RUNNER_OS" = "Linux" ]; then + docker run --rm --network host --entrypoint sh minio/mc -c \ + "mc alias set minio $S3_ENDPOINT $S3_USERNAME $S3_PASSWORD && \ + mc mb --ignore-existing minio/$S3_BUCKET" + else + brew install minio-mc + mc alias set minio "$S3_ENDPOINT" "$S3_USERNAME" "$S3_PASSWORD" + mc mb --ignore-existing "minio/$S3_BUCKET" + fi curl -sf -X POST -H 'Content-Type: application/json' \ -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \ "$LAKEKEEPER_BASE/management/v1/project" || true @@ -763,49 +848,6 @@ jobs: disable_search: true fail_ci_if_error: false - pyamber-state-materialization-mac: - # Diagnostic leg: cross-region state materialization is reported to - # fail on macOS while working on Windows / Linux. The main `pyamber` - # job above runs only on ubuntu-latest because it depends on a - # postgres service container (service containers don't work on - # macOS runners). The state-materialization integration tests use - # an in-process sqlite-backed SqlCatalog instead, so we can run - # them on macOS without postgres infra. If they fail here but pass - # in the main `pyamber` job, we've reproduced the macOS-specific - # regression in CI. 
- if: ${{ inputs.run_pyamber }} - runs-on: macos-latest - steps: - - name: Checkout Texera - uses: actions/checkout@v5 - with: - ref: ${{ inputs.checkout_ref || github.sha }} - fetch-depth: 0 - - name: Prepare backport workspace - if: ${{ inputs.backport_target_branch != '' }} - run: bash ./.github/scripts/prepare-backport-checkout.sh "${{ inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}" - - name: Set up Python 3.12 - uses: actions/setup-python@v6 - with: - python-version: "3.12" - - name: Install dependencies - run: | - python -m pip install uv - if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi - if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi - if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi - - name: Install protoc - # Homebrew protoc; this job doesn't exercise scalapb so the - # bin/protoc-version.txt pin doesn't apply here. 
- run: brew install protobuf - name: Generate Python proto bindings - run: bash bin/python-proto-gen.sh - name: Run state-materialization integration tests - run: | - cd amber && pytest -sv \ - src/test/python/core/architecture/packaging/test_state_materialization_e2e.py \ - src/test/python/core/runnables/test_main_loop.py::TestMainLoop::test_process_start_channel_persists_produce_state_on_start_output - agent-service: if: ${{ inputs.run_agent_service }} name: ${{ format('agent-service{0} ({1})', inputs.job_name_suffix, matrix.os) }} diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index cfc4f7f676..8613be95b1 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -25,7 +25,7 @@ This test wires: OutputManager.set_up_port_storage_writer(port, base_uri) → real PortStorageWriter thread - → real IcebergTableWriter (sqlite-backed SqlCatalog) + → real IcebergTableWriter (postgres-backed JdbcCatalog) → state document at VFSURIFactory.state_uri(base_uri) → InputPortMaterializationReaderRunnable.run() → DataElement(StateFrame) on the consumer's input queue @@ -33,14 +33,20 @@ This test wires: and asserts that a state put through `save_state_to_storage_if_needed` on the producer side actually arrives at the consumer's queue, with the same payload. + +Marked @integration so the CI runner that has postgres + iceberg +catalog DB provisioned (amber-integration) picks it up via +`pytest -m integration`. Earlier versions of this test substituted a +sqlite-backed SqlCatalog to dodge that infra dependency; that diverged +from the prod catalog code path, so we now exercise the real one.
""" +import os import tempfile import threading import uuid import pytest -from pyiceberg.catalog.sql import SqlCatalog from core.architecture.packaging.output_manager import OutputManager from core.models import State, StateFrame @@ -68,192 +74,203 @@ from proto.org.apache.texera.amber.engine.architecture.sendsemantics import ( ) -# Module-level scratch dir for the sqlite catalog + iceberg warehouse. -# We don't initialize `StorageConfig` here: other test modules (e.g. -# test_iceberg_document.py) also call `StorageConfig.initialize` at -# import time, and the class rejects re-initialization with -# RuntimeError. Whichever module gets collected first wins; we adopt -# its namespaces below. -_WAREHOUSE_DIR = tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-") +@pytest.mark.integration +class TestStateMaterializationE2E: + @pytest.fixture(autouse=True, scope="class") + def _init_storage_config(self): + """Initialize StorageConfig + IcebergCatalogInstance for the real + postgres-backed catalog in the `amber-integration` CI job. + + Critical detail: the Scala integration tests that run earlier in + the same job connect to the iceberg catalog DB as user + `postgres/postgres` (the storage.conf default for + `STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME/PASSWORD`). pyiceberg + creates the catalog's `iceberg_tables` metadata table on first + use, owned by whoever wrote first — so it ends up owned by + `postgres`. We MUST connect as the same user, otherwise we hit + `permission denied for table iceberg_tables`. + Why the reset: `test_iceberg_document.py` also calls + `StorageConfig.initialize` at module import time (with a + different `texera/password` user that works for it because no + Scala writes first in the `pyamber` job where it runs). pytest + imports every test module during collection, even ones whose + tests will be deselected by `-m integration`, so that + initialization happens here too.
We force-reset the singletons + and re-init with the prod-correct credentials; safe because + test_iceberg_document's tests are deselected from this run. -@pytest.fixture(scope="module", autouse=True) -def sqlite_iceberg_catalog(): - """Inject a sqlite-backed SqlCatalog so the test runs without external - iceberg infra (postgres/minio). + All catalog + S3 settings read the same `STORAGE_*` env vars + the production code consumes (via storage.conf), so the test + matches whichever identity the Scala side uses in the same job + and stays aligned with the bucket / endpoint the workflow + provisions. Defaults mirror storage.conf so a local sbt run + without those vars exported still works. - Module-scoped so all tests in this file share one warehouse, and so - namespace creation only happens once. We save/restore the original - `IcebergCatalogInstance` singleton so other test modules that expect - a real postgres-backed catalog (e.g. test_iceberg_document.py) are - not affected by our replacement. - """ - # Some other test module may have initialized StorageConfig already - # (it has a single-init lock). If nothing has initialized it yet, - # do it here with arbitrary values -- we replace the catalog - # instance below so the postgres/rest fields are never exercised. - if not StorageConfig._initialized: + Class-scoped so the reset + tempdir allocation happens once + per class; the two tests in this class share state through the + same StorageConfig singleton anyway.
+ """ + StorageConfig._initialized = False + IcebergCatalogInstance._instance = None + large_binaries_bucket = os.environ.get( + "STORAGE_S3_LARGE_BINARIES_BUCKET", "texera-large-binaries" + ) StorageConfig.initialize( catalog_type="postgres", - postgres_uri_without_scheme="unused", - postgres_username="unused", - postgres_password="unused", - rest_catalog_uri="unused", - rest_catalog_warehouse_name="unused", + postgres_uri_without_scheme=os.environ.get( + "STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME", + "localhost:5432/texera_iceberg_catalog", + ), + postgres_username=os.environ.get( + "STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME", "postgres" + ), + postgres_password=os.environ.get( + "STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD", "postgres" + ), + rest_catalog_uri="http://localhost:8181/catalog/", + rest_catalog_warehouse_name="texera", table_result_namespace="operator-port-result", table_state_namespace="operator-port-state", - directory_path=_WAREHOUSE_DIR, + directory_path=tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-"), commit_batch_size=4096, - s3_endpoint="unused", - s3_region="unused", - s3_auth_username="unused", - s3_auth_password="unused", - s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/", + s3_endpoint=os.environ.get("STORAGE_S3_ENDPOINT", "http://localhost:9000"), + s3_region=os.environ.get("STORAGE_S3_REGION", "us-west-2"), + s3_auth_username=os.environ.get("STORAGE_S3_AUTH_USERNAME", "texera_minio"), + s3_auth_password=os.environ.get("STORAGE_S3_AUTH_PASSWORD", "password"), + s3_large_binaries_base_uri=f"s3://{large_binaries_bucket}/objects/0/", ) - original_instance = IcebergCatalogInstance._instance - db_path = f"{_WAREHOUSE_DIR}/catalog.sqlite" - catalog = SqlCatalog( - "texera_iceberg_e2e", - **{ - "uri": f"sqlite:///{db_path}", - "warehouse": f"file://{_WAREHOUSE_DIR}", - }, - ) - # Adopt whatever namespaces StorageConfig already has -- those are - # the ones DocumentFactory will route into. 
- catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE) - catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE) - IcebergCatalogInstance.replace_instance(catalog) - try: - yield catalog - finally: - IcebergCatalogInstance.replace_instance(original_instance) - - -def _fresh_base_uri() -> str: - """A unique port-base URI per test so tables don't collide.""" - return VFSURIFactory.create_port_base_uri( - WorkflowIdentity(id=0), - ExecutionIdentity(id=0), - GlobalPortIdentity( - op_id=PhysicalOpIdentity( - logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"), - layer_name="main", + @pytest.fixture + def base_uri(self) -> str: + """A unique port-base URI per test so tables don't collide.""" + return VFSURIFactory.create_port_base_uri( + WorkflowIdentity(id=0), + ExecutionIdentity(id=0), + GlobalPortIdentity( + op_id=PhysicalOpIdentity( + logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"), + layer_name="main", + ), + port_id=PortIdentity(id=0, internal=False), + input=False, ), - port_id=PortIdentity(id=0, internal=False), - input=False, - ), - ) - - -def test_state_written_by_output_manager_is_replayed_by_reader(): - """Producer side writes a state via OutputManager; consumer side reads - it via InputPortMaterializationReaderRunnable. The state must arrive - on the consumer's input queue intact. - """ - base_uri = _fresh_base_uri() - port_id = PortIdentity(id=0, internal=False) - worker_schema_for_result = State.SCHEMA # producer-side: only state matters - - # 1. RegionExecutionCoordinator's responsibility: provision result + - # state documents at the port base URI before any worker starts. - # We emulate that here. - DocumentFactory.create_document( - VFSURIFactory.result_uri(base_uri), worker_schema_for_result - ) - DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) - - # 2. 
Producer side: spin up an OutputManager, set up real state + - # result writer threads against the iceberg storage. - producer = OutputManager(worker_id="Worker:WF0-test-producer-main-0") - producer.add_output_port( - port_id, schema=worker_schema_for_result, storage_uri_base=base_uri - ) - - # 3. Drive a state through the producer-side path. - state = State({"flag": True, "loop_counter": 7, "name": "outer"}) - producer.save_state_to_storage_if_needed(state) - - # 4. Force the writer threads to flush + commit by closing them. - # Without this, the iceberg buffer holds the state in memory and - # nothing is durable yet. - producer.close_port_storage_writers() + ) - # 5. Consumer side: spin up the materialization reader against the - # same base URI. Each reader needs a partitioning even when no real - # downstream worker exists -- supply a OneToOnePartitioning whose - # only receiver is the consumer worker itself. - consumer_worker = ActorVirtualIdentity(name="consumer-worker-0") - consumer_queue = InternalQueue() - partitioning = Partitioning( - one_to_one_partitioning=OneToOnePartitioning( - batch_size=400, - channels=[ - ChannelIdentity( - from_worker_id=ActorVirtualIdentity(name="producer-worker-0"), - to_worker_id=consumer_worker, - is_control=False, - ) - ], + @pytest.fixture + def producer(self, base_uri): + """An OutputManager wired to the iceberg result + state documents + at `base_uri`. Closes its writer threads on teardown so cached + buffers are flushed even if a test errors out before + `close_port_storage_writers()`. + """ + # RegionExecutionCoordinator's responsibility in prod: provision + # result + state documents at the port base URI before any + # worker starts. We emulate that here. 
+ DocumentFactory.create_document( + VFSURIFactory.result_uri(base_uri), State.SCHEMA ) - ) - reader = InputPortMaterializationReaderRunnable( - uri=base_uri, - queue=consumer_queue, - worker_actor_id=consumer_worker, - partitioning=partitioning, - ) + DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) - # Run the reader on a worker thread so we can time out cleanly if - # something goes wrong. - reader_thread = threading.Thread(target=reader.run, daemon=True) - reader_thread.start() - reader_thread.join(timeout=30) - assert not reader_thread.is_alive(), "reader did not finish within timeout" - assert reader.finished(), "reader exited but did not mark itself finished" + mgr = OutputManager(worker_id="Worker:WF0-test-producer-main-0") + mgr.add_output_port( + PortIdentity(id=0, internal=False), + schema=State.SCHEMA, + storage_uri_base=base_uri, + ) + try: + yield mgr + finally: + # close_port_storage_writers is idempotent — fine to call + # again here if the test already closed. + try: + mgr.close_port_storage_writers() + except Exception: + pass - # 6. Drain the consumer's queue and find the StateFrame(s). - state_frames: list[State] = [] - while not consumer_queue.is_empty(): - elem = consumer_queue.get() - if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame): - state_frames.append(elem.payload.frame) + def test_state_written_by_output_manager_is_replayed_by_reader( + self, base_uri, producer + ): + """Producer side writes a state via OutputManager; consumer side + reads it via InputPortMaterializationReaderRunnable. The state + must arrive on the consumer's input queue intact. + """ + # Drive a state through the producer-side path. 
+ state = State({"flag": True, "loop_counter": 7, "name": "outer"}) + producer.save_state_to_storage_if_needed(state) - assert len(state_frames) == 1, ( - f"expected exactly one State to flow through writer→iceberg→reader; " - f"got {len(state_frames)}: {state_frames}" - ) - assert state_frames[0] == state, ( - f"replayed state did not match what was written; " - f"wrote={state}, read={state_frames[0]}" - ) + # Force the writer threads to flush + commit by closing them. + # Without this, the iceberg buffer holds the state in memory + # and nothing is durable yet. + producer.close_port_storage_writers() + # Consumer side: spin up the materialization reader against the + # same base URI. Each reader needs a partitioning even when no + # real downstream worker exists — supply a OneToOnePartitioning + # whose only receiver is the consumer worker itself. + consumer_worker = ActorVirtualIdentity(name="consumer-worker-0") + consumer_queue = InternalQueue() + partitioning = Partitioning( + one_to_one_partitioning=OneToOnePartitioning( + batch_size=400, + channels=[ + ChannelIdentity( + from_worker_id=ActorVirtualIdentity(name="producer-worker-0"), + to_worker_id=consumer_worker, + is_control=False, + ) + ], + ) + ) + reader = InputPortMaterializationReaderRunnable( + uri=base_uri, + queue=consumer_queue, + worker_actor_id=consumer_worker, + partitioning=partitioning, + ) -def test_state_table_persists_across_writer_close(): - """Independently verify the iceberg state table contains the row. - If this passes but the reader test above fails, the bug is in the - reader / consumer wiring; if this fails, the bug is in the writer / - storage layer. - """ - base_uri = _fresh_base_uri() - port_id = PortIdentity(id=0, internal=False) + # Run the reader on a worker thread so we can time out cleanly + # if something goes wrong. 
+ reader_thread = threading.Thread(target=reader.run, daemon=True) + reader_thread.start() + reader_thread.join(timeout=30) + assert not reader_thread.is_alive(), "reader did not finish within timeout" + assert reader.finished(), "reader exited but did not mark itself finished" - DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri), State.SCHEMA) - DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) + # Drain the consumer's queue and find the StateFrame(s). + state_frames: list[State] = [] + while not consumer_queue.is_empty(): + elem = consumer_queue.get() + if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame): + state_frames.append(elem.payload.frame) - producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0") - producer.add_output_port(port_id, schema=State.SCHEMA, storage_uri_base=base_uri) + assert len(state_frames) == 1, ( + f"expected exactly one State to flow through writer→iceberg→reader; " + f"got {len(state_frames)}: {state_frames}" + ) + assert state_frames[0] == state, ( + f"replayed state did not match what was written; " + f"wrote={state}, read={state_frames[0]}" + ) - state = State({"flag": False, "checkpoint": 42}) - producer.save_state_to_storage_if_needed(state) - producer.close_port_storage_writers() + def test_state_table_persists_across_writer_close(self, base_uri, producer): + """Independently verify the iceberg state table contains the row. + If this passes but the reader test above fails, the bug is in + the reader / consumer wiring; if this fails, the bug is in the + writer / storage layer. + """ + state = State({"flag": False, "checkpoint": 42}) + producer.save_state_to_storage_if_needed(state) + producer.close_port_storage_writers() - # Read directly from the iceberg state document, bypassing the reader. 
- state_document, _ = DocumentFactory.open_document(VFSURIFactory.state_uri(base_uri)) - rows = list(state_document.get()) - assert len(rows) == 1, ( - f"expected exactly one row in the iceberg state table after the " - f"writer was closed; got {len(rows)} rows" - ) - assert State.from_tuple(rows[0]) == state + # Read directly from the iceberg state document, bypassing the + # reader. + state_document, _ = DocumentFactory.open_document( + VFSURIFactory.state_uri(base_uri) + ) + rows = list(state_document.get()) + assert len(rows) == 1, ( + f"expected exactly one row in the iceberg state table after " + f"the writer was closed; got {len(rows)} rows" + ) + assert State.from_tuple(rows[0]) == state
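
The fixture approach in the test file above (force-reset a single-init config, then re-initialize it from `STORAGE_*` environment variables with storage.conf-style defaults) can be sketched in isolation as follows. `DemoStorageConfig`, `env_or_default`, and `reinitialize_for_integration_tests` are hypothetical names for this sketch and are not part of Texera; only the two environment variable names and their defaults come from the diff.

```python
import os


class DemoStorageConfig:
    """Hypothetical stand-in for a config class that allows one initialization."""

    _initialized = False
    settings: dict = {}

    @classmethod
    def initialize(cls, **settings) -> None:
        # Mimic a single-init lock: a second call without a reset is an error.
        if cls._initialized:
            raise RuntimeError("DemoStorageConfig is already initialized")
        cls.settings = dict(settings)
        cls._initialized = True


def env_or_default(name: str, default: str) -> str:
    """Read a setting from the environment, falling back to a default."""
    return os.environ.get(name, default)


def reinitialize_for_integration_tests() -> None:
    """Discard any earlier (e.g. import-time) initialization and start over."""
    # Clearing the guard is what lets this run after another module has
    # already called initialize() during test collection.
    DemoStorageConfig._initialized = False
    DemoStorageConfig.initialize(
        postgres_username=env_or_default(
            "STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME", "postgres"
        ),
        s3_endpoint=env_or_default("STORAGE_S3_ENDPOINT", "http://localhost:9000"),
    )
```

In the actual test this logic sits inside a class-scoped autouse pytest fixture, so it runs once before the first test of the class rather than at import time.
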
