This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch SDAP-521
in repository https://gitbox.apache.org/repos/asf/sdap-nexus.git

commit 3b2bbf655401ced5831ec0becbd17c4ebb70fe09
Author: rileykk <[email protected]>
AuthorDate: Fri Jul 5 10:01:30 2024 -0700

    Updates
---
 tests/download_data.sh |  10 +-
 tests/test_sdap.py     | 922 +++++++++++++++++++++++++++----------------------
 2 files changed, 512 insertions(+), 420 deletions(-)

diff --git a/tests/download_data.sh b/tests/download_data.sh
index 6064ad3..ed88e83 100755
--- a/tests/download_data.sh
+++ b/tests/download_data.sh
@@ -14,9 +14,15 @@ WGETRC="$wgetrc"
 
 prompt_credentials() {
     echo "Enter your Earthdata Login or other provider supplied credentials"
-    read -p "Username (rileykk): " username
-    username=${username:-rileykk}
+    read -p "Username: " username
+    username=${username}
+    if [ -z "${username}" ]; then
+      exit_with_error "Username is required"
+    fi
     read -s -p "Password: " password
+    if [ -z "${password}" ]; then
+      exit_with_error "Password is required"
+    fi
     echo "machine urs.earthdata.nasa.gov login $username password $password" 
>> $netrc
     echo
 }
diff --git a/tests/test_sdap.py b/tests/test_sdap.py
index 1919c9e..d27e825 100644
--- a/tests/test_sdap.py
+++ b/tests/test_sdap.py
@@ -601,367 +601,242 @@ def run_matchup(url, params, page_size=3500):
 
 
 @pytest.mark.integration
-@pytest.mark.parametrize(
-    ['match', 'expected'],
-    list(zip(
-        ['gridded_to_gridded', 'gridded_to_swath', 'swath_to_gridded', 'swath_to_swath'],
-        [1058, 6, 21, 4026]
-    ))
-)
-def test_match_spark(host, start, fail_on_miscount, matchup_params, match, expected):
-    url = urljoin(host, 'match_spark')
-
-    params = matchup_params[match]
-
-    bounding_poly = b_to_polygon(params['b'])
-
-    body = run_matchup(url, params)
-    try_save(f"test_matchup_spark_{match}", start, body)
-    data = body['data']
+def test_version(host, start):
+    url = urljoin(host, 'version')
 
-    for match in data:
-        verify_match_consistency(match, params, bounding_poly)
+    response = requests.get(url)
 
-    uniq_primaries(data, case=f"test_matchup_spark_{match}")
-    check_count(len(data), expected, fail_on_miscount)
+    assert response.status_code == 200
+    assert re.match(r'^\d+\.\d+\.\d+(-.+)?$', response.text)
 
 
 @pytest.mark.integration
-def test_match_spark_job_cancellation(host, start, matchup_params):
-    url = urljoin(host, 'match_spark')
-
-    params = matchup_params['long']
-
-    response = requests.get(url, params=params)
+def test_capabilities(host, start):
+    url = urljoin(host, 'capabilities')
 
-    assert response.status_code == 200, 'Initial match_spark query failed'
-    response_json = response.json()
+    response = requests.get(url)
 
-    asynchronous = 'status' in response_json
+    assert response.status_code == 200
 
-    if not asynchronous:
-        skip('Deployed SDAP version does not have asynchronous matchup')
-    else:
-        sleep(1)  # Time to allow spark workers to start working
+    capabilities = response.json()
 
-        if response_json['status'] != 'running':
-            skip('Job finished before it could be cancelled')
-        else:
-            cancel_url = [link for link in response_json['links'] if link['rel'] == 'cancel'][0]['href']
+    try_save('test_capabilities', start, capabilities)
 
-            cancel_url = url_scheme(
-                urlparse(url).scheme,
-                cancel_url
-            )
+    assert len(capabilities) > 0
 
-            cancel_response = requests.get(cancel_url)
-            assert cancel_response.status_code == 200, 'Cancellation query failed'
+    for capability in capabilities:
+        assert all([k in capability for k in ['name', 'path', 'description', 'parameters']])
+        assert all([isinstance(k, str) for k in ['name', 'path', 'description']])
 
-            cancel_json = cancel_response.json()
+        assert isinstance(capability['parameters'], (dict, list))
 
-            assert cancel_json['status'] != 'running', 'Job did not cancel'
+        for param in capability['parameters']:
+            if isinstance(capability['parameters'], dict):
+                param = capability['parameters'][param]
 
-            if cancel_json['status'] in ['success', 'failed']:
-                warnings.warn(f'Job status after cancellation is not \'cancelled\' ({cancel_json["status"]}), passing '
-                              f'case because it is no longer \'running\', but actual cancellation could not be tested '
-                              f'here.')
+            assert isinstance(param, dict)
-            assert all([k in param and isinstance(param[k], str) for k in ['name', 'type', 'description']])
 
 
 @pytest.mark.integration
-@pytest.mark.skip('Test not re-implemented yet')
-def test_cdmsresults_json(host, eid, start):
-    url = urljoin(host, 'cdmsresults')
-
-    # Skip the test automatically if the matchup request was not successful
-    if not eid['successful']:
-        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
-
-    def fetch_result(execution_id, output):
-        return requests.get(url, params={"id": execution_id, "output": output})
-
-    eid_list = eid['eid']
-    param_list = eid['params']
+def test_endpoints(host, start):
+    url = urljoin(host, 'capabilities')
 
-    response = fetch_result(eid_list[0], "JSON")
+    response = requests.get(url)
 
-    assert response.status_code == 200
+    if response.status_code != 200:
+        skip('Could not get endpoints list. Expected if test_capabilities has failed')
 
-    body = response.json()
-    try_save("test_cdmsresults_json_A", start, body)
+    capabilities = response.json()
 
-    data = body['data']
-    assert len(data) == 5
+    endpoints = [c['path'] for c in capabilities]
 
-    for m in data:
-        m['point'] = f"Point({m['lon']} {m['lat']})"
-        for s in m['matches']:
-            s['point'] = f"Point({s['lon']} {s['lat']})"
+    non_existent_endpoints = []
 
-    data.sort(key=lambda e: e['point'])
+    for endpoint in endpoints:
+        status = requests.head(urljoin(host, endpoint)).status_code
 
-    params = param_list[0]
-    bounding_poly = b_to_polygon(params['b'])
+        if status == 404:
+            # Strip special characters because some endpoints have wildcards/regex characters
+            # This may not work forever though
+            stripped_endpoint = re.sub(r'[^a-zA-Z0-9/_-]', '', endpoint)
 
-    verify_match(
-        data[0], 'Point(-86.125 27.625)',
-        1535360400, 'Point(-86.13 27.63)',
-        1535374800, params, bounding_poly
-    )
+            status = requests.head(urljoin(host, stripped_endpoint)).status_code
 
-    verify_match(
-        data[1], 'Point(-88.875 27.875)',
-        1534669200, 'Point(-88.88 27.88)',
-        1534698000, params, bounding_poly
-    )
+            if status == 404:
+                non_existent_endpoints.append(([endpoint, stripped_endpoint], status))
 
-    verify_match(
-        data[2], 'Point(-90.125 27.625)',
-        1534496400, 'Point(-90.13 27.63)',
-        1534491000, params, bounding_poly
-    )
+    assert len(non_existent_endpoints) == 0, non_existent_endpoints
 
-    verify_match(
-        data[3], 'Point(-90.125 28.125)',
-        1534928400, 'Point(-90.13 28.12)',
-        1534899600, params, bounding_poly
-    )
 
-    verify_match(
-        data[4], 'Point(-90.375 28.125)',
-        1534842000, 'Point(-90.38 28.12)',
-        1534813200, params, bounding_poly
-    )
+@pytest.mark.integration
+def test_heartbeat(host, start):
+    url = urljoin(host, 'heartbeat')
 
-    response = fetch_result(eid_list[1], "JSON")
+    response = requests.get(url)
 
     assert response.status_code == 200
+    heartbeat = response.json()
 
-    body = response.json()
-    try_save("test_cdmsresults_json_B", start, body)
-
-    data = body['data']
-    assert len(data) == 5
-
-    for m in data:
-        m['point'] = f"Point({m['lon']} {m['lat']})"
-        for s in m['matches']:
-            s['point'] = f"Point({s['lon']} {s['lat']})"
-
-    data.sort(key=lambda e: e['point'])
-
-    params = param_list[1]
-    bounding_poly = b_to_polygon(params['b'])
-
-    verify_match(
-        data[0], 'Point(-86.125 27.625)',
-        1535371200, 'Point(-86.13 27.63)',
-        1535374800, params, bounding_poly
-    )
-
-    verify_match(
-        data[1], 'Point(-88.875 27.875)',
-        1534680000, 'Point(-88.88 27.88)',
-        1534698000, params, bounding_poly
-    )
-
-    verify_match(
-        data[2], 'Point(-90.125 27.625)',
-        1534507200, 'Point(-90.13 27.63)',
-        1534491000, params, bounding_poly
-    )
-
-    verify_match(
-        data[3], 'Point(-90.125 28.125)',
-        1534939200, 'Point(-90.13 28.12)',
-        1534899600, params, bounding_poly
-    )
-
-    verify_match(
-        data[4], 'Point(-90.375 28.125)',
-        1534852800, 'Point(-90.38 28.12)',
-        1534813200, params, bounding_poly
-    )
+    assert isinstance(heartbeat, dict)
+    assert all(heartbeat.values())
 
 
 @pytest.mark.integration
-@pytest.mark.skip('Test not re-implemented yet')
-def test_cdmsresults_csv(host, eid, start):
-    url = urljoin(host, 'cdmsresults')
-
-    # Skip the test automatically if the matchup request was not successful
-    if not eid['successful']:
-        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
-
-    def fetch_result(execution_id, output):
-        return requests.get(url, params={"id": execution_id, "output": output})
-
-    eid_list = eid['eid']
-    param_list = eid['params']
+def test_swaggerui_sdap(host):
+    url = urljoin(host, 'apidocs/')
 
-    response = fetch_result(eid_list[0], "CSV")
-    params = param_list[0]
-    bounding_poly = b_to_polygon(params['b'])
+    response = requests.get(url)
 
     assert response.status_code == 200
+    assert 'swagger-ui' in response.text
 
-    try_save("test_cdmsresults_csv_A", start, response, "csv")
-
-    rows = response.text.split('\r\n')
-    index = rows.index('')
-
-    global_rows = rows[:index]
-    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
-
-    global_rows = translate_global_rows(global_rows)
-    matchup_rows = translate_matchup_rows(matchup_rows)
-
-    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
-
-    for row in matchup_rows:
-        primary_point = lat_lon_to_point(row['lat'], row['lon'])
-
-        assert bounding_poly.intersects(primary_point)
-        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
-
-        verify_secondary_in_tolerance(
-            {'lat': row['lat'], 'lon': row['lon']},
-            {'lat': row['lat_secondary'], 'lon': row['lon_secondary']},
-            params['rt']
-        )
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+    try:
+        # There's probably a better way to do this, but extract the .yml file for the docs from the returned text
+        soup = BeautifulSoup(response.text, 'html.parser')
 
-    response = fetch_result(eid_list[1], "CSV")
-    params = param_list[1]
-    bounding_poly = b_to_polygon(params['b'])
+        script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0])
 
-    assert response.status_code == 200
+        start_index = script.find('url:')
+        end_index = script.find('",\n', start_index)
 
-    try_save("test_cdmsresults_csv_B", start, response, "csv")
+        script = script[start_index:end_index]
 
-    rows = response.text.split('\r\n')
-    index = rows.index('')
+        yml_filename = script.split('"')[1]
 
-    global_rows = rows[:index]
-    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
+        url = urljoin(url, yml_filename)
 
-    global_rows = translate_global_rows(global_rows)
-    matchup_rows = translate_matchup_rows(matchup_rows)
+        response = requests.get(url)
 
-    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
+        assert response.status_code == 200
+    except AssertionError:
+        raise
+    except:
+        try:
+            url = urljoin(url, 'openapi.yml')
 
-    for row in matchup_rows:
-        primary_point = lat_lon_to_point(row['lat'], row['lon'])
+            response = requests.get(url)
 
-        assert bounding_poly.intersects(primary_point)
-        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
+            assert response.status_code == 200
 
-        verify_secondary_in_tolerance(
-            {'lat': row['lat'], 'lon': row['lon']},
-            {'lat': row['lat_secondary'], 'lon': row['lon_secondary']},
-            params['rt']
-        )
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+            warnings.warn("Could not extract documentation yaml filename from 
response text, "
+                          "but using an assumed value worked successfully")
+        except:
+            raise ValueError("Could not verify documentation yaml file, 
assumed value also failed")
 
 
 @pytest.mark.integration
-@pytest.mark.skip('Test not re-implemented yet')
-def test_cdmsresults_netcdf(host, eid, start):
-    warnings.filterwarnings('ignore')
-
-    url = urljoin(host, 'cdmsresults')
-
-    # Skip the test automatically if the matchup request was not successful
-    if not eid['successful']:
-        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
-
-    def fetch_result(execution_id, output):
-        return requests.get(url, params={"id": execution_id, "output": output})
-
-    eid_list = eid['eid']
-    param_list = eid['params']
-
-    temp_file = Temp(mode='wb+', suffix='.csv.tmp', prefix='CDMSReader_')
+def test_list(host, start):
+    url = urljoin(host, 'list')
 
-    response = fetch_result(eid_list[0], "NETCDF")
-    params = param_list[0]
-    bounding_poly = b_to_polygon(params['b'])
+    response = requests.get(url)
 
     assert response.status_code == 200
 
-    try_save("test_cdmsresults_netcdf_A", start, response, "nc", 'wb')
-
-    temp_file.write(response.content)
-    temp_file.flush()
-    temp_file.seek(0)
+    body = response.json()
+    try_save("test_list", start, body)
 
-    matches = cdms_reader.assemble_matches(temp_file.name)
+    assert isinstance(body, list)
 
-    cdms_reader.matches_to_csv(matches, temp_file.name)
+    if len(body) == 0:
+        warnings.warn('/list returned no datasets. This could be correct if SDAP has no data ingested, otherwise '
+                      'this should be considered a failure')
 
-    with open(temp_file.name) as f:
-        reader = csv.DictReader(f)
-        rows = list(reader)
 
-    for row in rows:
-        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    ['collection'],
+    [('MUR25-JPL-L4-GLOB-v04.2_test',), ('OISSS_L4_multimission_7day_v1_test',)]
+)
+def test_subset_L4(host, start, collection):
+    url = urljoin(host, 'datainbounds')
 
-        assert bounding_poly.intersects(primary_point)
-        assert iso_time_to_epoch(params['startTime']) \
-               <= float(row['PrimaryData_time']) \
-               <= iso_time_to_epoch(params['endTime'])
+    params = {
+        "ds": collection,
+        "startTime": "2018-09-24T00:00:00Z",
+        "endTime": "2018-09-30T00:00:00Z",
+        "b": "160,-30,180,-25",
+    }
 
-        verify_secondary_in_tolerance(
-            {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']},
-            {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']},
-            params['rt']
-        )
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= float(row['SecondaryData_time']) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+    response = requests.get(url, params=params)
+    assert response.status_code == 200
+
+    data = response.json()
+    try_save(f"test_datainbounds_L4_{collection}", start, data)
 
-    response = fetch_result(eid_list[1], "NETCDF")
-    params = param_list[1]
     bounding_poly = b_to_polygon(params['b'])
 
-    assert response.status_code == 200
+    epoch = datetime(1970, 1, 1, tzinfo=UTC)
 
-    try_save("test_cdmsresults_netcdf_B", start, response, "nc", 'wb')
+    start = (datetime.strptime(params['startTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
+    end = (datetime.strptime(params['endTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
 
-    temp_file.write(response.content)
-    temp_file.flush()
-    temp_file.seek(0)
+    for p in data:
+        assert bounding_poly.intersects(Point(float(p['longitude']), float(p['latitude'])))
+        assert start <= p['time'] <= end
+
+    # response_buf = io.BytesIO(response.content)
+    #
+    # with ZipFile(response_buf) as data:
+    #     namelist = data.namelist()
+    #
+    #     assert namelist == [f'{collection}.csv']
+    #
+    #     csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
+    #     csv_data = pd.read_csv(csv_buf)
+    #
+    # def validate_row_bounds(row):
+    #     assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude'])))
+    #     assert params['startTime'] <= row['time'] <= params['endTime']
+    #
+    # for i in range(0, len(csv_data)):
+    #     validate_row_bounds(csv_data.iloc[i])
 
-    matches = cdms_reader.assemble_matches(temp_file.name)
 
-    cdms_reader.matches_to_csv(matches, temp_file.name)
+@pytest.mark.integration
+def test_subset_L2(host, start):
+    url = urljoin(host, 'datainbounds')
 
-    with open(temp_file.name) as f:
-        reader = csv.DictReader(f)
-        rows = list(reader)
+    params = {
+        "ds": "ASCATB-L2-Coastal_test",
+        "startTime": "2018-09-24T00:00:00Z",
+        "endTime": "2018-09-30T00:00:00Z",
+        "b": "160,-30,180,-25",
+    }
 
-    for row in rows:
-        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
+    response = requests.get(url, params=params)
+    assert response.status_code == 200
 
-        assert bounding_poly.intersects(primary_point)
-        assert iso_time_to_epoch(params['startTime']) \
-               <= float(row['PrimaryData_time']) \
-               <= iso_time_to_epoch(params['endTime'])
+    data = response.json()
+    try_save("test_datainbounds_L2", start, data)
 
-        verify_secondary_in_tolerance(
-            {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']},
-            {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']},
-            params['rt']
-        )
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= float(row['SecondaryData_time']) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+    bounding_poly = b_to_polygon(params['b'])
 
-    temp_file.close()
-    warnings.filterwarnings('default')
+    epoch = datetime(1970, 1, 1, tzinfo=UTC)
+
+    start = (datetime.strptime(params['startTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
+    end = (datetime.strptime(params['endTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC) - epoch).total_seconds()
+
+    for p in data:
+        assert bounding_poly.intersects(Point(float(p['longitude']), float(p['latitude'])))
+        assert start <= p['time'] <= end
+
+    # response_buf = io.BytesIO(response.content)
+    #
+    # with ZipFile(response_buf) as data:
+    #     namelist = data.namelist()
+    #
+    #     assert namelist == ['ASCATB-L2-Coastal_test.csv']
+    #
+    #     csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
+    #     csv_data = pd.read_csv(csv_buf)
+    #
+    # def validate_row_bounds(row):
+    #     assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude'])))
+    #     assert params['startTime'] <= row['time'] <= params['endTime']
+    #
+    # for i in range(0, len(csv_data)):
+    #     validate_row_bounds(csv_data.iloc[i])
 
 
 @pytest.mark.integration
@@ -993,23 +868,6 @@ def test_timeseries_spark(host, start):
         assert start <= p[0]['time'] <= end
 
 
-@pytest.mark.integration
-def test_list(host, start):
-    url = urljoin(host, 'list')
-
-    response = requests.get(url)
-
-    assert response.status_code == 200
-
-    body = response.json()
-    try_save("test_list", start, body)
-
-    assert isinstance(body, list)
-
-    if len(body) == 0:
-        warnings.warn('/list returned no datasets. This could be correct if SDAP has no data ingested, otherwise '
-                      'this should be considered a failure')
-
 @pytest.mark.integration
 def test_cdmslist(host, start):
     url = urljoin(host, 'cdmslist')
@@ -1036,11 +894,15 @@ def test_cdmslist(host, start):
 
 
 @pytest.mark.integration
-def test_cdmssubset_L4(host, start):
+@pytest.mark.parametrize(
+    ['collection'],
+    [('MUR25-JPL-L4-GLOB-v04.2_test',), ('OISSS_L4_multimission_7day_v1_test',)]
+)
+def test_cdmssubset_L4(host, start, collection):
     url = urljoin(host, 'cdmssubset')
 
     params = {
-        "dataset": "MUR25-JPL-L4-GLOB-v04.2_test",
+        "dataset": collection,
         "parameter": "sst",
         "startTime": "2018-09-24T00:00:00Z",
         "endTime": "2018-09-30T00:00:00Z",
@@ -1052,7 +914,7 @@ def test_cdmssubset_L4(host, start):
 
     assert response.status_code == 200
 
-    try_save("test_cdmssubset_L4_a", start, response, "zip", 'wb')
+    try_save(f"test_cdmssubset_L4_{collection}", start, response, "zip", 'wb')
 
     bounding_poly = b_to_polygon(params['b'])
 
@@ -1061,7 +923,7 @@ def test_cdmssubset_L4(host, start):
     with ZipFile(response_buf) as data:
         namelist = data.namelist()
 
-        assert namelist == ['MUR25-JPL-L4-GLOB-v04.2_test.csv']
+        assert namelist == [f'{collection}.csv']
 
         csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
         csv_data = pd.read_csv(csv_buf)
@@ -1073,188 +935,412 @@ def test_cdmssubset_L4(host, start):
     for i in range(0, len(csv_data)):
         validate_row_bounds(csv_data.iloc[i])
 
-    params['dataset'] = 'OISSS_L4_multimission_7day_v1_test'
+
+@pytest.mark.integration
+def test_cdmssubset_L2(host, start):
+    url = urljoin(host, 'cdmssubset')
+
+    params = {
+        "dataset": "ASCATB-L2-Coastal_test",
+        "startTime": "2018-09-24T00:00:00Z",
+        "endTime": "2018-09-30T00:00:00Z",
+        "b": "160,-30,180,-25",
+        "output": "ZIP"
+    }
 
     response = requests.get(url, params=params)
 
     assert response.status_code == 200
 
-    try_save("test_cdmssubset_L4_b", start, response, "zip", 'wb')
+    try_save("test_cdmssubset_L2", start, response, "zip", 'wb')
+
+    bounding_poly = b_to_polygon(params['b'])
 
     response_buf = io.BytesIO(response.content)
 
-    with ZipFile(response_buf) as data:
-        namelist = data.namelist()
+    with ZipFile(response_buf) as data:
+        namelist = data.namelist()
+
+        assert namelist == ['ASCATB-L2-Coastal_test.csv']
+
+        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
+        csv_data = pd.read_csv(csv_buf)
+
+    def validate_row_bounds(row):
+        assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude'])))
+        assert params['startTime'] <= row['time'] <= params['endTime']
+
+    for i in range(0, len(csv_data)):
+        validate_row_bounds(csv_data.iloc[i])
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    ['match', 'expected'],
+    list(zip(
+        ['gridded_to_gridded', 'gridded_to_swath', 'swath_to_gridded', 'swath_to_swath'],
+        [1058, 6, 21, 4026]
+    ))
+)
+def test_match_spark(host, start, fail_on_miscount, matchup_params, match, expected):
+    url = urljoin(host, 'match_spark')
+
+    params = matchup_params[match]
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save(f"test_matchup_spark_{match}", start, body)
+    data = body['data']
+
+    for match in data:
+        verify_match_consistency(match, params, bounding_poly)
+
+    uniq_primaries(data, case=f"test_matchup_spark_{match}")
+    check_count(len(data), expected, fail_on_miscount)
+
+
+@pytest.mark.integration
+def test_match_spark_job_cancellation(host, start, matchup_params):
+    url = urljoin(host, 'match_spark')
+
+    params = matchup_params['long']
+
+    response = requests.get(url, params=params)
+
+    assert response.status_code == 200, 'Initial match_spark query failed'
+    response_json = response.json()
+
+    asynchronous = 'status' in response_json
+
+    if not asynchronous:
+        skip('Deployed SDAP version does not have asynchronous matchup')
+    else:
+        sleep(1)  # Time to allow spark workers to start working
+
+        if response_json['status'] != 'running':
+            skip('Job finished before it could be cancelled')
+        else:
+            cancel_url = [link for link in response_json['links'] if link['rel'] == 'cancel'][0]['href']
+
+            cancel_url = url_scheme(
+                urlparse(url).scheme,
+                cancel_url
+            )
+
+            cancel_response = requests.get(cancel_url)
+            assert cancel_response.status_code == 200, 'Cancellation query failed'
+
+            cancel_json = cancel_response.json()
+
+            assert cancel_json['status'] != 'running', 'Job did not cancel'
+
+            if cancel_json['status'] in ['success', 'failed']:
+                warnings.warn(f'Job status after cancellation is not \'cancelled\' ({cancel_json["status"]}), passing '
+                              f'case because it is no longer \'running\', but actual cancellation could not be tested '
+                              f'here.')
+
+
+@pytest.mark.integration
+@pytest.mark.skip('Test not re-implemented yet')
+def test_cdmsresults_json(host, eid, start):
+    url = urljoin(host, 'cdmsresults')
+
+    # Skip the test automatically if the matchup request was not successful
+    if not eid['successful']:
+        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
+
+    def fetch_result(execution_id, output):
+        return requests.get(url, params={"id": execution_id, "output": output})
+
+    eid_list = eid['eid']
+    param_list = eid['params']
+
+    response = fetch_result(eid_list[0], "JSON")
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_cdmsresults_json_A", start, body)
+
+    data = body['data']
+    assert len(data) == 5
+
+    for m in data:
+        m['point'] = f"Point({m['lon']} {m['lat']})"
+        for s in m['matches']:
+            s['point'] = f"Point({s['lon']} {s['lat']})"
+
+    data.sort(key=lambda e: e['point'])
+
+    params = param_list[0]
+    bounding_poly = b_to_polygon(params['b'])
+
+    verify_match(
+        data[0], 'Point(-86.125 27.625)',
+        1535360400, 'Point(-86.13 27.63)',
+        1535374800, params, bounding_poly
+    )
+
+    verify_match(
+        data[1], 'Point(-88.875 27.875)',
+        1534669200, 'Point(-88.88 27.88)',
+        1534698000, params, bounding_poly
+    )
+
+    verify_match(
+        data[2], 'Point(-90.125 27.625)',
+        1534496400, 'Point(-90.13 27.63)',
+        1534491000, params, bounding_poly
+    )
+
+    verify_match(
+        data[3], 'Point(-90.125 28.125)',
+        1534928400, 'Point(-90.13 28.12)',
+        1534899600, params, bounding_poly
+    )
+
+    verify_match(
+        data[4], 'Point(-90.375 28.125)',
+        1534842000, 'Point(-90.38 28.12)',
+        1534813200, params, bounding_poly
+    )
+
+    response = fetch_result(eid_list[1], "JSON")
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_cdmsresults_json_B", start, body)
+
+    data = body['data']
+    assert len(data) == 5
+
+    for m in data:
+        m['point'] = f"Point({m['lon']} {m['lat']})"
+        for s in m['matches']:
+            s['point'] = f"Point({s['lon']} {s['lat']})"
+
+    data.sort(key=lambda e: e['point'])
+
+    params = param_list[1]
+    bounding_poly = b_to_polygon(params['b'])
+
+    verify_match(
+        data[0], 'Point(-86.125 27.625)',
+        1535371200, 'Point(-86.13 27.63)',
+        1535374800, params, bounding_poly
+    )
+
+    verify_match(
+        data[1], 'Point(-88.875 27.875)',
+        1534680000, 'Point(-88.88 27.88)',
+        1534698000, params, bounding_poly
+    )
 
-        assert namelist == ['OISSS_L4_multimission_7day_v1_test.csv']
+    verify_match(
+        data[2], 'Point(-90.125 27.625)',
+        1534507200, 'Point(-90.13 27.63)',
+        1534491000, params, bounding_poly
+    )
 
-        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
-        csv_data = pd.read_csv(csv_buf)
+    verify_match(
+        data[3], 'Point(-90.125 28.125)',
+        1534939200, 'Point(-90.13 28.12)',
+        1534899600, params, bounding_poly
+    )
 
-    for i in range(0, len(csv_data)):
-        validate_row_bounds(csv_data.iloc[i])
+    verify_match(
+        data[4], 'Point(-90.375 28.125)',
+        1534852800, 'Point(-90.38 28.12)',
+        1534813200, params, bounding_poly
+    )
 
 
 @pytest.mark.integration
-def test_cdmssubset_L2(host, start):
-    url = urljoin(host, 'cdmssubset')
-
-    params = {
-        "dataset": "ASCATB-L2-Coastal_test",
-        "startTime": "2018-09-24T00:00:00Z",
-        "endTime": "2018-09-30T00:00:00Z",
-        "b": "160,-30,180,-25",
-        "output": "ZIP"
-    }
+@pytest.mark.skip('Test not re-implemented yet')
+def test_cdmsresults_csv(host, eid, start):
+    url = urljoin(host, 'cdmsresults')
 
-    response = requests.get(url, params=params)
+    # Skip the test automatically if the matchup request was not successful
+    if not eid['successful']:
+        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
 
-    assert response.status_code == 200
+    def fetch_result(execution_id, output):
+        return requests.get(url, params={"id": execution_id, "output": output})
 
-    try_save("test_cdmssubset_L2", start, response, "zip", 'wb')
+    eid_list = eid['eid']
+    param_list = eid['params']
 
+    response = fetch_result(eid_list[0], "CSV")
+    params = param_list[0]
     bounding_poly = b_to_polygon(params['b'])
 
-    response_buf = io.BytesIO(response.content)
-
-    with ZipFile(response_buf) as data:
-        namelist = data.namelist()
-
-        assert namelist == ['ASCATB-L2-Coastal_test.csv']
+    assert response.status_code == 200
 
-        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
-        csv_data = pd.read_csv(csv_buf)
+    try_save("test_cdmsresults_csv_A", start, response, "csv")
 
-    def validate_row_bounds(row):
-        assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude'])))
-        assert params['startTime'] <= row['time'] <= params['endTime']
+    rows = response.text.split('\r\n')
+    index = rows.index('')
 
-    for i in range(0, len(csv_data)):
-        validate_row_bounds(csv_data.iloc[i])
+    global_rows = rows[:index]
+    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
 
+    global_rows = translate_global_rows(global_rows)
+    matchup_rows = translate_matchup_rows(matchup_rows)
 
-@pytest.mark.integration
-def test_swaggerui_sdap(host):
-    url = urljoin(host, 'apidocs/')
+    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
 
-    response = requests.get(url)
+    for row in matchup_rows:
+        primary_point = lat_lon_to_point(row['lat'], row['lon'])
 
-    assert response.status_code == 200
-    assert 'swagger-ui' in response.text
+        assert bounding_poly.intersects(primary_point)
+        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
 
-    try:
-        # There's probably a better way to do this, but extract the .yml file for the docs from the returned text
-        soup = BeautifulSoup(response.text, 'html.parser')
+        verify_secondary_in_tolerance(
+            {'lat': row['lat'], 'lon': row['lon']},
+            {'lat': row['lat_secondary'], 'lon': row['lon_secondary']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
 
-        script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0])
+    response = fetch_result(eid_list[1], "CSV")
+    params = param_list[1]
+    bounding_poly = b_to_polygon(params['b'])
 
-        start_index = script.find('url:')
-        end_index = script.find('",\n', start_index)
+    assert response.status_code == 200
 
-        script = script[start_index:end_index]
+    try_save("test_cdmsresults_csv_B", start, response, "csv")
 
-        yml_filename = script.split('"')[1]
+    rows = response.text.split('\r\n')
+    index = rows.index('')
 
-        url = urljoin(url, yml_filename)
+    global_rows = rows[:index]
+    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
 
-        response = requests.get(url)
+    global_rows = translate_global_rows(global_rows)
+    matchup_rows = translate_matchup_rows(matchup_rows)
 
-        assert response.status_code == 200
-    except AssertionError:
-        raise
-    except:
-        try:
-            url = urljoin(url, 'openapi.yml')
+    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
 
-            response = requests.get(url)
+    for row in matchup_rows:
+        primary_point = lat_lon_to_point(row['lat'], row['lon'])
 
-            assert response.status_code == 200
+        assert bounding_poly.intersects(primary_point)
+        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
 
-            warnings.warn("Could not extract documentation yaml filename from 
response text, "
-                          "but using an assumed value worked successfully")
-        except:
-            raise ValueError("Could not verify documentation yaml file, 
assumed value also failed")
+        verify_secondary_in_tolerance(
+            {'lat': row['lat'], 'lon': row['lon']},
+            {'lat': row['lat_secondary'], 'lon': row['lon_secondary']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
 
 
 @pytest.mark.integration
-def test_version(host, start):
-    url = urljoin(host, 'version')
+@pytest.mark.skip('Test not re-implemented yet')
+def test_cdmsresults_netcdf(host, eid, start):
+    warnings.filterwarnings('ignore')
 
-    response = requests.get(url)
+    url = urljoin(host, 'cdmsresults')
 
-    assert response.status_code == 200
-    assert re.match(r'^\d+\.\d+\.\d+(-.+)?$', response.text)
+    # Skip the test automatically if the matchup request was not successful
+    if not eid['successful']:
+        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
 
+    def fetch_result(execution_id, output):
+        return requests.get(url, params={"id": execution_id, "output": output})
 
-@pytest.mark.integration
-def test_capabilities(host, start):
-    url = urljoin(host, 'capabilities')
+    eid_list = eid['eid']
+    param_list = eid['params']
 
-    response = requests.get(url)
+    temp_file = Temp(mode='wb+', suffix='.csv.tmp', prefix='CDMSReader_')
+
+    response = fetch_result(eid_list[0], "NETCDF")
+    params = param_list[0]
+    bounding_poly = b_to_polygon(params['b'])
 
     assert response.status_code == 200
 
-    capabilities = response.json()
+    try_save("test_cdmsresults_netcdf_A", start, response, "nc", 'wb')
 
-    try_save('test_capabilities', start, capabilities)
+    temp_file.write(response.content)
+    temp_file.flush()
+    temp_file.seek(0)
 
-    assert len(capabilities) > 0
+    matches = cdms_reader.assemble_matches(temp_file.name)
 
-    for capability in capabilities:
-        assert all([k in capability for k in ['name', 'path', 'description', 'parameters']])
-        assert all([isinstance(k, str) for k in ['name', 'path', 'description']])
+    cdms_reader.matches_to_csv(matches, temp_file.name)
 
-        assert isinstance(capability['parameters'], (dict, list))
+    with open(temp_file.name) as f:
+        reader = csv.DictReader(f)
+        rows = list(reader)
 
-        for param in capability['parameters']:
-            if isinstance(capability['parameters'], dict):
-                param = capability['parameters'][param]
+    for row in rows:
+        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
 
-            assert isinstance(param, dict)
-            assert all([k in param and isinstance(param[k], str) for k in ['name', 'type', 'description']])
+        assert bounding_poly.intersects(primary_point)
+        assert iso_time_to_epoch(params['startTime']) \
+               <= float(row['PrimaryData_time']) \
+               <= iso_time_to_epoch(params['endTime'])
+
+        verify_secondary_in_tolerance(
+            {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']},
+            {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= float(row['SecondaryData_time']) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
 
+    response = fetch_result(eid_list[1], "NETCDF")
+    params = param_list[1]
+    bounding_poly = b_to_polygon(params['b'])
 
-@pytest.mark.integration
-def test_endpoints(host, start):
-    url = urljoin(host, 'capabilities')
+    assert response.status_code == 200
 
-    response = requests.get(url)
+    try_save("test_cdmsresults_netcdf_B", start, response, "nc", 'wb')
 
-    if response.status_code != 200:
-        skip('Could not get endpoints list. Expected if test_capabilities has failed')
+    temp_file.write(response.content)
+    temp_file.flush()
+    temp_file.seek(0)
 
-    capabilities = response.json()
+    matches = cdms_reader.assemble_matches(temp_file.name)
 
-    endpoints = [c['path'] for c in capabilities]
+    cdms_reader.matches_to_csv(matches, temp_file.name)
 
-    non_existent_endpoints = []
+    with open(temp_file.name) as f:
+        reader = csv.DictReader(f)
+        rows = list(reader)
 
-    for endpoint in endpoints:
-        status = requests.head(urljoin(host, endpoint)).status_code
+    for row in rows:
+        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
 
-        if status == 404:
-            # Strip special characters because some endpoints have wildcards/regex characters
-            # This may not work forever though
-            stripped_endpoint = re.sub(r'[^a-zA-Z0-9/_-]', '', endpoint)
+        assert bounding_poly.intersects(primary_point)
+        assert iso_time_to_epoch(params['startTime']) \
+               <= float(row['PrimaryData_time']) \
+               <= iso_time_to_epoch(params['endTime'])
 
-            status = requests.head(urljoin(host, stripped_endpoint)).status_code
+        verify_secondary_in_tolerance(
+            {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']},
+            {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= float(row['SecondaryData_time']) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
 
-            if status == 404:
-                non_existent_endpoints.append(([endpoint, stripped_endpoint], status))
+    temp_file.close()
+    warnings.filterwarnings('default')
 
-    assert len(non_existent_endpoints) == 0, non_existent_endpoints
 
 
-@pytest.mark.integration
-def test_heartbeat(host, start):
-    url = urljoin(host, 'heartbeat')
 
-    response = requests.get(url)
 
-    assert response.status_code == 200
-    heartbeat = response.json()
 
-    assert isinstance(heartbeat, dict)
-    assert all(heartbeat.values())
 
 

