potiuk commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r796600521
##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
# specific language governing permissions and limitations
# under the License.
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
from pypsrp.wsman import WSMan
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+ MessageType.DEBUG_RECORD: DEBUG,
+ MessageType.ERROR_RECORD: ERROR,
+ MessageType.VERBOSE_RECORD: INFO,
+ MessageType.WARNING_RECORD: WARNING,
+}
+
class PSRPHook(BaseHook):
"""
Hook for PowerShell Remoting Protocol execution.
- The hook must be used as a context manager.
+ When used as a context manager, the runspace pool is reused between shell
+ sessions.
+
+ :param psrp_conn_id: Required. The name of the PSRP connection.
+ :type psrp_conn_id: str
+ :param logging: If true (default), log command output and streams during
execution.
+ :type logging: bool
+ :param operation_timeout: Override the default WSMan timeout when polling
the pipeline.
+ :type operation_timeout: float
+ :param runspace_options:
+ Optional dictionary which is passed when creating the runspace pool.
See
+ :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+ available options.
+ :type runspace_options: dict
+ :param wsman_options:
+ Optional dictionary which is passed when creating the `WSMan` client.
See
+ :py:class:`~pypsrp.wsman.WSMan` for a description of the available
options.
+ :type wsman_options: dict
+ :param exchange_keys:
+ If true (default), automatically initiate a session key exchange when
the
+ hook is used as a context manager.
+ :type exchange_keys: bool
+
+ You can provide an alternative `configuration_name` using either
`runspace_options`
+ or by setting this key as the extra fields of your connection.
"""
- _client = None
- _poll_interval = 1
-
- def __init__(self, psrp_conn_id: str):
+ _conn = None
+ _configuration_name = None
+ _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+ def __init__(
+ self,
+ psrp_conn_id: str,
+ logging: bool = True,
+ operation_timeout: Optional[float] = None,
+ runspace_options: Optional[Dict[str, Any]] = None,
+ wsman_options: Optional[Dict[str, Any]] = None,
+ exchange_keys: bool = True,
+ ):
self.conn_id = psrp_conn_id
+ self._logging = logging
+ self._operation_timeout = operation_timeout
+ self._runspace_options = runspace_options or {}
+ self._wsman_options = wsman_options or {}
+ self._exchange_keys = exchange_keys
def __enter__(self):
- conn = self.get_connection(self.conn_id)
-
- self.log.info("Establishing WinRM connection %s to host: %s",
self.conn_id, conn.host)
- self._client = WSMan(
- conn.host,
- ssl=True,
- auth="ntlm",
- encryption="never",
- username=conn.login,
- password=conn.password,
- cert_validation=False,
- )
- self._client.__enter__()
+ conn = self.get_conn()
+ self._wsman_ref[conn].__enter__()
+ conn.__enter__()
+ if self._exchange_keys:
+ conn.exchange_keys()
+ self._conn = conn
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
- self._client.__exit__(exc_type, exc_value, traceback)
+ self._conn.__exit__(exc_type, exc_value, traceback)
+ self._wsman_ref[self._conn].__exit__(exc_type, exc_value,
traceback)
finally:
- self._client = None
+ del self._conn
Review comment:
I think that's actaully cool to clean-up if we can. If we had this
attention everywhere, I think our memory use during tests would be quite a bit
smaller (this is especially important if you'd hve parameterized tests and big
test_* files. We have sometimes spikes in memory use during our tests that make
Public runners to fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]