This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 784e0ef04b Refactor: Simplify a few loops (#33736)
784e0ef04b is described below
commit 784e0ef04b5013f4e370e9a0380f5c2589128c7f
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Sat Aug 26 07:17:54 2023 +0000
Refactor: Simplify a few loops (#33736)
---
airflow/providers/apache/hive/transfers/s3_to_hive.py | 8 ++++----
airflow/providers/google/cloud/hooks/bigquery.py | 12 ++++++------
scripts/ci/pre_commit/pre_commit_replace_bad_characters.py | 8 +++-----
tests/test_utils/mock_executor.py | 3 +--
4 files changed, 14 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/apache/hive/transfers/s3_to_hive.py b/airflow/providers/apache/hive/transfers/s3_to_hive.py
index 16791611c0..e3786b42f1 100644
--- a/airflow/providers/apache/hive/transfers/s3_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/s3_to_hive.py
@@ -247,16 +247,16 @@ class S3ToHiveOperator(BaseOperator):
"Headers count mismatch File headers:\n %s\nField names: \n %s\n", header_list, field_names
)
return False
- test_field_match = [h1.lower() == h2.lower() for h1, h2 in zip(header_list, field_names)]
- if not all(test_field_match):
+ test_field_match = all(h1.lower() == h2.lower() for h1, h2 in zip(header_list, field_names))
+ if test_field_match:
+ return True
+ else:
self.log.warning(
"Headers do not match field names File headers:\n %s\nField names: \n %s\n",
header_list,
field_names,
)
return False
- else:
- return True
@staticmethod
def _delete_top_row_and_compress(input_file_name, output_file_ext, dest_dir):
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index f5f532b154..9a79465bf4 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3154,15 +3154,15 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
if "rows" in query_results and query_results["rows"]:
rows = query_results["rows"]
fields = query_results["schema"]["fields"]
+ fields_names = [field["name"] for field in fields]
col_types = [field["type"] for field in fields]
for dict_row in rows:
- typed_row = [bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])]
- if not as_dict:
- buffer.append(typed_row)
- else:
- fields_names = [field["name"] for field in fields]
- typed_row_dict = {k: v for k, v in zip(fields_names, typed_row)}
+ typed_row = [bq_cast(vs["v"], col_type) for vs, col_type in zip(dict_row["f"], col_types)]
+ if as_dict:
+ typed_row_dict = dict(zip(fields_names, typed_row))
buffer.append(typed_row_dict)
+ else:
+ buffer.append(typed_row)
return buffer
def value_check(
diff --git a/scripts/ci/pre_commit/pre_commit_replace_bad_characters.py b/scripts/ci/pre_commit/pre_commit_replace_bad_characters.py
index d0b040720d..67377f9974 100755
--- a/scripts/ci/pre_commit/pre_commit_replace_bad_characters.py
+++ b/scripts/ci/pre_commit/pre_commit_replace_bad_characters.py
@@ -56,13 +56,11 @@ def main() -> int:
count_changes = 0
path = Path(file_string)
text = path.read_text()
- for index in range(len(matches)):
- current_match = matches[index]
- text, new_count_changes = current_match.subn(REPLACEMENTS[index].replacement, text)
+ for match, spec in zip(matches, REPLACEMENTS):
+ text, new_count_changes = match.subn(spec.replacement, text)
if new_count_changes:
console.print(
- f"[yellow] Performed {new_count_changes} replacements "
- f"of {REPLACEMENTS[index].description}[/]: {path}"
+ f"[yellow] Performed {new_count_changes} replacements of {spec.description}[/]: {path}"
)
count_changes += new_count_changes
if count_changes:
diff --git a/tests/test_utils/mock_executor.py b/tests/test_utils/mock_executor.py
index 91e49e07fe..ba555fbcd9 100644
--- a/tests/test_utils/mock_executor.py
+++ b/tests/test_utils/mock_executor.py
@@ -69,8 +69,7 @@ class MockExecutor(BaseExecutor):
open_slots = self.parallelism - len(self.running)
sorted_queue = sorted(self.queued_tasks.items(), key=sort_by)
- for index in range(min((open_slots, len(sorted_queue)))):
- (key, (_, _, _, ti)) = sorted_queue[index]
+ for key, (_, _, _, ti) in sorted_queue[:open_slots]:
self.queued_tasks.pop(key)
ti._try_number += 1
state = self.mock_task_results[key]