asf-tooling commented on issue #1031:
URL: https://github.com/apache/tooling-trusted-releases/issues/1031#issuecomment-4407503967

   <!-- gofannon-issue-triage-bot v2 -->
   
   **Automated triage** — analyzed at `main@751c2146`
   
   **Type:** `discussion`  •  **Classification:** `no_action`  •  **Confidence:** `high`
   **Application domain(s):** `infrastructure`
   
   ### Summary
   This issue proposes moving the generic PubSub client code from 
`atr/pubsub.py` to the ASFQuart shared framework library (a separate 
repository). The code was originally copied from `asfpy`. @dave2wave pointed 
@sbp to https://github.com/apache/tooling-agenda for cross-repo coordination. 
Since this involves moving code to an external repository 
(infrastructure-asfquart), it cannot be resolved with a patch to this repo 
alone and requires architectural coordination.
   
   ### Where this lives in the code today
   
   #### `atr/pubsub.py` — `listen` (lines 63-113)
   _currently does this_
   The generic PubSub listener function copied from asfpy. This is the candidate code for moving to ASFQuart.
   
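   ```python
   # Imports this excerpt relies on (assumed; atr/pubsub.py may import them differently).
   # log, _DEFAULT_INACTIVITY_TIMEOUT, _DEFAULT_READ_BUFFER_SIZE and _process_connection
   # are module-level names defined or imported elsewhere in atr/pubsub.py.
   import asyncio
   from collections.abc import AsyncGenerator
   from typing import Any

   import aiohttp
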
   async def listen(
       pubsub_url: str,
       username: str | None = None,
       password: str | None = None,
       sock_read: float | None = None,
       buffersize: int | None = None,
   ) -> AsyncGenerator[Any | None]:
       if username:
           if password is None:
               raise ValueError("PubSub password is required")
           auth = aiohttp.BasicAuth(username, password)
       else:
           auth = None
   
       if sock_read is None:
           sock_read = _DEFAULT_INACTIVITY_TIMEOUT
       ct = aiohttp.ClientTimeout(sock_read=sock_read)
   
       if buffersize is None:
           buffersize = _DEFAULT_READ_BUFFER_SIZE
   
       async with aiohttp.ClientSession(auth=auth, timeout=ct, read_bufsize=buffersize) as session:
           # Retry immediately, and then back it off.
           delay = 0.0
   
           ### tbd: look at event loop, to see if it has been halted
           while True:
               log.debug("Opening new connection...")
               try:
               async for payload in _process_connection(session, pubsub_url):
                       if not payload:
                           ### tbd?: event loop killed or hit EOF
                           pass
   
                       # We got a payload, so reset the DELAY.
                       delay = 0.0
   
                       yield payload
   
               except (
                   ConnectionRefusedError,
                   aiohttp.ClientConnectorError,
                   aiohttp.ServerTimeoutError,
                   aiohttp.ClientPayloadError,
               ) as e:
                   log.error(f"Connection failed ({type(e).__name__}: {e}), reconnecting in {delay} seconds")
                   await asyncio.sleep(delay)
   
                   # Back off on the delay. Step it up from 0s, doubling each
                   # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30.
                   delay = min(30.0, (delay + 1.0) * 2)
   ```
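
The reconnect backoff in `listen()` can be checked in isolation. This sketch reproduces only the delay arithmetic from the excerpt; the helper name `backoff_delays` and its `cap` parameter are hypothetical and not part of `atr/pubsub.py`.

```python
from collections.abc import Iterator


def backoff_delays(failures: int, cap: float = 30.0) -> Iterator[float]:
    """Yield the sleep used before each reconnect, as in listen()'s except branch.

    Hypothetical helper: listen() sleeps for the current delay, then updates it
    with min(cap, (delay + 1.0) * 2). A received payload resets it to 0.0.
    """
    delay = 0.0
    for _ in range(failures):
        yield delay
        delay = min(cap, (delay + 1.0) * 2)


print(list(backoff_delays(6)))  # → [0.0, 2.0, 6.0, 14.0, 30.0, 30.0]
```

From the fifth consecutive failed connection attempt onward, `listen()` waits the full 30 seconds before retrying, until a payload arrives and resets the delay.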
   
   #### `atr/pubsub.py` — `PubSubListener` (lines 144-161)
   _currently does this_
   ATR-specific PubSub listener that routes payloads to SVN commit and LDAP handlers. This class would likely remain in ATR even if the generic `listen()` moves to ASFQuart.
   
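   ```python
   # Imports this excerpt relies on (assumed; atr/pubsub.py may import them differently).
   import os
   import pathlib
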
   class PubSubListener:
       def __init__(
           self,
           svn_working_copy_root: os.PathLike | str,
           url: str,
           username: str,
           password: str,
           topics: str = "commit/svn,ldap",
       ) -> None:
           self.svn_working_copy_root = pathlib.Path(svn_working_copy_root)
           self.url = url
           self.username = username
           self.password = password
           self.topics = topics
   
       async def start(self) -> None:
           """Run forever, processing PubSub payloads as they arrive."""
           ...
   ```
   
   #### `atr/server.py` — `_initialise_pubsub` (lines 761-778)
   _currently does this_
   Server startup code that creates and starts the `PubSubListener`, showing how the PubSub code integrates into the ATR application lifecycle.
   
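   ```python
   # Imports this excerpt relies on (assumed). config, base, log and pubsub are
   # ATR and ASFQuart modules imported elsewhere in atr/server.py.
   import asyncio
   import urllib.parse
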
   async def _initialise_pubsub(conf: type[config.AppConfig], app: base.QuartApp):
       pubsub_url = conf.PUBSUB_URL
       pubsub_user = conf.PUBSUB_USER
       pubsub_password = conf.PUBSUB_PASSWORD
       parsed_pubsub_url = urllib.parse.urlparse(pubsub_url) if pubsub_url else None
       valid_pubsub_url = bool(parsed_pubsub_url and parsed_pubsub_url.scheme and parsed_pubsub_url.netloc)
   
       if valid_pubsub_url and pubsub_url and pubsub_user and pubsub_password:
           log.info("Starting PubSub listener")
           listener = pubsub.PubSubListener(
               svn_working_copy_root=conf.SVN_STORAGE_DIR,
               url=pubsub_url,
               username=pubsub_user,
               password=pubsub_password,
           )
           task = asyncio.create_task(listener.start())
           app.extensions["pubsub_listener"] = task
           log.info("PubSub listener task created")
   ```
   
   ### Proposed approach
   This issue requires cross-repository coordination. The generic PubSub client 
code (`listen()`, `_process_connection()`, `is_ldap_payload()`, 
`is_commit_payload()`, and related constants) in `atr/pubsub.py` would need to 
be contributed to the ASFQuart repository 
(github.com/apache/infrastructure-asfquart). Once available there, ATR's 
`atr/pubsub.py` would be refactored to import the generic `listen()` from 
ASFQuart and retain only the ATR-specific `PubSubListener` class that routes 
payloads to SVN commit and LDAP handlers.
   
   As @dave2wave noted, this should be tracked via https://github.com/apache/tooling-agenda for cross-team coordination. No patch within this repository alone can resolve this issue: it requires first adding the code to ASFQuart, then updating ATR to consume it.
   
   ### Open questions
   - Has this been discussed on https://github.com/apache/tooling-agenda as 
@dave2wave suggested?
   - Would the ASFQuart maintainers accept this code, or would they prefer it 
stay in consuming applications?
   - Should the generic `listen()` function also be added to `QuartApp` (e.g. via `add_runner`) for lifecycle management, or remain a standalone async generator?
   - Is there other asfpy code that was copied into ATR that should also move 
to ASFQuart at the same time?
   
   _The agent reviewed this issue and is not proposing patches in this run. 
Review the existing-code citations and open questions above before deciding 
next steps._
   
   ### Files examined
   - `atr/pubsub.py`
   - `atr/docs/asfquart-usage.md`
   - `typestubs/asfquart/__init__.pyi`
   - `atr/server.py`
   - `typestubs/asfquart/base.pyi`
   - `atr/blueprints/common.py`
   - `atr/blueprints/get.py`
   - `atr/blueprints/post.py`
   
   ---
   *Draft from a triage agent. A human reviewer should validate before merging 
any change. The agent did not run tests or verify diffs apply.*


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

