This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9057c130e19 fix: remove extra slash character from file URI formation
in `MetastoreHivePartitionSensor.poke` method. (#48731)
9057c130e19 is described below
commit 9057c130e19692567d5e0e2926b947323b5e29cd
Author: Nitochkin <[email protected]>
AuthorDate: Fri Apr 4 16:54:30 2025 +0200
fix: remove extra slash character from file URI formation in
`MetastoreHivePartitionSensor.poke` method. (#48731)
Co-authored-by: Anton Nitochkin <[email protected]>
---
.../google/cloud/sensors/dataproc_metastore.py | 2 +-
.../cloud/sensors/test_dataproc_metastore.py | 41 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py b/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
index a3c712e7836..d60ff39d663 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
@@ -112,7 +112,7 @@ class MetastoreHivePartitionSensor(BaseSensorOperator):
# Extract actual query results
result_base_uri = result_manifest_uri.rsplit("/", 1)[0]
- results = (f"{result_base_uri}//{filename}" for filename in
manifest.get("filenames", []))
+ results = (f"{result_base_uri}/{filename}" for filename in
manifest.get("filenames", []))
found_partitions = sum(
len(
parse_json_from_gcs(
diff --git a/providers/google/tests/unit/google/cloud/sensors/test_dataproc_metastore.py b/providers/google/tests/unit/google/cloud/sensors/test_dataproc_metastore.py
index 92ae82d3256..1933e1a1859 100644
--- a/providers/google/tests/unit/google/cloud/sensors/test_dataproc_metastore.py
+++ b/providers/google/tests/unit/google/cloud/sensors/test_dataproc_metastore.py
@@ -48,6 +48,7 @@ TEST_REGION = "test-region"
TEST_TABLE = "test_table"
GCP_PROJECT = "test-project"
GCP_CONN_ID = "test-conn"
+TEST_URI = "test-uri"
class TestMetastoreHivePartitionSensor:
@@ -142,3 +143,43 @@ class TestMetastoreHivePartitionSensor:
with pytest.raises(AirflowException, match=f"Request failed:
{error_message}"):
sensor.poke(context={})
+
+ @pytest.mark.parametrize(
+ "requested_partitions, result_files_with_rows, expected_result",
+ [
+ ([PARTITION_1, PARTITION_1], [(RESULT_FILE_NAME_1, [ROW_1])],
True),
+ ],
+ )
+ @mock.patch(DATAPROC_METASTORE_SENSOR_PATH.format("DataprocMetastoreHook"))
+ @mock.patch(DATAPROC_METASTORE_SENSOR_PATH.format("parse_json_from_gcs"))
+ def test_file_uri(
+ self,
+ mock_parse_json_from_gcs,
+ mock_hook,
+ requested_partitions,
+ result_files_with_rows,
+ expected_result,
+ ):
+ mock_hook.return_value.wait_for_operation.return_value =
mock.MagicMock(result_manifest_uri=TEST_URI)
+ manifest = deepcopy(MANIFEST_SUCCESS)
+ parse_json_from_gcs_side_effect = []
+ for file_name, rows in result_files_with_rows:
+ manifest["filenames"].append(file_name)
+ file = deepcopy(RESULT_FILE_CONTENT)
+ file["rows"] = rows
+ parse_json_from_gcs_side_effect.append(file)
+
+ mock_parse_json_from_gcs.side_effect = [manifest,
*parse_json_from_gcs_side_effect]
+
+ sensor = MetastoreHivePartitionSensor(
+ task_id=TEST_TASK_ID,
+ service_id=TEST_SERVICE_ID,
+ region=TEST_REGION,
+ table=TEST_TABLE,
+ partitions=requested_partitions,
+ gcp_conn_id=GCP_CONN_ID,
+ )
+ assert sensor.poke(context={}) == expected_result
+ mock_parse_json_from_gcs.assert_called_with(
+ file_uri=TEST_URI + "/" + RESULT_FILE_NAME_1,
gcp_conn_id=GCP_CONN_ID, impersonation_chain=None
+ )