This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9e3a9ec222dd2e1c61d2862b82d6122f1ac0e49f
Author: Jakub Novák <[email protected]>
AuthorDate: Tue May 10 22:43:25 2022 +0200

    Fix: Exception when parsing log #20966 (#23301)

    * UnicodeDecodeError: 'utf-8' codec can't decode byte 0xXX in position X: invalid start byte

      File "/opt/work/python395/lib/python3.9/site-packages/airflow/hooks/subprocess.py", line 89, in run_command
        line = raw_line.decode(output_encoding).rstrip() # raw_line == b'\x00\x00\x00\x11\xa9\x01\n'
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 4: invalid start byte

    * Update subprocess.py

    * Update subprocess.py

    * Fix: Exception when parsing log #20966

    * Fix: Exception when parsing log #20966

    Another alternative is: try-catch it.
    e.g.
    ```
                line = ''
                for raw_line in iter(self.sub_process.stdout.readline, b''):
                    try:
                        line = raw_line.decode(output_encoding).rstrip()
                    except UnicodeDecodeError as err:
                        print(err, output_encoding, raw_line)
                    self.log.info("%s", line)
    ```

    * Create test_subprocess.sh

    * Update test_subprocess.py

    * Added shell directive and license to test_subprocess.sh

    * Distinguish between raw and decoded lines as suggested by @uranusjr

    * simplify test

    Co-authored-by: muhua <[email protected]>
    (cherry picked from commit 863b2576423e1a7933750b297a9b4518ae598db9)
---
 airflow/hooks/subprocess.py                            | 2 +-
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 5 +++--
 tests/hooks/test_subprocess.py                         | 6 ++++++
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py
index c814b0528d..fa8c706c69 100644
--- a/airflow/hooks/subprocess.py
+++ b/airflow/hooks/subprocess.py
@@ -88,7 +88,7 @@ class SubprocessHook(BaseHook):
                 raise RuntimeError("The subprocess should be created here and is None!")
             if self.sub_process.stdout is not None:
                 for raw_line in iter(self.sub_process.stdout.readline, b''):
-                    line = raw_line.decode(output_encoding).rstrip()
+                    line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip()
                     self.log.info("%s", line)
 
             self.sub_process.wait()
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 993ba12e31..4d6fbaa4c0 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -220,8 +220,9 @@ class PodManager(LoggingMixin):
                     ),
                     follow=follow,
                 )
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                for raw_line in logs:
+                    line = raw_line.decode('utf-8', errors="backslashreplace")
+                    timestamp, message = self.parse_log_line(line)
                     self.log.info(message)
             except BaseHTTPError as e:
                 self.log.warning(
diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py
index 642b219c01..10d9b00970 100644
--- a/tests/hooks/test_subprocess.py
+++ b/tests/hooks/test_subprocess.py
@@ -96,3 +96,9 @@ class TestSubprocessHook(unittest.TestCase):
             stderr=STDOUT,
             stdout=PIPE,
         )
+
+    def test_task_decode(self):
+        hook = SubprocessHook()
+        command = ['bash', '-c', 'printf "This will cause a coding error \\xb1\\xa6\\x01\n"']
+        result = hook.run_command(command=command)
+        assert result.exit_code == 0
