This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch SDAP-521
in repository https://gitbox.apache.org/repos/asf/sdap-nexus.git

commit ac651f62245bc4e1828a2d54649cb316e6239071
Author: rileykk <[email protected]>
AuthorDate: Tue Jul 2 12:53:42 2024 -0700

    Move & update test script
    
    Made current with what was deployed for the CDMS project. Will need extensive editing.
---
 tests/.gitignore                   |    1 +
 tests/{regression => }/README.md   |    0
 tests/cdms_reader.py               |  250 ++++++
 tests/{regression => }/conftest.py |   10 +-
 tests/regression/cdms_reader.py    |    1 -
 tests/regression/test_cdms.py      |  746 ----------------
 tests/test_cdms.py                 | 1707 ++++++++++++++++++++++++++++++++++++
 7 files changed, 1966 insertions(+), 749 deletions(-)

diff --git a/tests/.gitignore b/tests/.gitignore
new file mode 100644
index 0000000..f933b64
--- /dev/null
+++ b/tests/.gitignore
@@ -0,0 +1 @@
+responses/
diff --git a/tests/regression/README.md b/tests/README.md
similarity index 100%
rename from tests/regression/README.md
rename to tests/README.md
diff --git a/tests/cdms_reader.py b/tests/cdms_reader.py
new file mode 100644
index 0000000..ebbc08e
--- /dev/null
+++ b/tests/cdms_reader.py
@@ -0,0 +1,250 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import string
+from netCDF4 import Dataset, num2date
+import sys
+import datetime
+import csv
+from collections import OrderedDict
+import logging
+
+#TODO: Get rid of numpy errors?
+#TODO: Update big SDAP README
+
+LOGGER = logging.getLogger("cdms_reader")
+
+def assemble_matches(filename):
+    """
+    Read a CDMS netCDF file and return a list of matches.
+    
+    Parameters
+    ----------
+    filename : str
+        The CDMS netCDF file name.
+    
+    Returns
+    -------
+    matches : list
+        List of matches. Each list element is a dictionary.
+        For match m, netCDF group GROUP (SatelliteData or InsituData), and
+        group variable VARIABLE:
+        matches[m][GROUP]['matchID']: MatchedRecords dimension ID for the match
+        matches[m][GROUP]['GROUPID']: GROUP dim dimension ID for the record
+        matches[m][GROUP][VARIABLE]: variable value 
+    """
+   
+    try:
+        # Open the netCDF file
+        with Dataset(filename, 'r') as cdms_nc:
+            # Check that the number of groups is consistent w/ the MatchedGroups
+            # dimension
+            assert len(cdms_nc.groups) == cdms_nc.dimensions['MatchedGroups'].size,\
+                ("Number of groups isn't the same as MatchedGroups dimension.")
+            
+            matches = []
+            matched_records = cdms_nc.dimensions['MatchedRecords'].size
+            
+            # Loop through the match IDs to assemble matches
+            for match in range(0, matched_records):
+                match_dict = OrderedDict()
+                # Grab the data from each platform (group) in the match
+                for group_num, group in enumerate(cdms_nc.groups):
+                    match_dict[group] = OrderedDict()
+                    match_dict[group]['matchID'] = match
+                    ID = cdms_nc.variables['matchIDs'][match][group_num]
+                    match_dict[group][group + 'ID'] = ID
+                    for var in cdms_nc.groups[group].variables.keys():
+                        match_dict[group][var] = cdms_nc.groups[group][var][ID]
+                    
+                    # Create a UTC datetime field from timestamp
+                    dt = num2date(match_dict[group]['time'],
+                                  cdms_nc.groups[group]['time'].units)
+                    match_dict[group]['datetime'] = dt
+                LOGGER.info(match_dict)
+                matches.append(match_dict)
+            
+            return matches
+    except (OSError, IOError) as err:
+        LOGGER.exception("Error reading netCDF file " + filename)
+        raise err
+    
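+# Usage sketch (illustrative only, not part of the committed module): with a
+# hypothetical matchup output file 'matchup_output.nc', the returned structure
+# can be walked as described in the docstring above:
+#
+#     matches = assemble_matches('matchup_output.nc')
+#     first = matches[0]
+#     for group in first:                          # e.g. 'SatelliteData', 'InsituData'
+#         record_id = first[group][group + 'ID']   # dimension ID within the group
+#         observed_at = first[group]['datetime']   # UTC datetime derived from 'time'
+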
+def matches_to_csv(matches, csvfile):
+    """
+    Write the CDMS matches to a CSV file. Include a header of column names
+    which are based on the group and variable names from the netCDF file.
+    
+    Parameters
+    ----------
+    matches : list
+        The list of dictionaries containing the CDMS matches as returned from
+        assemble_matches.      
+    csvfile : str
+        The name of the CSV output file.
+    """
+    # Create a header for the CSV. Column names are GROUP_VARIABLE or
+    # GROUP_GROUPID.
+    header = []
+    for key, value in matches[0].items():
+        for otherkey in value.keys():
+            header.append(key + "_" + otherkey)
+    
+    try:
+        # Write the CSV file
+        with open(csvfile, 'w') as output_file:
+            csv_writer = csv.writer(output_file)
+            csv_writer.writerow(header)
+            for match in matches:
+                row = []
+                for group, data in match.items():
+                    for value in data.values():
+                        row.append(value)
+                csv_writer.writerow(row)
+    except (OSError, IOError) as err:
+        LOGGER.exception("Error writing CSV file " + csvfile)
+        raise err
+
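+# Column-naming illustration (hedged example, not in the original file): for
+# groups 'SatelliteData' and 'InsituData' whose variables include 'lat', 'lon'
+# and 'time', the header produced above would look like
+#
+#     SatelliteData_matchID,SatelliteData_SatelliteDataID,SatelliteData_lat,...,
+#     InsituData_matchID,InsituData_InsituDataID,InsituData_lat,...
+#
+# with one flattened row per match following it.
+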
+def get_globals(filename):
+    """
+    Write the CDMS global attributes to a text file. Additionally, the file
+    includes a description of where the different outputs go and how best to
+    utilize this program.
+    
+    Parameters
+    ----------      
+    filename : str
+        The name of the original '.nc' input file.
+    
+    """
+    x0 = "README / cdms_reader.py Program Use and Description:\n"
+    x1 = "\nThe cdms_reader.py program reads a CDMS netCDF (a NETCDF file with 
a matchIDs variable)\n"
+    x2 = "file into memory, assembles a list of matches of satellite and in 
situ data\n"
+    x3 = "(or a primary and secondary dataset), and optionally\n"
+    x4 = "output the matches to a CSV file. Each matched pair contains one 
satellite\n"
+    x5 = "data record and one in situ data record.\n"
+    x6 = "\nBelow, this file wil list the global attributes of the .nc 
(NETCDF) file.\n"
+    x7 = "If you wish to see a full dump of the data from the .nc file,\n"
+    x8 = "please utilize the ncdump command from NETCDF (or look at the CSV 
file).\n"
+    try:
+        with Dataset(filename, "r", format="NETCDF4") as ncFile:
+            txtName = filename.replace(".nc", ".txt")
+            with open(txtName, "w") as txt:
+                txt.write(x0 + x1 +x2 +x3 + x4 + x5 + x6 + x7 + x8)
+                txt.write("\nGlobal Attributes:")
+                for x in ncFile.ncattrs():
+                    txt.write(f'\t :{x} = "{ncFile.getncattr(x)}" ;\n')
+
+
+    except (OSError, IOError) as err:
+        LOGGER.exception("Error reading netCDF file " + filename)
+        print("Error reading file!")
+        raise err
+
+def create_logs(user_option, logName):
+    """
+    Write the CDMS log information to a file. Additionally, the user may
+    opt to print this information directly to stdout, or discard it entirely.
+    
+    Parameters
+    ----------      
+    user_option : str
+        The result of the arg.log 's interpretation of
+         what option the user selected.
+    logName : str
+        The name of the log file we wish to write to,
+        assuming the user did not use the -l option.
+    """
+    if user_option == 'N':
+        print("** Note: No log was created **")
+
+
+    elif user_option == '1':
+        #prints the log contents to stdout
+        logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
+                        level=logging.INFO,
+                        datefmt='%Y-%m-%d %H:%M:%S',
+                        handlers=[
+                            logging.StreamHandler(sys.stdout)
+                            ])
+                
+    else:
+        #prints log to a .log file
+        logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
+                        level=logging.INFO,
+                        datefmt='%Y-%m-%d %H:%M:%S',
+                        handlers=[
+                            logging.FileHandler(logName)
+                            ])
+        if user_option != '1' and user_option != 'Y':
+            print(f"** Bad usage of log option. Log will print to {logName} **")
+
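+# Flag-to-argument mapping (informal note, inferred from the argparse setup in
+# __main__ below): omitting -l passes 'Y' (write a .log file), a bare -l passes
+# 'N' (no log at all), and -l1 passes '1' (log to stdout).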
+    
+
+
+
+if __name__ == '__main__':
+    """
+    Execution:
+        python cdms_reader.py filename
+        OR
+        python3 cdms_reader.py filename 
+        OR
+        python3 cdms_reader.py filename -c -g 
+        OR
+        python3 cdms_reader.py filename --csv --meta
+
+    Note (For Help Try):
+            python3 cdms_reader.py -h
+            OR
+            python3 cdms_reader.py --help
+
+    """
+   
+    u0 = '\n%(prog)s -h OR --help \n'
+    u1 = '%(prog)s filename -c -g\n%(prog)s filename --csv --meta\n'
+    u2 = 'Use -l OR -l1 to modify destination of logs'
+    p = argparse.ArgumentParser(usage=u0 + u1 + u2)
+
+    #below block is to customize user options
+    p.add_argument('filename', help='CDMS netCDF file to read')
+    p.add_argument('-c', '--csv', nargs='?', const='Y', default='N',
+                   help='Use -c or --csv to retrieve CSV output')
+    p.add_argument('-g', '--meta', nargs='?', const='Y', default='N',
+                   help='Use -g or --meta to retrieve global attributes / metadata')
+    p.add_argument('-l', '--log', nargs='?', const='N', default='Y',
+                   help='Use -l or --log to AVOID creating log files, OR use -l1 to print to stdout/console')
+
+    #arguments are processed by the next line
+    args = p.parse_args()
+
+    logName = args.filename.replace(".nc", ".log")
+    create_logs(args.log, logName)
+    
+    cdms_matches = assemble_matches(args.filename)
+
+    if args.csv == 'Y' :
+        matches_to_csv(cdms_matches, args.filename.replace(".nc",".csv"))
+
+    if args.meta == 'Y' :
+        get_globals(args.filename)
+
+
+
+
+    
+
+    
+    
\ No newline at end of file
diff --git a/tests/regression/conftest.py b/tests/conftest.py
similarity index 72%
rename from tests/regression/conftest.py
rename to tests/conftest.py
index a99e35c..c6b941b 100644
--- a/tests/regression/conftest.py
+++ b/tests/conftest.py
@@ -17,8 +17,14 @@ import pytest
 
 
 def pytest_addoption(parser):
-    parser.addoption("--skip-matchup", action="store_true")
-    parser.addoption("--force-subset", action="store_true")
+    parser.addoption("--skip-matchup", action="store_true",
+                     help="Skip matchup_spark test. (Only for script testing 
purposes)")
+    parser.addoption("--force-subset", action="store_true",
+                     help="Force cdmssubset test to run. It is currently 
skipped by default.")
+    parser.addoption('--matchup-fail-on-miscount', action='store_true',
+                     help='Fail matchup tests if they return an unexpected 
number of matches; '
+                          'otherwise issue a warning')
+
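+# Example invocation (illustrative only; adjust the path to your checkout):
+#   pytest tests/test_cdms.py --force-subset --matchup-fail-on-miscount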
 
 def pytest_collection_modifyitems(config, items):
     skip_matchup = config.getoption("--skip-matchup")
diff --git a/tests/regression/cdms_reader.py b/tests/regression/cdms_reader.py
deleted file mode 120000
index 3c3895c..0000000
--- a/tests/regression/cdms_reader.py
+++ /dev/null
@@ -1 +0,0 @@
-../../tools/cdms/cdms_reader.py
\ No newline at end of file
diff --git a/tests/regression/test_cdms.py b/tests/regression/test_cdms.py
deleted file mode 100644
index 3c95bca..0000000
--- a/tests/regression/test_cdms.py
+++ /dev/null
@@ -1,746 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-import csv
-import io
-import os
-import warnings
-from datetime import datetime
-from math import cos, radians
-from tempfile import NamedTemporaryFile as Temp
-from urllib.parse import urljoin
-from zipfile import ZipFile
-
-import pandas as pd
-import pytest
-import requests
-from bs4 import BeautifulSoup
-from dateutil.parser import parse
-from pytz import timezone, UTC
-from shapely import wkt
-from shapely.geometry import Polygon, Point, box
-
-import cdms_reader
-
-
-#########################
-#
-# export TEST_HOST=http://localhost:8083/
-# unset TEST_HOST
-#
-#########################
-
-
[email protected]()
-def host():
-    return os.getenv('TEST_HOST', 'http://doms.jpl.nasa.gov')
-
-
[email protected]()
-def insitu_endpoint():
-    return os.getenv(
-        'INSITU_ENDPOINT',
-        'http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination'
-    )
-
-
[email protected]()
-def insitu_swagger_endpoint():
-    return os.getenv(
-        'INSITU_SWAGGER_ENDPOINT',
-        'http://doms.jpl.nasa.gov/insitu/1.0/insitu_query_swagger/'
-    )
-
-
[email protected](scope="module")
-def eid():
-    return {
-        'successful': False,
-        'eid': [],
-        'params': []
-    }
-
-
-def skip(msg=""):
-    raise pytest.skip(msg)
-
-
-def b_to_polygon(b):
-    west, south, east, north = [float(p) for p in b.split(",")]
-    polygon = Polygon([(west, south), (east, south), (east, north), (west, north), (west, south)])
-    return polygon
-
-
-def iso_time_to_epoch(str_time):
-    EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
-
-    return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
-        tzinfo=UTC) - EPOCH).total_seconds()
-
-
-def expand_by_tolerance(point, rt):
-    def add_meters_to_lon_lat(point, meters):
-        lon = point.x
-        lat = point.y
-
-        longitude = lon + ((meters / 111111) * cos(radians(lat)))
-        latitude = lat + (meters / 111111)
-
-        return longitude, latitude
-
-    min_lon, min_lat = add_meters_to_lon_lat(point, -1 * rt)
-    max_lon, max_lat = add_meters_to_lon_lat(point, rt)
-
-    return box(min_lon, min_lat, max_lon, max_lat)
-
-
-def translate_global_rows(rows):
-    translated = {}
-
-    for row in rows:
-        parts = row.split(',', 1)
-        translated[parts[0]] = parts[1]
-
-    return translated
-
-
-def translate_matchup_rows(rows):
-    headers = rows[0].split(',')
-
-    translated_rows = []
-
-    for row in rows[1:]:
-        translated_row = {}
-
-        buf = io.StringIO(row)
-        reader = csv.reader(buf)
-        fields = list(reader)[0]
-
-        assert len(headers) == len(fields)
-
-        for i, field in enumerate(fields):
-            header = headers[i]
-
-            if header not in translated_row:
-                translated_row[header] = field
-            else:
-                translated_row[f"{header}_secondary"] = field
-
-        translated_rows.append(translated_row)
-
-    return translated_rows
-
-
-def lat_lon_to_point(lat, lon):
-    return wkt.loads(f"Point({lon} {lat})")
-
-
-def format_time(timestamp):
-    t = parse(timestamp)
-
-    ISO_8601 = '%Y-%m-%dT%H:%M:%SZ'
-
-    return t.strftime(ISO_8601)
-
-
-def verify_match(match, point, time, s_point, s_time, params, bounding_poly):
-    # Check primary point is as expected
-    assert match['point'] == point
-    assert match['time'] == time
-
-    # Check primary point within search bounds
-    assert iso_time_to_epoch(params['startTime']) \
-           <= match['time'] \
-           <= iso_time_to_epoch(params['endTime'])
-    assert bounding_poly.contains(wkt.loads(match['point']))
-
-    secondary = match['matches'][0]
-
-    # Check secondary point is as expected
-    assert secondary['point'] == s_point
-    assert secondary['time'] == s_time
-
-    # Check secondary point within specified spatial & temporal tolerances for matched primary
-    assert expand_by_tolerance(
-        wkt.loads(match['point']),
-        params['rt']
-    ).contains(wkt.loads(secondary['point']))
-
-    assert (match['time'] - params['tt']) \
-           <= secondary['time'] \
-           <= (match['time'] + params['tt'])
-
-
[email protected]
-def test_matchup_spark(host, eid):
-    url = urljoin(host, 'match_spark')
-
-    params = {
-        "primary": "MUR25-JPL-L4-GLOB-v04.2",
-        "startTime": "2018-08-01T09:00:00Z",
-        "endTime": "2018-09-01T00:00:00Z",
-        "tt": 43200,
-        "rt": 1000,
-        "b": "-100,20,-79,30",
-        "depthMin": -20,
-        "depthMax": 10,
-        "matchOnce": True,
-        "secondary": "ICOADS Release 3.0",
-        "resultSizeLimit": 7000,
-        "platforms": "42"
-    }
-
-    response = requests.get(url, params=params)
-
-    assert response.status_code == 200
-
-    bounding_poly = b_to_polygon(params['b'])
-
-    body = response.json()
-    data = body['data']
-
-    assert body['count'] == len(data)
-
-    data.sort(key=lambda e: e['point'])
-    body['data'] = data
-
-    eid['eid'].append(body['executionId'])
-    eid['params'].append(copy.deepcopy(params))
-
-    verify_match(
-        data[0],    'Point(-86.125 27.625)',
-        1535360400, 'Point(-86.13 27.63)',
-        1535374800,  params, bounding_poly
-    )
-
-    verify_match(
-        data[1],    'Point(-90.125 27.625)',
-        1534496400, 'Point(-90.13 27.63)',
-        1534491000,  params, bounding_poly
-    )
-
-    verify_match(
-        data[2],    'Point(-90.125 28.125)',
-        1534928400, 'Point(-90.13 28.12)',
-        1534899600,  params, bounding_poly
-    )
-
-    verify_match(
-        data[3],    'Point(-90.375 28.125)',
-        1534842000, 'Point(-90.38 28.12)',
-        1534813200,  params, bounding_poly
-    )
-
-    params['primary'] = 'JPL-L4-MRVA-CHLA-GLOB-v3.0'
-
-    response = requests.get(url, params=params)
-
-    assert response.status_code == 200
-
-    body = response.json()
-
-    data = body['data']
-
-    assert body['count'] == len(data)
-
-    data.sort(key=lambda e: e['point'])
-    body['data'] = data
-
-    eid['eid'].append(body['executionId'])
-    eid['params'].append(copy.deepcopy(params))
-
-    verify_match(
-        data[0],    'Point(-86.125 27.625)',
-        1535371200, 'Point(-86.13 27.63)',
-        1535374800,  params, bounding_poly
-    )
-
-    verify_match(
-        data[1],    'Point(-90.125 27.625)',
-        1534507200, 'Point(-90.13 27.63)',
-        1534491000,  params, bounding_poly
-    )
-
-    verify_match(
-        data[2],    'Point(-90.125 28.125)',
-        1534939200, 'Point(-90.13 28.12)',
-        1534899600,  params, bounding_poly
-    )
-
-    verify_match(
-        data[3],    'Point(-90.375 28.125)',
-        1534852800, 'Point(-90.38 28.12)',
-        1534813200,  params, bounding_poly
-    )
-
-    eid['successful'] = True
-
-
[email protected]
-def test_domsresults_json(host, eid):
-    url = urljoin(host, 'domsresults')
-
-    # Skip the test automatically if the matchup request was not successful
-    if not eid['successful']:
-        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
-
-    def fetch_result(eid, output):
-        return requests.get(url, params={"id": eid, "output": output})
-
-    eids = eid['eid']
-    param_list = eid['params']
-
-    response = fetch_result(eids[0], "JSON")
-
-    assert response.status_code == 200
-
-    body = response.json()
-
-    data = body['data']
-    assert len(data) == 4
-
-    for m in data:
-        m['point'] = f"Point({m['lon']} {m['lat']})"
-        for s in m['matches']:
-            s['point'] = f"Point({s['lon']} {s['lat']})"
-
-    data.sort(key=lambda e: e['point'])
-
-    params = param_list[0]
-    bounding_poly = b_to_polygon(params['b'])
-
-    verify_match(data[0],    'Point(-86.125 27.625)',
-                 1535360400, 'Point(-86.13 27.63)',
-                 1535374800, params, bounding_poly
-                 )
-
-    verify_match(data[1],    'Point(-90.125 27.625)',
-                 1534496400, 'Point(-90.13 27.63)',
-                 1534491000,  params, bounding_poly
-                 )
-
-    verify_match(data[2],    'Point(-90.125 28.125)',
-                 1534928400, 'Point(-90.13 28.12)',
-                 1534899600,  params, bounding_poly
-                 )
-
-    verify_match(data[3],    'Point(-90.375 28.125)',
-                 1534842000, 'Point(-90.38 28.12)',
-                 1534813200,  params, bounding_poly
-                 )
-
-    response = fetch_result(eids[1], "JSON")
-
-    assert response.status_code == 200
-
-    body = response.json()
-
-    data = body['data']
-    assert len(data) == 4
-
-    for m in data:
-        m['point'] = f"Point({m['lon']} {m['lat']})"
-        for s in m['matches']:
-            s['point'] = f"Point({s['lon']} {s['lat']})"
-
-    data.sort(key=lambda e: e['point'])
-
-    params = param_list[1]
-    bounding_poly = b_to_polygon(params['b'])
-
-    verify_match(data[0],    'Point(-86.125 27.625)',
-                 1535371200, 'Point(-86.13 27.63)',
-                 1535374800,  params, bounding_poly
-                 )
-
-    verify_match(data[1],    'Point(-90.125 27.625)',
-                 1534507200, 'Point(-90.13 27.63)',
-                 1534491000,  params, bounding_poly
-                 )
-
-    verify_match(data[2],    'Point(-90.125 28.125)',
-                 1534939200, 'Point(-90.13 28.12)',
-                 1534899600,  params, bounding_poly
-                 )
-
-    verify_match(data[3],    'Point(-90.375 28.125)',
-                 1534852800, 'Point(-90.38 28.12)',
-                 1534813200,  params, bounding_poly
-                 )
-
-
[email protected]
-def test_domsresults_csv(host, eid):
-    url = urljoin(host, 'domsresults')
-
-    # Skip the test automatically if the matchup request was not successful
-    if not eid['successful']:
-        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
-
-    def fetch_result(eid, output):
-        return requests.get(url, params={"id": eid, "output": output})
-
-    eids = eid['eid']
-    param_list = eid['params']
-
-    response = fetch_result(eids[0], "CSV")
-    params = param_list[0]
-    bounding_poly = b_to_polygon(params['b'])
-
-    assert response.status_code == 200
-
-    rows = response.text.split('\r\n')
-    index = rows.index('')
-
-    global_rows = rows[:index]
-    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
-
-    global_rows = translate_global_rows(global_rows)
-    matchup_rows = translate_matchup_rows(matchup_rows)
-
-    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
-
-    for row in matchup_rows:
-        primary_point = lat_lon_to_point(row['lat'], row['lon'])
-
-        assert bounding_poly.contains(primary_point)
-        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
-
-        secondary_point = lat_lon_to_point(row['lat_secondary'], row['lon_secondary'])
-
-        assert expand_by_tolerance(primary_point, params['rt']).contains(secondary_point)
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
-
-    response = fetch_result(eids[1], "CSV")
-    params = param_list[1]
-    bounding_poly = b_to_polygon(params['b'])
-
-    assert response.status_code == 200
-
-    rows = response.text.split('\r\n')
-    index = rows.index('')
-
-    global_rows = rows[:index]
-    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
-
-    global_rows = translate_global_rows(global_rows)
-    matchup_rows = translate_matchup_rows(matchup_rows)
-
-    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
-
-    for row in matchup_rows:
-        primary_point = lat_lon_to_point(row['lat'], row['lon'])
-
-        assert bounding_poly.contains(primary_point)
-        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
-
-        secondary_point = lat_lon_to_point(row['lat_secondary'], row['lon_secondary'])
-
-        assert expand_by_tolerance(primary_point, params['rt']).contains(secondary_point)
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
-
-
[email protected]
[email protected]
-def test_domsresults_netcdf(host, eid):
-    warnings.filterwarnings('ignore')
-
-    url = urljoin(host, 'domsresults')
-
-    # Skip the test automatically if the matchup request was not successful
-    if not eid['successful']:
-        skip('Matchup request was unsuccessful so there are no results to get from domsresults')
-
-    def fetch_result(eid, output):
-        return requests.get(url, params={"id": eid, "output": output})
-
-    eids = eid['eid']
-    param_list = eid['params']
-
-    temp_file = Temp(mode='wb+', suffix='.csv.tmp', prefix='CDMSReader_')
-
-    response = fetch_result(eids[0], "NETCDF")
-    params = param_list[0]
-    bounding_poly = b_to_polygon(params['b'])
-
-    assert response.status_code == 200
-
-    temp_file.write(response.content)
-    temp_file.flush()
-    temp_file.seek(0)
-
-    matches = cdms_reader.assemble_matches(temp_file.name)
-
-    cdms_reader.matches_to_csv(matches, temp_file.name)
-
-    with open(temp_file.name) as f:
-        reader = csv.DictReader(f)
-        rows = list(reader)
-
-    for row in rows:
-        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
-
-        assert bounding_poly.contains(primary_point)
-        assert iso_time_to_epoch(params['startTime']) \
-               <= float(row['PrimaryData_time']) \
-               <= iso_time_to_epoch(params['endTime'])
-
-        secondary_point = lat_lon_to_point(row['SecondaryData_lat'], row['SecondaryData_lon'])
-
-        assert expand_by_tolerance(primary_point, params['rt']).contains(secondary_point)
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= float(row['SecondaryData_time']) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
-
-    response = fetch_result(eids[1], "NETCDF")
-    params = param_list[1]
-    bounding_poly = b_to_polygon(params['b'])
-
-    assert response.status_code == 200
-
-    temp_file.write(response.content)
-    temp_file.flush()
-    temp_file.seek(0)
-
-    matches = cdms_reader.assemble_matches(temp_file.name)
-
-    cdms_reader.matches_to_csv(matches, temp_file.name)
-
-    with open(temp_file.name) as f:
-        reader = csv.DictReader(f)
-        rows = list(reader)
-
-    for row in rows:
-        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
-
-        assert bounding_poly.contains(primary_point)
-        assert iso_time_to_epoch(params['startTime']) \
-               <= float(row['PrimaryData_time']) \
-               <= iso_time_to_epoch(params['endTime'])
-
-        secondary_point = lat_lon_to_point(row['SecondaryData_lat'], row['SecondaryData_lon'])
-
-        assert expand_by_tolerance(primary_point, params['rt']).contains(secondary_point)
-        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
-               <= float(row['SecondaryData_time']) \
-               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
-
-    temp_file.close()
-    warnings.filterwarnings('default')
-
-
[email protected]
-def test_domslist(host):
-    url = urljoin(host, 'domslist')
-
-    response = requests.get(url)
-
-    assert response.status_code == 200
-
-    body = response.json()
-
-    data = body['data']
-
-    num_satellite = len(data['satellite'])
-    num_insitu = len(data['insitu'])
-
-    assert num_insitu > 0
-    assert num_satellite > 0
-
-    # assert body['count'] == num_satellite + num_insitu
-
-
[email protected]
-def test_cdmssubset(host):
-    url = urljoin(host, 'cdmssubset')
-
-    params = {
-        "dataset": "MUR25-JPL-L4-GLOB-v04.2",
-        "parameter": "sst",
-        "startTime": "2018-09-24T00:00:00Z",
-        "endTime": "2018-09-30T00:00:00Z",
-        "b": "160,-30,180,-25",
-        "output": "ZIP"
-    }
-
-    response = requests.get(url, params=params)
-
-    assert response.status_code == 200
-
-    bounding_poly = b_to_polygon(params['b'])
-
-    response_buf = io.BytesIO(response.content)
-
-    with ZipFile(response_buf) as data:
-        namelist = data.namelist()
-
-        assert namelist == ['MUR25-JPL-L4-GLOB-v04.2.csv']
-
-        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
-        csv_data = pd.read_csv(csv_buf)
-
-    def validate_row_bounds(row):
-        assert bounding_poly.contains(Point(row['longitude'], row['latitude']))
-        assert params['startTime'] <= row['time'] <= params['endTime']
-
-    for i in range(0, len(csv_data)):
-        validate_row_bounds(csv_data.iloc[i])
-
-    params['dataset'] = 'OISSS_L4_multimission_7day_v1'
-
-    response = requests.get(url, params=params)
-
-    assert response.status_code == 200
-
-    response_buf = io.BytesIO(response.content)
-
-    with ZipFile(response_buf) as data:
-        namelist = data.namelist()
-
-        assert namelist == ['OISSS_L4_multimission_7day_v1.csv']
-
-        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
-        csv_data = pd.read_csv(csv_buf)
-
-    for i in range(0, len(csv_data)):
-        validate_row_bounds(csv_data.iloc[i])
-
-
[email protected]
-def test_insitu(insitu_endpoint):
-    params = {
-        'itemsPerPage': 1000,
-        'startTime': '2018-05-15T00:00:00Z',
-        'endTime': '2018-06-01T00:00:00Z',
-        'bbox': '-80,25,-75,30',
-        'minDepth': 0.0,
-        'maxDepth': 5.0,
-        'provider': 'NCAR',
-        'project': 'ICOADS Release 3.0',
-        'platform': '42',
-        'markerTime': '2018-05-15T00:00:00Z'
-    }
-
-    response = requests.get(insitu_endpoint, params=params)
-
-    assert response.status_code == 200
-
-    body = response.json()
-
-    if body['total'] <= params['itemsPerPage']:
-        assert body['total'] == len(body['results'])
-    else:
-        assert len(body['results']) == params['itemsPerPage']
-
-    bounding_poly = b_to_polygon(params['bbox'])
-
-    for result in body['results']:
-        assert bounding_poly.contains(
-            wkt.loads(f"Point({result['longitude']} {result['latitude']})")
-        )
-
-        if result['depth'] != -99999.0:
-            assert params['minDepth'] <= result['depth'] <= params['maxDepth']
-
-        assert params['startTime'] <= result['time'] <= params['endTime']
-
-
[email protected]
-def test_swaggerui_sdap(host):
-    url = urljoin(host, 'apidocs/')
-
-    response = requests.get(url)
-
-    assert response.status_code == 200
-    assert 'swagger-ui' in response.text
-
-    try:
-        # There's probably a better way to do this, but extract the .yml file for the docs from the returned text
-        soup = BeautifulSoup(response.text, 'html.parser')
-
-        script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0])
-
-        start_index = script.find('url:')
-        end_index = script.find('",\n', start_index)
-
-        script = script[start_index:end_index]
-
-        yml_filename = script.split('"')[1]
-
-        url = urljoin(url, yml_filename)
-
-        response = requests.get(url)
-
-        assert response.status_code == 200
-    except AssertionError:
-        raise
-    except:
-        try:
-            url = urljoin(url, 'openapi.yml')
-
-            response = requests.get(url)
-
-            assert response.status_code == 200
-
-            warnings.warn("Could not extract documentation yaml filename from 
response text, "
-                          "but using an assumed value worked successfully")
-        except:
-            raise ValueError("Could not verify documentation yaml file, 
assumed value also failed")
-
-
[email protected]
-def test_swaggerui_insitu(insitu_swagger_endpoint):
-    response = requests.get(insitu_swagger_endpoint)
-
-    assert response.status_code == 200
-    assert 'swagger-ui' in response.text
-
-    try:
-        # There's probably a better way to do this, but extract the .yml file for the docs from the returned text
-        soup = BeautifulSoup(response.text, 'html.parser')
-
-        script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0])
-
-        start_index = script.find('url:')
-        end_index = script.find('",\n', start_index)
-
-        script = script[start_index:end_index]
-
-        yml_filename = script.split('"')[1]
-
-        url = urljoin(insitu_swagger_endpoint, yml_filename)
-
-        response = requests.get(url)
-
-        assert response.status_code == 200
-    except AssertionError:
-        raise
-    except:
-        try:
-            url = urljoin(insitu_swagger_endpoint, 'insitu-spec-0.0.1.yml')
-
-            response = requests.get(url)
-
-            assert response.status_code == 200
-
-            warnings.warn("Could not extract documentation yaml filename from 
response text, "
-                          "but using an assumed value worked successfully")
-        except:
-            raise ValueError("Could not verify documentation yaml file, 
assumed value also failed")
diff --git a/tests/test_cdms.py b/tests/test_cdms.py
new file mode 100644
index 0000000..29a9088
--- /dev/null
+++ b/tests/test_cdms.py
@@ -0,0 +1,1707 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import csv
+import datetime
+import io
+import json
+import os
+import warnings
+from datetime import datetime
+from pathlib import Path
+from tempfile import NamedTemporaryFile as Temp
+from time import sleep
+from urllib.parse import urljoin
+from zipfile import ZipFile
+
+import pandas as pd
+import pytest
+import requests
+from bs4 import BeautifulSoup
+from dateutil.parser import parse
+from geopy.distance import geodesic
+from pytz import timezone, UTC
+from requests.exceptions import ConnectTimeout
+from shapely import wkt
+from shapely.geometry import Polygon, Point
+
+import cdms_reader
+
+
+#########################
+#
+# export TEST_HOST=http://localhost:8083/
+# unset TEST_HOST
+#
+#########################
+
+
[email protected](scope="session")
+def host():
+    return os.getenv('TEST_HOST', 'https://doms.jpl.nasa.gov')
+
+
[email protected]()
+def insitu_endpoint_JPL():
+    return os.getenv(
+        'INSITU_ENDPOINT_JPL',
+        'https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination'
+    )
+
+
[email protected]()
+def insitu_endpoint_NCAR():
+    return os.getenv(
+        'INSITU_ENDPOINT_NCAR',
+        'https://cdms.ucar.edu/insitu/1.0/query_data_doms_custom_pagination'
+    )
+
+
[email protected]()
+def insitu_endpoint_saildrone():
+    return os.getenv(
+        'INSITU_ENDPOINT_SAILDRONE',
+        'https://nasa-cdms.saildrone.com/insitu/1.0/query_data_doms_custom_pagination'
+    )
+
+
[email protected]()
+def insitu_swagger_endpoint():
+    return os.getenv(
+        'INSITU_SWAGGER_ENDPOINT',
+        'https://doms.jpl.nasa.gov/insitu/1.0/insitu_query_swagger/'
+    )
+
+
[email protected](scope="session")
+def eid():
+    return {
+        'successful': False,
+        'eid': [],
+        'params': []
+    }
+
+
+start_time = None
+
+
[email protected](scope="session")
+def start():
+    global start_time
+
+    if start_time is None:
+        start_time = datetime.now().strftime("%G%m%d%H%M%S%z")
+
+    return start_time
+
+
[email protected]()
+def timeouts():
+    connect_timeout = 9.05  # Recommended to be just above a multiple of 3 seconds
+    read_timeout = 303  # Just above current gateway timeout
+    timeouts = (connect_timeout, read_timeout)
+
+    return timeouts
+
+
[email protected]()
+def fail_on_miscount(request):
+    return request.config.getoption('--matchup-fail-on-miscount', default=False)
+
+
[email protected](scope='session')
+def distance_vs_time_query(host, start):
+    result = {
+        'distances': {  # Tuples: sec_lat, sec_lon, sec_time
+            'min_dist': (),
+            'min_time': ()
+        },
+        'backup': {  # Tuples: sec_lat, sec_lon, sec_time
+            'min_dist': ("26.6141296", "-130.0827904", 1522637640),
+            'min_time': ("26.6894016", "-130.0547072", 1522626840)
+        },
+        'success': False
+    }
+
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "secondary": "shark-2018",
+        "startTime": "2018-04-01T00:00:00Z",
+        "endTime": "2018-04-01T23:59:59Z",
+        "b": "-131,26,-130,27",
+        "depthMin": -5,
+        "depthMax": 5,
+        "tt": 86400,
+        "rt": 10000,
+        "matchOnce": False,
+        "resultSizeLimit": 0,
+        "platforms": "3B",
+        "parameter": "mass_concentration_of_chlorophyll_in_sea_water",
+    }
+
+    try:
+        body = run_matchup(url, params)
+
+        data = body['data']
+
+        assert body['count'] == len(data)
+        check_count(len(data), 1, True)
+
+        primary_point = data[0]
+
+        def compute_distance(primary, secondary):
+            return geodesic((primary['lat'], primary['lon']), (secondary['lat'], secondary['lon'])).m
+
+        def compute_time(primary, secondary):
+            return abs(primary['time'] - secondary['time'])
+
+        distances = [
+            (s['lat'], s['lon'], s['time'], compute_distance(primary_point, s), compute_time(primary_point, s))
+            for s in primary_point['matches']
+        ]
+
+        try_save('computed_distances', start, distances)
+
+        min_dist = min(distances, key=lambda x: x[3])
+        min_time = min(distances, key=lambda x: x[4])
+
+        result['distances']['min_dist'] = min_dist[:3]
+        result['distances']['min_time'] = min_time[:3]
+
+        result['success'] = True
+    except:
+        warnings.warn('Could not determine point distances for prioritization tests, using backup values instead')
+
+    return result
+
+
+def skip(msg=""):
+    raise pytest.skip(msg)
+
+
+def b_to_polygon(b):
+    west, south, east, north = [float(p) for p in b.split(",")]
+    polygon = Polygon([(west, south), (east, south), (east, north), (west, north), (west, south)])
+    return polygon
+
+
+def iso_time_to_epoch(str_time):
+    epoch = timezone('UTC').localize(datetime(1970, 1, 1))
+
+    return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+        tzinfo=UTC) - epoch).total_seconds()
+
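+# Sanity-check example (illustrative, not part of the committed tests):
+# iso_time_to_epoch("2018-08-16T00:00:00Z") == 1534377600.0, the POSIX
+# timestamp form that the matchup 'time' fields are compared against below.
+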
+
+def verify_secondary_in_tolerance(primary, secondary, rt):
+    distance = geodesic((primary['lat'], primary['lon']), (secondary['lat'], secondary['lon'])).m
+
+    assert distance <= rt
+
+
+def translate_global_rows(rows):
+    translated = {}
+
+    for row in rows:
+        parts = row.split(',', 1)
+        translated[parts[0]] = parts[1]
+
+    return translated
+
+
+def translate_matchup_rows(rows):
+    headers = rows[0].split(',')
+
+    translated_rows = []
+
+    for row in rows[1:]:
+        translated_row = {}
+
+        buf = io.StringIO(row)
+        reader = csv.reader(buf)
+        fields = list(reader)[0]
+
+        assert len(headers) == len(fields)
+
+        for i, field in enumerate(fields):
+            header = headers[i]
+
+            if header not in translated_row:
+                translated_row[header] = field
+            else:
+                translated_row[f"{header}_secondary"] = field
+
+        translated_rows.append(translated_row)
+
+    return translated_rows
+
+
+def lat_lon_to_point(lat, lon):
+    return wkt.loads(f"Point({lon} {lat})")
+
+
+def format_time(timestamp):
+    t = parse(timestamp)
+    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+
+def verify_match(match, point, time, s_point, s_time, params, bounding_poly):
+    # Check primary point is as expected
+    assert match['point'] == point
+    assert match['time'] == time
+
+    # Check primary point within search bounds
+    assert iso_time_to_epoch(params['startTime']) \
+           <= match['time'] \
+           <= iso_time_to_epoch(params['endTime'])
+    assert bounding_poly.intersects(wkt.loads(match['point']))
+
+    secondary = match['matches'][0]
+
+    # Check secondary point is as expected
+    assert secondary['point'] == s_point
+    assert secondary['time'] == s_time
+
+    # Check secondary point within specified spatial & temporal tolerances for matched primary
+    verify_secondary_in_tolerance(match, secondary, params['rt'])
+
+    assert (match['time'] - params['tt']) \
+           <= secondary['time'] \
+           <= (match['time'] + params['tt'])
+
+
+def verify_match_consistence(match, params, bounding_poly):
+    # Check primary point within search bounds
+    assert iso_time_to_epoch(params['startTime']) \
+           <= match['time'] \
+           <= iso_time_to_epoch(params['endTime'])
+    assert bounding_poly.intersects(wkt.loads(match['point']))
+
+    for secondary in match['matches']:
+        # Check secondary point within specified spatial & temporal tolerances for matched primary
+        verify_secondary_in_tolerance(match, secondary, params['rt'])
+
+        assert (match['time'] - params['tt']) \
+               <= secondary['time'] \
+               <= (match['time'] + params['tt'])
+
+
+def validate_insitu(body, params, test):
+    if body['total'] <= params['itemsPerPage']:
+        assert body['total'] == len(body['results'])
+    else:
+        assert len(body['results']) == params['itemsPerPage']
+
+    if len(body['results']) == 0:
+        warnings.warn(f'Insitu test ({test}) returned no results!')
+
+    bounding_poly = b_to_polygon(params['bbox'])
+
+    for result in body['results']:
+        assert bounding_poly.intersects(
+            wkt.loads(f"Point({result['longitude']} {result['latitude']})")
+        )
+
+        if result['depth'] != -99999.0:
+            assert params['minDepth'] <= result['depth'] <= params['maxDepth']
+
+        assert params['startTime'] <= result['time'] <= params['endTime']
+
+
+def try_save(name, time, response, ext='json', mode='w'):
+    Path(f'responses/{time}/').mkdir(parents=True, exist_ok=True)
+
+    try:
+        with open(f'responses/{time}/{name}.{ext}', mode=mode) as f:
+            if ext == 'json':
+                json.dump(response, f, indent=4)
+            elif ext == 'csv':
+                f.write(response.text)
+            else:
+                f.write(response.content)
+    except Exception as e:
+        warnings.warn(f"Failed to save response for {name}\n{e}", 
RuntimeWarning)
+
+
+def uniq_primaries(primaries, xfail=False, case=None):
+    class Primary:
+        def __init__(self, p):
+            self.platform = p['platform']
+            self.device = p['device']
+            self.lon = p['lon']
+            self.lat = p['lat']
+            self.point = p['point']
+            self.time = p['time']
+            self.depth = p['depth']
+            self.fileurl = p['fileurl']
+            self.id = p['id']
+            self.source = p['source']
+            self.primary = p['primary']
+            self.matches = p['matches']
+
+        def __eq__(self, other):
+            if not isinstance(other, Primary):
+                return False
+
+            return self.platform == other.platform and \
+                self.device == other.device and \
+                self.lon == other.lon and \
+                self.lat == other.lat and \
+                self.point == other.point and \
+                self.time == other.time and \
+                self.depth == other.depth and \
+                self.fileurl == other.fileurl and \
+                self.id == other.id and \
+                self.source == other.source and \
+                self.primary == other.primary
+
+        def __str__(self):
+            primary = {
+                "platform": self.platform,
+                "device": self.device,
+                "lon": self.lon,
+                "lat": self.lat,
+                "point": self.point,
+                "time": self.time,
+                "depth": self.depth,
+                "fileurl": self.fileurl,
+                "id": self.id,
+                "source": self.source,
+                "primary": self.primary,
+            }
+
+            return json.dumps(primary, indent=4)
+
+    points = [Primary(p) for p in primaries]
+
+    checked = []
+    duplicates = {}
+
+    for p in points:
+        for c in checked:
+            if p == c:
+                if p.id not in duplicates:
+                    duplicates[p.id] = [p, c]
+                else:
+                    duplicates[p.id].append(p)
+                break
+        checked.append(p)
+
+    if len(duplicates) > 0:
+        m = print if not xfail else warnings.warn
+
+        msg = f'Duplicate point(s) found ({len(duplicates)} total)'
+
+        if case is not None:
+            msg += f' for case {case}'
+
+        msg += '\n\n-----\n\n'
+
+        for d in duplicates:
+            d = duplicates[d]
+
+            msg += 'Primary point:\n' + str(d[0]) + '\n\n'
+
+            matches = [p.matches for p in d]
+
+            msg += f'Matches to ({len(matches)}):\n'
+            msg += json.dumps(matches, indent=4)
+            msg += '\n\n'
+
+        m(msg)
+
+        if xfail:
+            pytest.xfail('Duplicate points found')
+        else:
+            assert False, 'Duplicate points found'
+
+
+def check_count(count, expected, fail_on_mismatch):
+    if count == expected:
+        return
+    elif fail_on_mismatch:
+        raise AssertionError(f'Incorrect count: Expected {expected}, got {count}')
+    else:
+        warnings.warn(f'Incorrect count: Expected {expected}, got {count}')
+
+
+# Run the matchup query and return json output (and eid?)
+# Should be able to work if match_spark is synchronous or asynchronous
+def run_matchup(url, params, page_size=3500):
+    TIMEOUT = 60 * 60
+    # TIMEOUT = float('inf')
+
+    response = requests.get(url, params=params)
+
+    assert response.status_code == 200, 'Initial match_spark query failed'
+    response_json = response.json()
+
+    asynchronous = 'status' in response_json
+
+    if not asynchronous:
+        return response_json
+    else:
+        start = datetime.utcnow()
+        job_url = [link for link in response_json['links'] if link['rel'] == 'self'][0]['href']
+
+        retries = 3
+        timeouts = [2, 5, 10]
+
+        while response_json['status'] == 'running' and (datetime.utcnow() - start).total_seconds() <= TIMEOUT:
+            status_response = requests.get(job_url)
+            status_code = status_response.status_code
+
+            # /job poll may fail internally. This does not necessarily indicate job failure (ie, Cassandra read
+            # timed out). Retry it a couple of times and fail the test if it persists.
+            if status_code == 500 and retries > 0:
+                warnings.warn('/job poll failed; retrying')
+                sleep(timeouts[3 - retries])
+                retries -= 1
+                continue
+
+            assert status_response.status_code == 200, '/job status polling failed'
+            response_json = status_response.json()
+
+            if response_json['status'] == 'running':
+                sleep(10)
+
+        job_status = response_json['status']
+
+        if job_status == 'running':
+            skip(f'Job has been running too long ({(datetime.utcnow() - start)}), skipping to run other tests')
+        elif job_status in ['cancelled', 'failed']:
+            raise ValueError(f'Async matchup job finished with incomplete status ({job_status})')
+        else:
+            stac_url = [
+                link for link in response_json['links'] if 'STAC' in link['title']
+            ][0]['href']
+
+            catalogue_response = requests.get(stac_url)
+            assert catalogue_response.status_code == 200, 'Catalogue fetch failed'
+
+            catalogue_response = catalogue_response.json()
+
+            json_cat_url = [
+                link for link in catalogue_response['links'] if 'JSON' in link['title']
+            ][0]['href']
+
+            catalogue_response = requests.get(json_cat_url)
+            assert catalogue_response.status_code == 200, 'Catalogue fetch failed'
+
+            catalogue_response = catalogue_response.json()
+
+            results_urls = [
+                link['href'] for link in catalogue_response['links'] if 'output=JSON' in link['href']
+                # link['href'] for link in response_json['links'] if link['type'] == 'application/json'
+            ]
+
+            def get_results(url):
+                retries = 3
+                retry_delay = 1.5
+
+                while retries > 0:
+                    response = requests.get(url)
+
+                    try:
+                        response.raise_for_status()
+                        return response.json()
+                    except:
+                        retries -= 1
+                        sleep(retry_delay)
+                        retry_delay *= 2
+
+            assert len(results_urls) > 0, 'STAC catalogue returned no result queries'
+
+            matchup_result = get_results(results_urls[0])
+
+            for url in results_urls[1:]:
+                matchup_result['data'].append(get_results(url)['data'])
+
+            return matchup_result
+
+
[email protected]
+def test_matchup_spark_L4_IS_ICOADS(host, eid, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "MUR25-JPL-L4-GLOB-v04.2",
+        "secondary": "ICOADS Release 3.0",
+        # "secondary": "ICOADS_JPL",
+        "startTime": "2018-08-16T00:00:00Z",
+        "endTime": "2018-08-27T23:59:59Z",
+        "b": "-90.38,27.625,-86.125,28.125",
+        "depthMin": -20,
+        "depthMax": 10,
+        "tt": 43200,
+        "rt": 1000,
+        "matchOnce": True,
+        "resultSizeLimit": 7000,
+        "platforms": "42"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L4_IS_ICOADS_A", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L4_IS_ICOADS_A')
+    check_count(len(data), 5, fail_on_miscount)
+
+    data.sort(key=lambda e: e['point'])
+    body['data'] = data
+
+    eid['eid'].append(body['executionId'])
+    eid['params'].append(copy.deepcopy(params))
+
+    verify_match(
+        data[0],    'Point(-86.125 27.625)',
+        1535360400, 'Point(-86.130 27.630)',
+        1535374800,  params, bounding_poly
+    )
+
+    verify_match(
+        data[1],    'Point(-88.875 27.875)',
+        1534669200, 'Point(-88.880 27.880)',
+        1534698000,  params, bounding_poly
+    )
+
+    verify_match(
+        data[2],    'Point(-90.125 27.625)',
+        1534496400, 'Point(-90.130 27.630)',
+        1534491000,  params, bounding_poly
+    )
+
+    verify_match(
+        data[3],    'Point(-90.125 28.125)',
+        1534928400, 'Point(-90.130 28.120)',
+        1534899600,  params, bounding_poly
+    )
+
+    verify_match(
+        data[4],    'Point(-90.375 28.125)',
+        1534842000, 'Point(-90.380 28.120)',
+        1534813200,  params, bounding_poly
+    )
+
+    params['primary'] = 'JPL-L4-MRVA-CHLA-GLOB-v3.0'
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L4_IS_ICOADS_B", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L4_IS_ICOADS_B')
+    check_count(len(data), 5, fail_on_miscount)
+
+    data.sort(key=lambda e: e['point'])
+    body['data'] = data
+
+    eid['eid'].append(body['executionId'])
+    eid['params'].append(copy.deepcopy(params))
+
+    verify_match(
+        data[0],    'Point(-86.125 27.625)',
+        1535371200, 'Point(-86.130 27.630)',
+        1535374800,  params, bounding_poly
+    )
+
+    verify_match(
+        data[1],    'Point(-88.875 27.875)',
+        1534680000, 'Point(-88.880 27.880)',
+        1534698000,  params, bounding_poly
+    )
+
+    verify_match(
+        data[2],    'Point(-90.125 27.625)',
+        1534507200, 'Point(-90.130 27.630)',
+        1534491000,  params, bounding_poly
+    )
+
+    verify_match(
+        data[3],    'Point(-90.125 28.125)',
+        1534939200, 'Point(-90.130 28.120)',
+        1534899600,  params, bounding_poly
+    )
+
+    verify_match(
+        data[4],    'Point(-90.375 28.125)',
+        1534852800, 'Point(-90.380 28.120)',
+        1534813200,  params, bounding_poly
+    )
+
+    eid['successful'] = True
+
+
[email protected]
+def test_matchup_spark_L4_IS_SAMOS(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "MUR25-JPL-L4-GLOB-v04.2",
+        "secondary": "SAMOS",
+        # "startTime": "2018-01-01T00:00:00Z",
+        "startTime": "2018-01-02T00:00:00Z",
+        "endTime": "2018-01-10T23:59:59Z",
+        "b": "-130,20,-110,40",
+        "depthMin": -35,
+        "depthMax": 10,
+        "tt": 86400,
+        "rt": 50000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "30",
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L4_IS_SAMOS", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L4_IS_SAMOS')
+    check_count(len(data), 194, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L4_IS_shark2018(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "secondary": "shark-2018",
+        "startTime": "2018-04-01T00:00:00Z",
+        "endTime": "2018-04-01T23:59:59Z",
+        "b": "-140,10,-110,40",
+        "depthMin": -5,
+        "depthMax": 5,
+        "tt": 86400,
+        "rt": 50000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "3B",
+        "parameter": "mass_concentration_of_chlorophyll_in_sea_water"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L4_IS_shark2018", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L4_IS_shark2018')
+    check_count(len(data), 27, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+        assert "mass_concentration_of_chlorophyll_in_sea_water" in match['matches'][0]['secondary'][0]['variable_name']
+
+
[email protected]
+def test_matchup_spark_L2_IS_ICOADS(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "ASCATB-L2-Coastal",
+        # "secondary": "ICOADS Release 3.0",
+        "secondary": "ICOADS_JPL",
+        "startTime": "2017-07-01T02:00:00Z",
+        "endTime": "2017-07-01T02:59:59Z",
+        # "endTime": "2017-07-01T23:59:59Z",
+        # "b": "-100,20,-79,30",
+        "b": "-90,25,-85,30",
+        "depthMin": -10,
+        "depthMax": 10,
+        "tt": 3600,
+        "rt": 25000,
+        "matchOnce": False,
+        "resultSizeLimit": 0,
+        "platforms": "0,16,17,30,41,42"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L2_IS_ICOADS", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L2_IS_ICOADS')
+    check_count(len(data), 51, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L2_IS_SAMOS(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "ASCATB-L2-Coastal",
+        "secondary": "SAMOS",
+        "startTime": "2017-05-01T00:00:00Z",
+        "endTime": "2017-05-03T23:59:59Z",
+        "b": "-100,20,-79,30",
+        "depthMin": -20,
+        "depthMax": 10,
+        "tt": 7200,
+        "rt": 50000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "30",
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L2_IS_SAMOS", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L2_IS_SAMOS')
+    check_count(len(data), 135, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L2_IS_shark2018(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "ASCATB-L2-Coastal",
+        "secondary": "shark-2018",
+        "startTime": "2018-04-02T18:30:00Z",
+        "endTime": "2018-04-02T18:59:59Z",
+        "b": "-140,10,-110,40",
+        "depthMin": -5,
+        "depthMax": 5,
+        "tt": 3600,
+        "rt": 50000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "3B"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L2_IS_shark2018", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L2_IS_shark2018')
+    check_count(len(data), 53, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L4_L4(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "MUR25-JPL-L4-GLOB-v04.2",
+        "secondary": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "startTime": "2018-08-01T00:00:00Z",
+        "endTime": "2018-08-02T00:00:00Z",
+        "b": "-100,20,-90,30",
+        "depthMin": -20,
+        "depthMax": 10,
+        "tt": 43200,
+        "rt": 1000,
+        "matchOnce": True,
+        "resultSizeLimit": 7000,
+        "platforms": "42"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L4_L4", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L4_L4')
+    check_count(len(data), 1110, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L4_L2(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "MUR25-JPL-L4-GLOB-v04.2",
+        "secondary": "ASCATB-L2-Coastal",
+        "startTime": "2018-07-05T00:00:00Z",
+        "endTime": "2018-07-05T23:59:59Z",
+        "b": "-127,32,-120,40",
+        "depthMin": -20,
+        "depthMax": 10,
+        "tt": 12000,
+        "rt": 1000,
+        "matchOnce": True,
+        "resultSizeLimit": 7000,
+        "platforms": "42"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L4_L2", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L4_L2')
+    check_count(len(data), 6, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L2_L4(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "ASCATB-L2-Coastal",
+        "secondary": "MUR25-JPL-L4-GLOB-v04.2",
+        "startTime": "2018-08-01T00:00:00Z",
+        "endTime": "2018-08-02T00:00:00Z",
+        "b": "-100,20,-90,30",
+        "depthMin": -20,
+        "depthMax": 10,
+        "tt": 43200,
+        "rt": 1000,
+        "matchOnce": True,
+        "resultSizeLimit": 7000,
+        "platforms": "65"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_L2_L4", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L2_L4')
+    check_count(len(data), 21, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_L2_L2(host, start, fail_on_miscount):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "VIIRS_NPP-2018_Heatwave",
+        "secondary": "ASCATB-L2-Coastal",
+        "startTime": "2018-07-05T00:00:00Z",
+        "endTime": "2018-07-05T23:59:59Z",
+        "b": "-120,28,-118,30",
+        "depthMin": -20,
+        "depthMax": 10,
+        "tt": 43200,
+        "rt": 1000,
+        "matchOnce": True,
+        "resultSizeLimit": 7000,
+        "platforms": "42"
+    }
+
+    bounding_poly = b_to_polygon(params['b'])
+
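+    # This case returns several thousand matches, so a larger page size is requested (run_matchup presumably pages through results).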
+    body = run_matchup(url, params, page_size=10000)
+    try_save("test_matchup_spark_L2_L2", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    uniq_primaries(data, case='test_matchup_spark_L2_L2')
+    check_count(len(data), 4027, fail_on_miscount)
+
+    for match in data:
+        verify_match_consistence(match, params, bounding_poly)
+
+
[email protected]
+def test_matchup_spark_prioritize_distance(host, start, distance_vs_time_query):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "secondary": "shark-2018",
+        "startTime": "2018-04-01T00:00:00Z",
+        "endTime": "2018-04-01T23:59:59Z",
+        "b": "-131,26,-130,27",
+        "depthMin": -5,
+        "depthMax": 5,
+        "tt": 86400,
+        "rt": 10000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "3B",
+        "parameter": "mass_concentration_of_chlorophyll_in_sea_water",
+        "prioritizeDistance": True
+    }
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_prioritize_distance", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    check_count(len(data), 1, True)
+
+    primary_point = data[0]
+
+    valid_primary = [
+        "96bc2f4b-fd78-3c41-a3cb-cc6b2d7197a3[[0, 16, 19]]",
+        "96bc2f4b-fd78-3c41-a3cb-cc6b2d7197a3[[16, 19]]"
+    ]
+
+    assert primary_point['id'] in valid_primary, \
+        f'Incorrect primary returned: {primary_point["id"]}'
+
+    match_point = primary_point['matches'][0]
+
+    if distance_vs_time_query['success']:
+        min_dist = distance_vs_time_query['distances']['min_dist']
+    else:
+        min_dist = distance_vs_time_query['backup']['min_dist']
+
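+    # With prioritizeDistance=True, the returned match should be the spatially closest secondary point.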
+    assert match_point['lat'] == min_dist[0]  # "26.6141296"
+    assert match_point['lon'] == min_dist[1]  # "-130.0827904"
+    assert match_point['time'] == min_dist[2]  # 1522637640
+
+
[email protected]
+def test_matchup_spark_prioritize_time(host, start, distance_vs_time_query):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "JPL-L4-MRVA-CHLA-GLOB-v3.0",
+        "secondary": "shark-2018",
+        "startTime": "2018-04-01T00:00:00Z",
+        "endTime": "2018-04-01T23:59:59Z",
+        "b": "-131,26,-130,27",
+        "depthMin": -5,
+        "depthMax": 5,
+        "tt": 86400,
+        "rt": 10000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "3B",
+        "parameter": "mass_concentration_of_chlorophyll_in_sea_water",
+        "prioritizeDistance": False
+    }
+
+    body = run_matchup(url, params)
+    try_save("test_matchup_spark_prioritize_time", start, body)
+    data = body['data']
+
+    assert body['count'] == len(data)
+    check_count(len(data), 1, True)
+
+    primary_point = data[0]
+
+    valid_primary = [
+        "96bc2f4b-fd78-3c41-a3cb-cc6b2d7197a3[[0, 16, 19]]",
+        "96bc2f4b-fd78-3c41-a3cb-cc6b2d7197a3[[16, 19]]"
+    ]
+
+    assert primary_point['id'] in valid_primary, \
+        f'Incorrect primary returned: {primary_point["id"]}'
+
+    match_point = primary_point['matches'][0]
+
+    if distance_vs_time_query['success']:
+        min_time = distance_vs_time_query['distances']['min_time']
+    else:
+        min_time = distance_vs_time_query['backup']['min_time']
+
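+    # With prioritizeDistance=False, the returned match should be the temporally closest secondary point.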
+    assert match_point['lat'] == min_time[0]  # "26.6894016"
+    assert match_point['lon'] == min_time[1]  # "-130.0547072"
+    assert match_point['time'] == min_time[2]  # 1522626840
+
+
[email protected]
+def test_matchup_spark_job_cancellation(host, start):
+    url = urljoin(host, 'match_spark')
+
+    params = {
+        "primary": "MUR25-JPL-L4-GLOB-v04.2",
+        "secondary": "SAMOS",
+        # "startTime": "2018-01-01T00:00:00Z",
+        "startTime": "2018-01-02T00:00:00Z",
+        "endTime": "2018-01-10T23:59:59Z",
+        "b": "-130,20,-110,40",
+        "depthMin": -35,
+        "depthMax": 10,
+        "tt": 86400,
+        "rt": 50000,
+        "matchOnce": True,
+        "resultSizeLimit": 0,
+        "platforms": "30",
+    }
+
+    response = requests.get(url, params=params)
+
+    assert response.status_code == 200, 'Initial match_spark query failed'
+    response_json = response.json()
+
+    asynchronous = 'status' in response_json
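+    # Asynchronous deployments return a job-status document (including a 'cancel' link) rather than the final results.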
+
+    if not asynchronous:
+        skip('Deployed SDAP version does not have asynchronous matchup')
+    else:
+        sleep(25) # Time to allow spark workers to start working
+
+        if response_json['status'] != 'running':
+            skip('Job finished before it could be cancelled')
+        else:
+            cancel_url = [link for link in response_json['links'] if link['rel'] == 'cancel'][0]['href']
+
+            cancel_response = requests.get(cancel_url)
+            assert cancel_response.status_code == 200, 'Cancellation query failed'
+
+            cancel_json = cancel_response.json()
+
+            assert cancel_json['status'] != 'running', 'Job did not cancel'
+
+            if cancel_json['status'] in ['success', 'failed']:
+                warnings.warn(f'Job status after cancellation is not \'cancelled\' ({cancel_json["status"]}), passing '
+                              f'case because it is no longer \'running\', but actual cancellation could not be tested '
+                              f'here.')
+
+
[email protected]
+def test_cdmsresults_json(host, eid, start):
+    url = urljoin(host, 'cdmsresults')
+
+    # Skip the test automatically if the matchup request was not successful
+    if not eid['successful']:
+        skip('Matchup request was unsuccessful so there are no results to get from cdmsresults')
+
+    def fetch_result(execution_id, output):
+        return requests.get(url, params={"id": execution_id, "output": output})
+
+    eid_list = eid['eid']
+    param_list = eid['params']
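+    # Execution IDs and parameters were recorded by the matchup tests above via the eid fixture.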
+
+    response = fetch_result(eid_list[0], "JSON")
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_cdmsresults_json_A", start, body)
+
+    data = body['data']
+    assert len(data) == 5
+
+    for m in data:
+        m['point'] = f"Point({m['lon']} {m['lat']})"
+        for s in m['matches']:
+            s['point'] = f"Point({s['lon']} {s['lat']})"
+
+    data.sort(key=lambda e: e['point'])
+
+    params = param_list[0]
+    bounding_poly = b_to_polygon(params['b'])
+
+    verify_match(
+        data[0], 'Point(-86.125 27.625)',
+        1535360400, 'Point(-86.13 27.63)',
+        1535374800, params, bounding_poly
+    )
+
+    verify_match(
+        data[1], 'Point(-88.875 27.875)',
+        1534669200, 'Point(-88.88 27.88)',
+        1534698000, params, bounding_poly
+    )
+
+    verify_match(
+        data[2], 'Point(-90.125 27.625)',
+        1534496400, 'Point(-90.13 27.63)',
+        1534491000, params, bounding_poly
+    )
+
+    verify_match(
+        data[3], 'Point(-90.125 28.125)',
+        1534928400, 'Point(-90.13 28.12)',
+        1534899600, params, bounding_poly
+    )
+
+    verify_match(
+        data[4], 'Point(-90.375 28.125)',
+        1534842000, 'Point(-90.38 28.12)',
+        1534813200, params, bounding_poly
+    )
+
+    response = fetch_result(eid_list[1], "JSON")
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_cdmsresults_json_B", start, body)
+
+    data = body['data']
+    assert len(data) == 5
+
+    for m in data:
+        m['point'] = f"Point({m['lon']} {m['lat']})"
+        for s in m['matches']:
+            s['point'] = f"Point({s['lon']} {s['lat']})"
+
+    data.sort(key=lambda e: e['point'])
+
+    params = param_list[1]
+    bounding_poly = b_to_polygon(params['b'])
+
+    verify_match(
+        data[0], 'Point(-86.125 27.625)',
+        1535371200, 'Point(-86.13 27.63)',
+        1535374800, params, bounding_poly
+    )
+
+    verify_match(
+        data[1], 'Point(-88.875 27.875)',
+        1534680000, 'Point(-88.88 27.88)',
+        1534698000, params, bounding_poly
+    )
+
+    verify_match(
+        data[2], 'Point(-90.125 27.625)',
+        1534507200, 'Point(-90.13 27.63)',
+        1534491000, params, bounding_poly
+    )
+
+    verify_match(
+        data[3], 'Point(-90.125 28.125)',
+        1534939200, 'Point(-90.13 28.12)',
+        1534899600, params, bounding_poly
+    )
+
+    verify_match(
+        data[4], 'Point(-90.375 28.125)',
+        1534852800, 'Point(-90.38 28.12)',
+        1534813200, params, bounding_poly
+    )
+
+
[email protected]
+def test_cdmsresults_csv(host, eid, start):
+    url = urljoin(host, 'cdmsresults')
+
+    # Skip the test automatically if the matchup request was not successful
+    if not eid['successful']:
+        skip('Matchup request was unsuccessful so there are no results to get from cdmsresults')
+
+    def fetch_result(execution_id, output):
+        return requests.get(url, params={"id": execution_id, "output": output})
+
+    eid_list = eid['eid']
+    param_list = eid['params']
+
+    response = fetch_result(eid_list[0], "CSV")
+    params = param_list[0]
+    bounding_poly = b_to_polygon(params['b'])
+
+    assert response.status_code == 200
+
+    try_save("test_cdmsresults_csv_A", start, response, "csv")
+
+    rows = response.text.split('\r\n')
+    index = rows.index('')
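+    # The CSV body is a block of global metadata rows, a blank separator line, then the matchup rows.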
+
+    global_rows = rows[:index]
+    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
+
+    global_rows = translate_global_rows(global_rows)
+    matchup_rows = translate_matchup_rows(matchup_rows)
+
+    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
+
+    for row in matchup_rows:
+        primary_point = lat_lon_to_point(row['lat'], row['lon'])
+
+        assert bounding_poly.intersects(primary_point)
+        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
+
+        verify_secondary_in_tolerance(
+            {'lat': row['lat'], 'lon': row['lon']},
+            {'lat': row['lat_secondary'], 'lon': row['lon_secondary']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+
+    response = fetch_result(eid_list[1], "CSV")
+    params = param_list[1]
+    bounding_poly = b_to_polygon(params['b'])
+
+    assert response.status_code == 200
+
+    try_save("test_cdmsresults_csv_B", start, response, "csv")
+
+    rows = response.text.split('\r\n')
+    index = rows.index('')
+
+    global_rows = rows[:index]
+    matchup_rows = rows[index + 1:-1]  # Drop trailing empty string from trailing newline
+
+    global_rows = translate_global_rows(global_rows)
+    matchup_rows = translate_matchup_rows(matchup_rows)
+
+    assert len(matchup_rows) == int(global_rows['CDMS_num_primary_matched'])
+
+    for row in matchup_rows:
+        primary_point = lat_lon_to_point(row['lat'], row['lon'])
+
+        assert bounding_poly.intersects(primary_point)
+        assert params['startTime'] <= format_time(row['time']) <= params['endTime']
+
+        verify_secondary_in_tolerance(
+            {'lat': row['lat'], 'lon': row['lon']},
+            {'lat': row['lat_secondary'], 'lon': row['lon_secondary']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= iso_time_to_epoch(format_time(row['time_secondary'])) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+
+
[email protected]
+def test_cdmsresults_netcdf(host, eid, start):
+    warnings.filterwarnings('ignore')
+
+    url = urljoin(host, 'cdmsresults')
+
+    # Skip the test automatically if the matchup request was not successful
+    if not eid['successful']:
+        skip('Matchup request was unsuccessful so there are no results to get from cdmsresults')
+
+    def fetch_result(execution_id, output):
+        return requests.get(url, params={"id": execution_id, "output": output})
+
+    eid_list = eid['eid']
+    param_list = eid['params']
+
+    temp_file = Temp(mode='wb+', suffix='.csv.tmp', prefix='CDMSReader_')
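+    # The NetCDF response is written to a temp file so cdms_reader can assemble the matches and dump them to
+    # CSV for the row-wise checks below.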
+
+    response = fetch_result(eid_list[0], "NETCDF")
+    params = param_list[0]
+    bounding_poly = b_to_polygon(params['b'])
+
+    assert response.status_code == 200
+
+    try_save("test_cdmsresults_netcdf_A", start, response, "nc", 'wb')
+
+    temp_file.write(response.content)
+    temp_file.flush()
+    temp_file.seek(0)
+
+    matches = cdms_reader.assemble_matches(temp_file.name)
+
+    cdms_reader.matches_to_csv(matches, temp_file.name)
+
+    with open(temp_file.name) as f:
+        reader = csv.DictReader(f)
+        rows = list(reader)
+
+    for row in rows:
+        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
+
+        assert bounding_poly.intersects(primary_point)
+        assert iso_time_to_epoch(params['startTime']) \
+               <= float(row['PrimaryData_time']) \
+               <= iso_time_to_epoch(params['endTime'])
+
+        verify_secondary_in_tolerance(
+            {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']},
+            {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= float(row['SecondaryData_time']) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+
+    response = fetch_result(eid_list[1], "NETCDF")
+    params = param_list[1]
+    bounding_poly = b_to_polygon(params['b'])
+
+    assert response.status_code == 200
+
+    try_save("test_cdmsresults_netcdf_B", start, response, "nc", 'wb')
+
+    temp_file.write(response.content)
+    temp_file.flush()
+    temp_file.seek(0)
+
+    matches = cdms_reader.assemble_matches(temp_file.name)
+
+    cdms_reader.matches_to_csv(matches, temp_file.name)
+
+    with open(temp_file.name) as f:
+        reader = csv.DictReader(f)
+        rows = list(reader)
+
+    for row in rows:
+        primary_point = lat_lon_to_point(row['PrimaryData_lat'], row['PrimaryData_lon'])
+
+        assert bounding_poly.intersects(primary_point)
+        assert iso_time_to_epoch(params['startTime']) \
+               <= float(row['PrimaryData_time']) \
+               <= iso_time_to_epoch(params['endTime'])
+
+        verify_secondary_in_tolerance(
+            {'lat': row['PrimaryData_lat'], 'lon': row['PrimaryData_lon']},
+            {'lat': row['SecondaryData_lat'], 'lon': row['SecondaryData_lon']},
+            params['rt']
+        )
+        assert (iso_time_to_epoch(params['startTime']) - params['tt']) \
+               <= float(row['SecondaryData_time']) \
+               <= (iso_time_to_epoch(params['endTime']) + params['tt'])
+
+    temp_file.close()
+    warnings.filterwarnings('default')
+
+
[email protected]
+def test_cdmslist(host, start):
+    url = urljoin(host, 'cdmslist')
+
+    response = requests.get(url)
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_cdmslist", start, body)
+
+    data = body['data']
+
+    num_satellite = len(data['satellite'])
+    num_insitu = len(data['insitu'])
+
+    assert num_insitu > 0
+    assert num_satellite > 0
+
+
[email protected]
+def test_cdmssubset_L4(host, start):
+    url = urljoin(host, 'cdmssubset')
+
+    params = {
+        "dataset": "MUR25-JPL-L4-GLOB-v04.2",
+        "parameter": "sst",
+        "startTime": "2018-09-24T00:00:00Z",
+        "endTime": "2018-09-30T00:00:00Z",
+        "b": "160,-30,180,-25",
+        "output": "ZIP"
+    }
+
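+    # The subset endpoint should return a ZIP archive containing a single CSV named after the dataset.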
+    response = requests.get(url, params=params)
+
+    assert response.status_code == 200
+
+    try_save("test_cdmssubset_L4_a", start, response, "zip", 'wb')
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    response_buf = io.BytesIO(response.content)
+
+    with ZipFile(response_buf) as data:
+        namelist = data.namelist()
+
+        assert namelist == ['MUR25-JPL-L4-GLOB-v04.2.csv']
+
+        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
+        csv_data = pd.read_csv(csv_buf)
+
+    def validate_row_bounds(row):
+        assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude'])))
+        assert params['startTime'] <= row['time'] <= params['endTime']
+
+    for i in range(0, len(csv_data)):
+        validate_row_bounds(csv_data.iloc[i])
+
+    params['dataset'] = 'OISSS_L4_multimission_7day_v1'
+
+    response = requests.get(url, params=params)
+
+    assert response.status_code == 200
+
+    try_save("test_cdmssubset_L4_b", start, response, "zip", 'wb')
+
+    response_buf = io.BytesIO(response.content)
+
+    with ZipFile(response_buf) as data:
+        namelist = data.namelist()
+
+        assert namelist == ['OISSS_L4_multimission_7day_v1.csv']
+
+        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
+        csv_data = pd.read_csv(csv_buf)
+
+    for i in range(0, len(csv_data)):
+        validate_row_bounds(csv_data.iloc[i])
+
+
[email protected]
+def test_cdmssubset_L2(host, start):
+    url = urljoin(host, 'cdmssubset')
+
+    params = {
+        "dataset": "ASCATB-L2-Coastal",
+        "startTime": "2018-09-24T00:00:00Z",
+        "endTime": "2018-09-30T00:00:00Z",
+        "b": "160,-30,180,-25",
+        "output": "ZIP"
+    }
+
+    response = requests.get(url, params=params)
+
+    assert response.status_code == 200
+
+    try_save("test_cdmssubset_L2", start, response, "zip", 'wb')
+
+    bounding_poly = b_to_polygon(params['b'])
+
+    response_buf = io.BytesIO(response.content)
+
+    with ZipFile(response_buf) as data:
+        namelist = data.namelist()
+
+        assert namelist == ['ASCATB-L2-Coastal.csv']
+
+        csv_buf = io.StringIO(data.read(namelist[0]).decode('utf-8'))
+        csv_data = pd.read_csv(csv_buf)
+
+    def validate_row_bounds(row):
+        assert bounding_poly.intersects(Point(float(row['longitude']), float(row['latitude'])))
+        assert params['startTime'] <= row['time'] <= params['endTime']
+
+    for i in range(0, len(csv_data)):
+        validate_row_bounds(csv_data.iloc[i])
+
+
[email protected]
+def test_insitu_JPL(insitu_endpoint_JPL, start, timeouts):
+    params = {
+        'itemsPerPage': 20000,
+        'startTime': '2017-05-01T00:00:00Z',
+        'endTime': '2017-05-01T23:59:59Z',
+        'bbox': '-100,20,-79,30',
+        'minDepth': -20.0,
+        'maxDepth': 20.0,
+        'provider': 'Florida State University, COAPS',
+        'project': 'SAMOS',
+        'platform': '0,16,17,30,41,42',
+    }
+
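+    # Query the JPL in-situ endpoint directly and validate the returned records against the request bounds.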
+    response = requests.get(insitu_endpoint_JPL, params=params, timeout=timeouts)
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_insitu_JPL", start, body)
+
+    validate_insitu(body, params, 'JPL')
+
+
[email protected]
+# @pytest.mark.xfail
+def test_insitu_NCAR(insitu_endpoint_NCAR, start, timeouts):
+    params = {
+        'itemsPerPage': 1000,
+        'startTime': '2017-07-01T00:00:00Z',
+        # 'endTime': '2017-07-05T00:00:00Z',
+        'endTime': '2017-08-01T00:00:00Z',
+        # 'bbox': '-65,25,-60,30',
+        'bbox': '-65,25,-55,30',
+        'minDepth': 0,
+        'maxDepth': 5,
+        'provider': 'NCAR',
+        'project': 'ICOADS Release 3.0',
+        'platform': '0,16,17,30,41,42',
+    }
+
+    response = requests.get(insitu_endpoint_NCAR, params=params, timeout=timeouts)
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_insitu_NCAR", start, body)
+
+    validate_insitu(body, params, 'NCAR')
+
+
[email protected]
+def test_insitu_saildrone(insitu_endpoint_saildrone, start, timeouts):
+    params = {
+        'itemsPerPage': 1000,
+        'startTime': '2018-04-01T00:00:00Z',
+        'endTime': '2018-04-01T23:59:59Z',
+        'bbox': '-140,10,-110,40',
+        'minDepth': -10,
+        'maxDepth': 5,
+        'provider': 'Saildrone',
+        'project': 'shark-2018',
+        'platform': '3B',
+    }
+
+    # The Saildrone test has a tendency to fail; retry a couple of times, then just issue a warning if the failure persists
+    response = None
+
+    try:
+        response = requests.get(insitu_endpoint_saildrone, params=params, timeout=timeouts)
+        success = response.status_code == 200
+    except ConnectTimeout:
+        success = False
+
+    retries = 3
+    delay = [5, 15, 30]
+
+    while not success and retries > 0:
+        sleep(delay[3 - retries])
+        try:
+            response = requests.get(insitu_endpoint_saildrone, params=params, timeout=timeouts)
+            success = response.status_code == 200
+        except ConnectTimeout:
+            success = False
+        retries -= 1
+
+    if not success and response is None:
+        warnings.warn('Saildrone request failed due to connection timeout - max retries exceeded')
+
+    assert response.status_code == 200
+
+    body = response.json()
+    try_save("test_insitu_saildrone", start, body)
+
+    validate_insitu(body, params, 'Saildrone')
+
+
[email protected]
+def test_insitu_schema(start, timeouts):
+    url = 'https://doms.jpl.nasa.gov/insitu/1.0/cdms_schema'
+
+    response = requests.get(url, timeout=timeouts)
+
+    assert response.status_code == 200
+
+    assert len(response.json()) > 0
+
+
[email protected]
+def test_swaggerui_sdap(host):
+    url = urljoin(host, 'apidocs/')
+
+    response = requests.get(url)
+
+    assert response.status_code == 200
+    assert 'swagger-ui' in response.text
+
+    try:
+        # There's probably a better way to do this, but extract the .yml file for the docs from the returned text
+        soup = BeautifulSoup(response.text, 'html.parser')
+
+        script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0])
+
+        start_index = script.find('url:')
+        end_index = script.find('",\n', start_index)
+
+        script = script[start_index:end_index]
+
+        yml_filename = script.split('"')[1]
+
+        url = urljoin(url, yml_filename)
+
+        response = requests.get(url)
+
+        assert response.status_code == 200
+    except AssertionError:
+        raise
+    except Exception:
+        try:
+            url = urljoin(url, 'openapi.yml')
+
+            response = requests.get(url)
+
+            assert response.status_code == 200
+
+            warnings.warn("Could not extract documentation yaml filename from response text, "
+                          "but using an assumed value worked successfully")
+        except Exception:
+            raise ValueError("Could not verify documentation yaml file, assumed value also failed")
+
+
[email protected]
+def test_swaggerui_insitu(insitu_swagger_endpoint):
+    response = requests.get(insitu_swagger_endpoint)
+
+    assert response.status_code == 200
+    assert 'swagger-ui' in response.text
+
+    try:
+        # There's probably a better way to do this, but extract the .yml file for the docs from the returned text
+        soup = BeautifulSoup(response.text, 'html.parser')
+
+        script = str([tag for tag in soup.find_all('script') if tag.attrs == {}][0])
+
+        start_index = script.find('url:')
+        end_index = script.find('",\n', start_index)
+
+        script = script[start_index:end_index]
+
+        yml_filename = script.split('"')[1]
+
+        url = urljoin(insitu_swagger_endpoint, yml_filename)
+
+        response = requests.get(url)
+
+        assert response.status_code == 200
+    except AssertionError:
+        raise
+    except Exception:
+        try:
+            url = urljoin(insitu_swagger_endpoint, 'insitu-spec-0.0.1.yml')
+
+            response = requests.get(url)
+
+            assert response.status_code == 200
+
+            warnings.warn("Could not extract documentation yaml filename from response text, "
+                          "but using an assumed value worked successfully")
+        except Exception:
+            raise ValueError("Could not verify documentation yaml file, assumed value also failed")
