asf-tooling commented on issue #944:
URL:
https://github.com/apache/tooling-trusted-releases/issues/944#issuecomment-4409947018
<!-- gofannon-issue-triage-bot v2 -->
**Automated triage** — analyzed at `main@2da7807a`
**Type:** `discussion` • **Classification:** `actionable` •
**Confidence:** `high`
**Application domain(s):** `authentication_authorization`,
`shared_infrastructure`
### Summary
This issue tracks LDAP state management inconsistencies across operating
modes (Production, Debug, Test). Several items from the prior discussion have
already been implemented: @dave2wave's suggestion to replace ALLOW_TESTS with
Mode.Test is done (config.py Mode enum, is_test_mode()), @alitheg's suggestion
for is_ldap_configured() exists, and production startup validation exists in
config.validate(). However, key inconsistencies remain: is_active() still fails
open when LDAP is unconfigured, fetch_admin_users/fetch_tooling_users degrade
inconsistently, the session cache file has no TTL, SSH fails closed while web
fails open, and the test user has unrestricted admin access. @dave2wave's last
comment asks what work remains.
### Where this lives in the code today
#### `atr/ldap.py` — `is_active` (lines 287-300)
_needs modification_
Returns True (fail-open) when LDAP credentials are not configured, so any
UID, including a banned account, is treated as active in non-test
environments.
```python
async def is_active(asf_uid: str) -> bool:
    import atr.config as config
    if config.is_test_mode():
        if asf_uid == "test":
            return True
        if asf_uid == "test-banned":
            return False
    if get_bind_credentials() is None:
        return True
    account = await account_lookup(asf_uid)
    if account is None:
        return False
    return not is_banned(account)
```
#### `atr/ldap.py` — `fetch_admin_users` (lines 173-200)
_needs modification_
Returns frozenset() (fail-closed) when LDAP bind credentials are not
configured, whereas fetch_tooling_users returns the 'extra' set in the same
situation.
```python
async def fetch_admin_users() -> frozenset[str]:
    import atr.log as log
    credentials = get_bind_credentials()
    if credentials is None:
        log.warning("LDAP bind DN or password not configured, returning empty admin set")
        return frozenset()
    bind_dn, bind_password = credentials
    def _query_ldap() -> frozenset[str]:
        users: set[str] = set()
        with Search(bind_dn, bind_password) as ldap_search:
            for base in (LDAP_ROOT_BASE, LDAP_TOOLING_BASE):
                try:
                    result = ldap_search.search(ldap_base=base, ldap_scope="BASE")
                    if (not result) or (len(result) != 1):
                        continue
                    for member_dn in result[0].member:
                        parsed = parse_dn(member_dn)
                        uids = parsed.get("uid", [])
                        if uids:
                            users.add(uids[0])
                except Exception as e:
                    log.warning(f"Failed to query LDAP group {base}: {e}")
        return frozenset(users)
    return await asyncio.to_thread(_query_ldap)
```
#### `atr/ldap.py` — `fetch_tooling_users` (lines 203-231)
_needs modification_
Returns the 'extra' users (partially open) when LDAP bind credentials are not
configured, whereas fetch_admin_users fails closed in the same situation.
```python
async def fetch_tooling_users(extra: set[str]) -> set[str]:
    import atr.log as log
    credentials = get_bind_credentials()
    if credentials is None:
        log.warning("LDAP bind DN or password not configured, returning extra tooling users only")
        return extra
    bind_dn, bind_password = credentials
    def _query_ldap() -> set[str]:
        users: set[str] = set()
        with Search(bind_dn, bind_password) as ldap_search:
            for base in (LDAP_TOOLING_BASE,):
                try:
                    result = ldap_search.search(ldap_base=base, ldap_scope="BASE")
                    if (not result) or (len(result) != 1):
                        continue
                    for member_dn in result[0].member:
                        parsed = parse_dn(member_dn)
                        uids = parsed.get("uid", [])
                        if uids:
                            users.add(uids[0])
                except Exception as e:
                    log.warning(f"Failed to query LDAP group {base}: {e}")
        return users
    tooling = await asyncio.to_thread(_query_ldap)
    return tooling | extra
```
#### `atr/principal.py` — `AuthoriserLDAP.cache_refresh` (lines 277-302)
_needs modification_
The session cache file fallback in Debug mode has no TTL, signature, or
integrity check. If the file contains stale data, ATR won't detect changes.
```python
async def cache_refresh(self, asf_uid: str) -> None:
    if not self.__cache.outdated(asf_uid):
        return
    if config.is_test_mode() and (asf_uid == "test"):
        # The test user does not exist in LDAP, so we hardcode their data
        committees = frozenset({"test"})
        projects = frozenset({"test"})
        self.__cache.member_of[asf_uid] = committees
        self.__cache.participant_of[asf_uid] = projects
        self.__cache.last_refreshed[asf_uid] = int(time.time())
        return
    if config.get_mode() == config.Mode.Debug:
        session_cache = await util.session_cache_read()
        if asf_uid in session_cache:
            cached_session = session_cache[asf_uid]
            committees = frozenset(cached_session.get("pmcs", []))
            projects = frozenset(cached_session.get("projects", []))
            committees, projects = _augment_test_membership(committees, projects)
            self.__cache.member_of[asf_uid] = committees
            self.__cache.participant_of[asf_uid] = projects
            self.__cache.last_refreshed[asf_uid] = int(time.time())
            log.info(f"Loaded session data for {asf_uid} from session cache file")
            return
```
#### `atr/user.py` — `is_admin` (lines 50-60)
_needs modification_
The test user gets full global admin privileges, not scoped to the test
project. This is a privilege escalation risk if test mode is accidentally
enabled.
```python
def is_admin(user_id: str | None) -> bool:
    if user_id is None:
        return False
    if config.is_test_mode() and (user_id == "test"):
        return True
    # TODO: is_user_session_downgraded only works in a Quart async context
    if util.is_user_session_downgraded():
        return False
    if user_id in _get_additional_admin_users():
        return True
    return user_id in cache.admins_get()
```
#### `atr/principal.py` — `_augment_test_membership` (lines 404-411)
_needs modification_
Adds test committee/project membership to ALL authenticated users in test
mode, not just the test user. Intentional but undocumented.
```python
def _augment_test_membership(
    committees: frozenset[str],
    projects: frozenset[str],
) -> tuple[frozenset[str], frozenset[str]]:
    if config.is_test_mode():
        committees = committees.union({"test"})
        projects = projects.union({"test"})
    return committees, projects
```
#### `atr/config.py` — `Mode` (lines 146-150)
_currently does this_
Mode enum already exists as @dave2wave proposed - ALLOW_TESTS has been
replaced by Mode.Test.
```python
class Mode(enum.Enum):
    Debug = "Debug"
    Test = "Test"
    Production = "Production"
    Profiling = "Profiling"
```
#### `atr/config.py` — `is_ldap_configured` (lines 217-219)
_currently does this_
Already exists as @alitheg suggested - explicit gate to check LDAP
configuration.
```python
def is_ldap_configured() -> bool:
    conf = get()
    return bool(conf.LDAP_BIND_DN and conf.LDAP_BIND_PASSWORD)
```
#### `atr/config.py` — `validate` (lines 230-242)
_currently does this_
Production startup safety checks already exist as @alitheg proposed. LDAP
must be configured, no additional admins allowed, dev URLs rejected.
```python
def validate() -> None:
    ...
    if (not is_dev_environment()) and (get_mode() == Mode.Debug):
        raise RuntimeError("Debug mode can only be set in development environment")
    # Production-specific guards
    if is_production_mode():
        if not (conf.LDAP_BIND_DN and conf.LDAP_BIND_PASSWORD):
            raise RuntimeError("LDAP bind credentials must be configured in production mode")
        if conf.ADMIN_USERS_ADDITIONAL or conf.TOOLING_USERS_ADDITIONAL:
            raise RuntimeError("Cannot manually configure additional users in production")
        if is_dev_environment():
            raise RuntimeError("Production mode cannot use a development APP_HOST")
```
#### `atr/config.py` — `get_mode` (lines 183-206)
_currently does this_
Mode is set once and cached (effectively final as @dave2wave requested).
Mutual exclusion is enforced.
```python
def get_mode() -> Mode:
    global _global_mode
    profiling = decouple.config("PROFILING", default=False, cast=bool)
    production = decouple.config("PRODUCTION", default=False, cast=bool)
    test = decouple.config("TESTS", default=False, cast=bool)
    # Make sure we don't set more than one - which would fall back into
    # whichever is first in the next conditional
    # This prevents accidental production in test mode, for example
    enabled = [name for name, val in [("PROFILING", profiling), ("PRODUCTION", production), ("TESTS", test)] if val]
    if len(enabled) > 1:
        exit(f"Only one mode flag may be set, but got: {', '.join(enabled)}")
    if _global_mode is None:
        if profiling:
            _global_mode = Mode.Profiling
        elif production:
            _global_mode = Mode.Production
        elif test:
            _global_mode = Mode.Test
        else:
            _global_mode = Mode.Debug
    return _global_mode
```
#### `atr/get/test.py` — `test_login` (lines 46-64)
_currently does this_
Test login endpoint now gated by config.is_test_mode() rather than
ALLOW_TESTS directly. Creates session with no LDAP verification.
```python
@get.typed
async def test_login(_session: web.Public, _test_login: Literal["test/login"]) -> web.WerkzeugResponse:
    """
    URL: /test/login
    """
    # Some test routes work anywhere outside of production
    # but test logins should be Test mode only
    if not config.is_test_mode():
        return quart.abort(404)
    await util.write_session(
        sql.UserSession(
            uid="test",
            fullname="Test User",
            committees=["test"],
            projects=["test"],
        )
    )
    return await web.redirect(root.index)
```
### Proposed approach
Several items from the discussion are already implemented: Mode.Test
replaces ALLOW_TESTS conceptually, is_ldap_configured() exists as an explicit
gate, _global_mode is effectively final, and production startup validation
catches misconfigurations. The remaining work items form a checklist of
security improvements:
1. **Make is_active() use is_ldap_configured() as a gate** — in non-test,
non-debug modes, if LDAP is not configured, this should either raise or return
False (fail-closed), not silently return True. The production validate() check
ensures this can't happen in prod, but the defense-in-depth argument says
is_active() should not silently succeed.
2. **Unify degradation semantics** — fetch_admin_users() and
fetch_tooling_users() should degrade consistently (both fail-closed when LDAP
unavailable).
3. **Add TTL to session cache file** — The file-based session cache in
AuthoriserLDAP.cache_refresh() should either be removed or given a
timestamp/TTL to prevent stale data from persisting indefinitely.
4. **Document the test user's scope** — The test user being a full admin is
already documented in authorization-security.md; what remains is a team
decision on whether to restrict that access to the test project.
5. **Reconcile SSH vs web failure modes** — Both should fail the same way
(ideally closed) when LDAP is unavailable.
Since @dave2wave asked 'what work remains?' and no response has been given
yet, this issue should remain open as a tracking issue for these remaining
items. No single patch resolves all points; each is a discrete piece of work
that warrants its own PR.
### Open questions
- Should is_active() return False (fail-closed) when LDAP is unavailable
outside of Debug/Test mode, or should it raise an error?
- Should the test user's admin privileges be scoped to only the test
project, or is full admin required for comprehensive testing?
- Should the session cache file (util.session_cache_read) be given a TTL,
removed entirely, or replaced with a different mechanism?
- Should fetch_tooling_users() return frozenset() instead of 'extra' when
LDAP is unavailable, to match fetch_admin_users() behavior?
- Are there concrete sub-issues or PRs already filed for the remaining
items, or should they be created?
### Files examined
- `atr/principal.py`
- `atr/ldap.py`
- `atr/user.py`
- `atr/get/test.py`
- `atr/docs/authorization-security.md`
- `tests/unit/test_ldap.py`
- `atr/config.py`
- `atr/sessions.py`
---
*Draft from a triage agent. A human reviewer should validate before merging
any change. The agent did not run tests or verify diffs apply.*
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]