This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9e3a9ec222dd2e1c61d2862b82d6122f1ac0e49f
Author: Jakub Novák <[email protected]>
AuthorDate: Tue May 10 22:43:25 2022 +0200

    Fix: Exception when parsing log #20966 (#23301)
    
    * UnicodeDecodeError: 'utf-8' codec can't decode byte 0xXX in position X: invalid start byte
    
      File "/opt/work/python395/lib/python3.9/site-packages/airflow/hooks/subprocess.py", line 89, in run_command
        line = raw_line.decode(output_encoding).rstrip()            # raw_line ==  b'\x00\x00\x00\x11\xa9\x01\n'
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 4: invalid start byte
    
    * Update subprocess.py
    
    * Update subprocess.py
    
    * Fix:  Exception when parsing log #20966
    
    * Fix:  Exception when parsing log #20966
    
    Another alternative is to catch the UnicodeDecodeError with try/except.
    
    e.g.
    
    ```
                line = ''
                for raw_line in iter(self.sub_process.stdout.readline, b''):
                    try:
                        line = raw_line.decode(output_encoding).rstrip()
                    except UnicodeDecodeError as err:
                        print(err, output_encoding, raw_line)
                    self.log.info("%s", line)
    ```
    
    * Create test_subprocess.sh
    
    * Update test_subprocess.py
    
    * Added shell directive and license to test_subprocess.sh
    
    * Distinguish between raw and decoded lines as suggested by @uranusjr
    
    * simplify test
    
    Co-authored-by: muhua <[email protected]>
    (cherry picked from commit 863b2576423e1a7933750b297a9b4518ae598db9)
---
 airflow/hooks/subprocess.py                            | 2 +-
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 5 +++--
 tests/hooks/test_subprocess.py                         | 6 ++++++
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py
index c814b0528d..fa8c706c69 100644
--- a/airflow/hooks/subprocess.py
+++ b/airflow/hooks/subprocess.py
@@ -88,7 +88,7 @@ class SubprocessHook(BaseHook):
                 raise RuntimeError("The subprocess should be created here and is None!")
             if self.sub_process.stdout is not None:
                 for raw_line in iter(self.sub_process.stdout.readline, b''):
-                    line = raw_line.decode(output_encoding).rstrip()
+                    line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip()
                     self.log.info("%s", line)
 
             self.sub_process.wait()
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 993ba12e31..4d6fbaa4c0 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -220,8 +220,9 @@ class PodManager(LoggingMixin):
                     ),
                     follow=follow,
                 )
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                for raw_line in logs:
+                    line = raw_line.decode('utf-8', errors="backslashreplace")
+                    timestamp, message = self.parse_log_line(line)
                     self.log.info(message)
             except BaseHTTPError as e:
                 self.log.warning(
diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py
index 642b219c01..10d9b00970 100644
--- a/tests/hooks/test_subprocess.py
+++ b/tests/hooks/test_subprocess.py
@@ -96,3 +96,9 @@ class TestSubprocessHook(unittest.TestCase):
             stderr=STDOUT,
             stdout=PIPE,
         )
+
+    def test_task_decode(self):
+        hook = SubprocessHook()
+        command = ['bash', '-c', 'printf "This will cause a coding error \\xb1\\xa6\\x01\n"']
+        result = hook.run_command(command=command)
+        assert result.exit_code == 0

Reply via email to