dolfinus opened a new pull request, #27442: URL: https://github.com/apache/airflow/pull/27442
closes: #27437

If a command run by SSHOperator returns multiline output, it is written to the task log, but the Airflow logging format is applied only to the first line. For example, given these logs printed by the remote command:

```log
2022-10-12 10:26:05,919 [INFO ] ===================================== DBReader starts =====================================
2022-10-12 10:26:05,919 [INFO ] |Hive| -> |Spark| Reading table to DataFrame using parameters:
2022-10-12 10:26:05,919 [INFO ] table = 'myschema.mytable'
2022-10-12 10:26:05,920 [INFO ] columns = '*'
2022-10-12 10:26:05,920 [INFO ] options = None
2022-10-12 10:26:05,920 [INFO ] |Hive| Checking connection availability...
2022-10-12 10:26:05,923 [INFO ] |Spark| Using connection parameters:
2022-10-12 10:26:05,923 [INFO ] type = Hive
2022-10-12 10:26:07,717 [INFO ] |Hive| Connection is available.
2022-10-12 10:26:07,718 [INFO ] |Hive| Fetching schema of table 'myschema.mytable'
2022-10-12 10:26:11,094 [INFO ] |Hive| Executing SQL query:
2022-10-12 10:26:11,094 [INFO ] SELECT
2022-10-12 10:26:11,094 [INFO ] col1,
2022-10-12 10:26:11,095 [INFO ] col2,
2022-10-12 10:26:11,095 [INFO ] col3
2022-10-12 10:26:11,097 [INFO ] FROM
2022-10-12 10:26:11,097 [INFO ] myschema.mytable
2022-10-12 10:26:11,267 [INFO ] |Spark| DataFrame successfully created from SQL statement
2022-10-12 10:26:11,267 [INFO ] -------------------------------------- DBReader ends --------------------------------------
2022-10-12 10:26:11,267 [INFO ] ===================================== DBReader starts =====================================
2022-10-12 10:26:11,267 [INFO ] |Hive| -> |Spark| Reading table to DataFrame using parameters:
2022-10-12 10:26:11,267 [INFO ] table = 'myschema.anothertable'
2022-10-12 10:26:11,268 [INFO ] columns = '*'
2022-10-12 10:26:11,268 [INFO ] where = "version_dt = '2022-09-24' and business_dt > '2022-12-01'"
2022-10-12 10:26:11,268 [INFO ] options = None
2022-10-12 10:26:11,268 [INFO ] |Hive| Checking connection availability...
2022-10-12 10:26:11,271 [INFO ] |Spark| Using connection parameters:
2022-10-12 10:26:11,271 [INFO ] type = Hive
2022-10-12 10:26:11,320 [INFO ] |Hive| Connection is available.
2022-10-12 10:26:11,321 [INFO ] |Hive| Fetching schema of table 'myschema.anothertable'
2022-10-12 10:26:11,419 [INFO ] |Hive| Executing SQL query:
2022-10-12 10:26:11,420 [INFO ] SELECT
2022-10-12 10:26:11,420 [INFO ] number,
2022-10-12 10:26:11,420 [INFO ] version_dt,
2022-10-12 10:26:11,420 [INFO ] business_dt
2022-10-12 10:26:11,420 [INFO ] FROM
2022-10-12 10:26:11,420 [INFO ] myschema.anothertable
2022-10-12 10:26:11,421 [INFO ] WHERE
2022-10-12 10:26:11,421 [INFO ] version_dt = '2022-09-24' and business_dt > '2022-12-01'
2022-10-12 10:26:11,695 [INFO ] |Spark| DataFrame successfully created from SQL statement
2022-10-12 10:26:11,696 [INFO ] -------------------------------------- DBReader ends --------------------------------------
2022-10-12 10:26:12,146 [INFO ] ===================================== DBWriter starts =====================================
2022-10-12 10:26:12,147 [INFO ] |Spark| -> |Hive| Writing DataFrame to table using parameters:
2022-10-12 10:26:12,147 [INFO ] table = 'myschema.newtable'
2022-10-12 10:26:12,147 [INFO ] options:
2022-10-12 10:26:12,147 [INFO ] mode = 'overwrite_table'
2022-10-12 10:26:12,147 [INFO ] format = 'orc'
2022-10-12 10:26:12,148 [INFO ] partitionBy = 'business_dt'
2022-10-12 10:26:12,148 [INFO ] DataFrame schema
2022-10-12 10:26:12,155 [INFO ] root
2022-10-12 10:26:12,156 [INFO ] |-- col1: string (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |-- col2: string (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |-- col2: string (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |-- business_dt: date (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |Hive| Checking connection availability...
2022-10-12 10:26:12,156 [INFO ] |Spark| Using connection parameters:
2022-10-12 10:26:12,156 [INFO ] type = Hive
2022-10-12 10:26:12,173 [INFO ] |Hive| Connection is available.
2022-10-12 10:26:12,174 [INFO ] |Hive| Fetching schema of table 'myschema.newtable'
2022-10-12 10:26:12,324 [INFO ] |Hive| Table 'myschema.newtable' already exists
2022-10-12 10:26:12,325 [INFO ] |Hive| Saving data to a table 'myschema.newtable'
2022-10-12 10:33:25,856 [INFO ] |Hive| Table 'myschema.newtable' successfully created
2022-10-12 10:33:25,857 [INFO ] -------------------------------------- DBWriter ends --------------------------------------
```

this is how they currently appear in the Airflow task log:

```log
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,919 [INFO ] ===================================== DBReader starts =====================================
2022-10-12 10:26:05,919 [INFO ] |Hive| -> |Spark| Reading table to DataFrame using parameters:
2022-10-12 10:26:05,919 [INFO ] table = 'myschema.mytable'
2022-10-12 10:26:05,920 [INFO ] columns = '*'
2022-10-12 10:26:05,920 [INFO ] options = None
2022-10-12 10:26:05,920 [INFO ] |Hive| Checking connection availability...
[2022-10-12 07:26:05,931] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,923 [INFO ] |Spark| Using connection parameters:
2022-10-12 10:26:05,923 [INFO ] type = Hive
[2022-10-12 07:26:07,733] {ssh_hook.py:472} INFO - 2022-10-12 10:26:07,717 [INFO ] |Hive| Connection is available.
2022-10-12 10:26:07,718 [INFO ] |Hive| Fetching schema of table 'myschema.mytable'
[2022-10-12 07:26:11,100] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,094 [INFO ] |Hive| Executing SQL query:
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,094 [INFO ] SELECT
2022-10-12 10:26:11,094 [INFO ] col1,
2022-10-12 10:26:11,095 [INFO ] col2,
2022-10-12 10:26:11,095 [INFO ] col3
2022-10-12 10:26:11,097 [INFO ] FROM
2022-10-12 10:26:11,097 [INFO ] myschema.mytable
[2022-10-12 07:26:11,270] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] |Spark| DataFrame successfully created from SQL statement
2022-10-12 10:26:11,267 [INFO ] -------------------------------------- DBReader ends --------------------------------------
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] ===================================== DBReader starts =====================================
2022-10-12 10:26:11,267 [INFO ] |Hive| -> |Spark| Reading table to DataFrame using parameters:
2022-10-12 10:26:11,267 [INFO ] table = 'myschema.anothertable'
2022-10-12 10:26:11,268 [INFO ] columns = '*'
2022-10-12 10:26:11,268 [INFO ] where = "version_dt = '2022-09-24' and business_dt > '2022-12-01'"
2022-10-12 10:26:11,268 [INFO ] options = None
2022-10-12 10:26:11,268 [INFO ] |Hive| Checking connection availability...
2022-10-12 10:26:11,271 [INFO ] |Spark| Using connection parameters:
2022-10-12 10:26:11,271 [INFO ] type = Hive
[2022-10-12 07:26:11,329] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,320 [INFO ] |Hive| Connection is available.
2022-10-12 10:26:11,321 [INFO ] |Hive| Fetching schema of table 'myschema.anothertable'
[2022-10-12 07:26:11,424] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,419 [INFO ] |Hive| Executing SQL query:
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] SELECT
2022-10-12 10:26:11,420 [INFO ] number,
2022-10-12 10:26:11,420 [INFO ] version_dt,
2022-10-12 10:26:11,420 [INFO ] business_dt
2022-10-12 10:26:11,420 [INFO ] FROM
2022-10-12 10:26:11,420 [INFO ] myschema.anothertable
2022-10-12 10:26:11,421 [INFO ] WHERE
2022-10-12 10:26:11,421 [INFO ] version_dt = '2022-09-24' and business_dt > '2022-12-01'
[2022-10-12 07:26:11,701] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,695 [INFO ] |Spark| DataFrame successfully created from SQL statement
2022-10-12 10:26:11,696 [INFO ] -------------------------------------- DBReader ends --------------------------------------
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,146 [INFO ] ===================================== DBWriter starts =====================================
2022-10-12 10:26:12,147 [INFO ] |Spark| -> |Hive| Writing DataFrame to table using parameters:
2022-10-12 10:26:12,147 [INFO ] table = 'myschema.newtable'
2022-10-12 10:26:12,147 [INFO ] options:
2022-10-12 10:26:12,147 [INFO ] mode = 'overwrite_table'
2022-10-12 10:26:12,147 [INFO ] format = 'orc'
2022-10-12 10:26:12,148 [INFO ] partitionBy = 'business_dt'
2022-10-12 10:26:12,148 [INFO ] DataFrame schema
[2022-10-12 07:26:12,159] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,155 [INFO ] root
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |-- col1: string (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |-- col2: string (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |-- col2: string (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |-- business_dt: date (nullable = true)
2022-10-12 10:26:12,156 [INFO ] |Hive| Checking connection availability...
2022-10-12 10:26:12,156 [INFO ] |Spark| Using connection parameters:
2022-10-12 10:26:12,156 [INFO ] type = Hive
[2022-10-12 07:26:12,179] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,173 [INFO ] |Hive| Connection is available.
2022-10-12 10:26:12,174 [INFO ] |Hive| Fetching schema of table 'myschema.newtable'
[2022-10-12 07:26:12,330] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,324 [INFO ] |Hive| Table 'myschema.newtable' already exists
2022-10-12 10:26:12,325 [INFO ] |Hive| Saving data to a table 'myschema.newtable'
[2022-10-12 07:33:25,863] {ssh_hook.py:472} INFO - 2022-10-12 10:33:25,856 [INFO ] |Hive| Table 'myschema.newtable' successfully created
2022-10-12 10:33:25,857 [INFO ] -------------------------------------- DBWriter ends --------------------------------------
```

The Airflow prefix lands only on the first line of each chunk, the indentation is mangled, and the result is very hard to read. With this change, SSHHook splits multiline output and logs every line separately:

```log
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,919 [INFO ] ===================================== DBReader starts =====================================
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,919 [INFO ] |Hive| -> |Spark| Reading table to DataFrame using parameters:
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,919 [INFO ] table = 'myschema.mytable'
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,920 [INFO ] columns = '*'
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,920 [INFO ] options = None
[2022-10-12 07:26:05,928] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,920 [INFO ] |Hive| Checking connection availability...
[2022-10-12 07:26:05,931] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,923 [INFO ] |Spark| Using connection parameters:
[2022-10-12 07:26:05,931] {ssh_hook.py:472} INFO - 2022-10-12 10:26:05,923 [INFO ] type = Hive
[2022-10-12 07:26:07,733] {ssh_hook.py:472} INFO - 2022-10-12 10:26:07,717 [INFO ] |Hive| Connection is available.
[2022-10-12 07:26:07,733] {ssh_hook.py:472} INFO - 2022-10-12 10:26:07,718 [INFO ] |Hive| Fetching schema of table 'myschema.mytable'
[2022-10-12 07:26:11,100] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,094 [INFO ] |Hive| Executing SQL query:
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,094 [INFO ] SELECT
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,094 [INFO ] col1,
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,095 [INFO ] col2,
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,095 [INFO ] col3
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,097 [INFO ] FROM
[2022-10-12 07:26:11,105] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,097 [INFO ] myschema.mytable
[2022-10-12 07:26:11,270] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] |Spark| DataFrame successfully created from SQL statement
[2022-10-12 07:26:11,270] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] -------------------------------------- DBReader ends --------------------------------------
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] ===================================== DBReader starts =====================================
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] |Hive| -> |Spark| Reading table to DataFrame using parameters:
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,267 [INFO ] table = 'myschema.anothertable'
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,268 [INFO ] columns = '*'
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,268 [INFO ] where = "version_dt = '2022-09-24' and business_dt > '2022-12-01'"
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,268 [INFO ] options = None
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,268 [INFO ] |Hive| Checking connection availability...
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,271 [INFO ] |Spark| Using connection parameters:
[2022-10-12 07:26:11,277] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,271 [INFO ] type = Hive
[2022-10-12 07:26:11,329] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,320 [INFO ] |Hive| Connection is available.
[2022-10-12 07:26:11,329] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,321 [INFO ] |Hive| Fetching schema of table 'myschema.anothertable'
[2022-10-12 07:26:11,424] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,419 [INFO ] |Hive| Executing SQL query:
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] SELECT
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] number,
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] version_dt,
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] business_dt
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] FROM
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,420 [INFO ] myschema.anothertable
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,421 [INFO ] WHERE
[2022-10-12 07:26:11,430] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,421 [INFO ] version_dt = '2022-09-24' and business_dt > '2022-12-01'
[2022-10-12 07:26:11,701] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,695 [INFO ] |Spark| DataFrame successfully created from SQL statement
[2022-10-12 07:26:11,701] {ssh_hook.py:472} INFO - 2022-10-12 10:26:11,696 [INFO ] -------------------------------------- DBReader ends --------------------------------------
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,146 [INFO ] ===================================== DBWriter starts =====================================
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,147 [INFO ] |Spark| -> |Hive| Writing DataFrame to table using parameters:
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,147 [INFO ] table = 'myschema.newtable'
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,147 [INFO ] options:
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,147 [INFO ] mode = 'overwrite_table'
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,147 [INFO ] format = 'orc'
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,148 [INFO ] partitionBy = 'business_dt'
[2022-10-12 07:26:12,154] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,148 [INFO ] DataFrame schema
[2022-10-12 07:26:12,159] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,155 [INFO ] root
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |-- col1: string (nullable = true)
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |-- col2: string (nullable = true)
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |-- col2: string (nullable = true)
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |-- business_dt: date (nullable = true)
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |Hive| Checking connection availability...
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] |Spark| Using connection parameters:
[2022-10-12 07:26:12,161] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,156 [INFO ] type = Hive
[2022-10-12 07:26:12,179] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,173 [INFO ] |Hive| Connection is available.
[2022-10-12 07:26:12,179] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,174 [INFO ] |Hive| Fetching schema of table 'myschema.newtable'
[2022-10-12 07:26:12,330] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,324 [INFO ] |Hive| Table 'myschema.newtable' already exists
[2022-10-12 07:26:12,330] {ssh_hook.py:472} INFO - 2022-10-12 10:26:12,325 [INFO ] |Hive| Saving data to a table 'myschema.newtable'
[2022-10-12 07:33:25,863] {ssh_hook.py:472} INFO - 2022-10-12 10:33:25,856 [INFO ] |Hive| Table 'myschema.newtable' successfully created
[2022-10-12 07:33:25,863] {ssh_hook.py:472} INFO - 2022-10-12 10:33:25,857 [INFO ] -------------------------------------- DBWriter ends --------------------------------------
```

Now the Airflow prefix appears on every line, and indentation within each remote log line is preserved.
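The idea can be sketched as follows. This is a minimal illustration of the splitting approach, not the actual SSHHook code; the function names and signatures here are made up for the example:

```python
import logging

def split_output_lines(output: bytes, encoding: str = "utf-8") -> list[str]:
    # Hypothetical helper: decode the raw bytes received from the SSH
    # channel and split them into individual lines.
    return output.decode(encoding, errors="replace").splitlines()

def log_command_output(log: logging.Logger, output: bytes) -> None:
    # Emit one log record per line instead of one record per received
    # chunk, so the handler's format string (timestamp, source file,
    # level) is prepended to every line of the remote output.
    for line in split_output_lines(output):
        log.info("%s", line)
```

With one record per line, downstream log viewers and format prefixes treat each remote line uniformly instead of formatting only the first line of a chunk.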
