This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new eca2bac  SDAP-487 - Changes to doms_data schema to improve result 
fetch speed (#275)
eca2bac is described below

commit eca2bac44bead7d13a348dfbaa354b1e272baa87
Author: Riley Kuttruff <[email protected]>
AuthorDate: Wed Sep 13 14:48:28 2023 -0700

    SDAP-487 - Changes to doms_data schema to improve result fetch speed (#275)
    
    * Renamed doms-ops dir and existing script
    
    * Altered doms_data primary key to support primary id in WHERE clause
    
    * Narrow secondary data query(ies) by pvid
    
    * Conversion script
    
    * Improved fetch
    
    Benchmark: fetch time reduced from ~27m to ~6m for 661 primaries and 2,116,205 secondaries
    
    * Corrected primary insert params
    
    Previously, a `None` primary_value_id was expected to mark a row as primary (`is_primary` true). Since primary_value_id cannot be `None` now that it is
part of the PK, the sentinel value `PRIMARY` is used instead
    
    * Update analysis/webservice/algorithms/doms/ResultsStorage.py
    
    Co-authored-by: Stepheny Perez <[email protected]>
    
    * Addressing some comments
    
    * Descriptive docstring + cleanup
    
    ---------
    
    Co-authored-by: rileykk <[email protected]>
    Co-authored-by: Stepheny Perez <[email protected]>
---
 .../algorithms/doms/DomsInitialization.py          |   2 +-
 .../webservice/algorithms/doms/ResultsStorage.py   |  43 +--
 tools/doms-data-tools/update_doms_data_pk.py       | 288 +++++++++++++++++++++
 .../update_values_type.py}                         |  16 ++
 4 files changed, 330 insertions(+), 19 deletions(-)

diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py 
b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 74a2aa6..43627b1 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -166,7 +166,7 @@ class DomsInitializer:
               measurement_values_json text,
               depth decimal,
               file_url text,
-              PRIMARY KEY (execution_id, is_primary, id)
+              PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
             );
         """
         session.execute(cql)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py 
b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 48a89a0..39db27b 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -16,23 +16,23 @@
 import configparser
 import json
 import logging
-from time import sleep
-import math
 import uuid
 from datetime import datetime
+from time import sleep
 
-import numpy as np
 import pkg_resources
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster
+from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
-from cassandra.query import BatchStatement, SimpleStatement
 from pytz import UTC
 from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
 from webservice.webmodel import NexusProcessingException
 
 BATCH_SIZE = 1024
 
+logger = logging.getLogger(__name__)
+
 
 class ResultInsertException(IOError):
     pass
@@ -192,7 +192,9 @@ class ResultsStorage(AbstractResultsContainer):
         inserts = []
 
         for result in results:
-            inserts.extend(self.__prepare_result(execution_id, None, result, 
insertStatement))
+            # 'PRIMARY' arg since primary values cannot have primary_value_id 
be null anymore
+            # Secondary matches are prepped recursively from this call
+            inserts.extend(self.__prepare_result(execution_id, 'PRIMARY', 
result, insertStatement))
 
         for i in range(5):
             success, failed_entries = self.__insert_result_batches(inserts, 
insertStatement)
@@ -261,7 +263,7 @@ class ResultsStorage(AbstractResultsContainer):
             result["platform"] if "platform" in result else None,
             result["device"] if "device" in result else None,
             json.dumps(data, cls=DomsEncoder),
-            1 if primaryId is None else 0,
+            1 if primaryId is 'PRIMARY' else 0,
             result["depth"],
             result['fileurl']
         )
@@ -275,8 +277,6 @@ class ResultsStorage(AbstractResultsContainer):
         return params_list
 
 
-
-
 class ResultsRetrieval(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
@@ -297,15 +297,22 @@ class ResultsRetrieval(AbstractResultsContainer):
         return data
 
     def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False):
-        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary 
= false"
-        rows = self._session.execute(cql, (id,))
+        cql = f"SELECT * FROM doms_data where execution_id = {str(id)} and 
is_primary = false and primary_value_id = ?"
+        statement = self._session.prepare(cql)
 
-        for row in rows:
-            entry = self.__rowToDataEntry(row, trim_data=trim_data)
-            if row.primary_value_id in dataMap:
-                if not "matches" in dataMap[row.primary_value_id]:
-                    dataMap[row.primary_value_id]["matches"] = []
-                dataMap[row.primary_value_id]["matches"].append(entry)
+        primary_ids = list(dataMap.keys())
+
+        logger.info(f'Getting secondary data for {len(primary_ids)} primaries 
of {str(id)}')
+
+        for (success, rows) in execute_concurrent_with_args(
+            self._session, statement, [(i,) for i in primary_ids], 
concurrency=50, results_generator=True
+        ):
+            for row in rows:
+                entry = self.__rowToDataEntry(row, trim_data=trim_data)
+                if row.primary_value_id in dataMap:
+                    if not "matches" in dataMap[row.primary_value_id]:
+                        dataMap[row.primary_value_id]["matches"] = []
+                    dataMap[row.primary_value_id]["matches"].append(entry)
 
     def __retrievePrimaryData(self, id, trim_data=False, page_num=2, 
page_size=10):
         cql = "SELECT * FROM doms_data where execution_id = %s and is_primary 
= true limit %s"
@@ -361,7 +368,7 @@ class ResultsRetrieval(AbstractResultsContainer):
             }
             return stats
 
-        raise Exception("Execution not found with id '%s'" % id)
+        raise NexusProcessingException(reason=f'No stats found for id 
{str(id)}', code=404)
 
     def retrieveParams(self, id):
         cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
@@ -387,7 +394,7 @@ class ResultsRetrieval(AbstractResultsContainer):
             }
             return params
 
-        raise Exception("Execution not found with id '%s'" % id)
+        raise NexusProcessingException(reason=f'No params found for id 
{str(id)}', code=404)
 
     def retrieveExecution(self, execution_id):
         """
diff --git a/tools/doms-data-tools/update_doms_data_pk.py 
b/tools/doms-data-tools/update_doms_data_pk.py
new file mode 100644
index 0000000..ed8dbe5
--- /dev/null
+++ b/tools/doms-data-tools/update_doms_data_pk.py
@@ -0,0 +1,288 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Script to transition doms.doms_data table to new schema with a primary key 
that will
+enable faster execution retrieval for large matchOnce=false matchups. Due to 
the nature
+of Cassandra, this will necessitate creating a temporary table, copying the 
data over,
+dropping the old table, recreating the table with the adjusted schema, copying 
the data
+back, and dropping the temporary table. This script does those steps, with the 
added
+option to stop after the initial copy for testing purposes.
+"""
+
+import argparse
+import logging
+import sys
+
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, 
EXEC_PROFILE_DEFAULT
+from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
+
+BATCH_SIZE = 10000
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s',
+    stream=sys.stdout
+)
+log = logging.getLogger(__name__)
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        '-u', '--cassandra-username',
+        help='The username used to connect to Cassandra.',
+        dest='username',
+        required=False,
+        default='cassandra',
+        metavar='USERNAME'
+    )
+
+    parser.add_argument(
+        '-p', '--cassandra-password',
+        dest='password',
+        help='The password used to connect to Cassandra.',
+        required=False,
+        default='cassandra',
+        metavar='PASSWORD'
+    )
+
+    parser.add_argument(
+        '--cassandra',
+        help='The hostname(s) or IP(s) of the Cassandra server(s).',
+        required=False,
+        default=['localhost'],
+        dest='hosts',
+        nargs='+',
+        metavar=('localhost', '127.0.0.101')
+    )
+
+    parser.add_argument(
+        '--cassandraPort',
+        help='The port used to connect to Cassandra.',
+        dest='port',
+        required=False,
+        default=9042,
+        type=int
+    )
+
+    parser.add_argument(
+        '--action',
+        help='Copy or move',
+        dest='action',
+        required=False,
+        default='move',
+        choices=['move', 'copy']
+    )
+
+    args = parser.parse_args()
+
+    log.info('Connecting to Cassandra cluster')
+
+    dc_policy = RoundRobinPolicy()
+    token_policy = TokenAwarePolicy(dc_policy)
+
+    if args.username and args.password:
+        auth_provider = PlainTextAuthProvider(username=args.username, 
password=args.password)
+    else:
+        auth_provider = None
+
+    contact_points = []
+
+    for host_list in args.hosts:
+        contact_points.extend(host_list.split(','))
+
+    try:
+        with Cluster(contact_points,
+                     port=int(args.port),
+                     execution_profiles={
+                         EXEC_PROFILE_DEFAULT: ExecutionProfile(
+                             load_balancing_policy=token_policy,
+                             request_timeout=60.0,
+                         )
+                     },
+                     protocol_version=3,
+                     auth_provider=auth_provider) as cluster:
+
+            session = cluster.connect('doms')
+
+            log.info('Connected successfully to Cassandra')
+
+            cql = """
+            CREATE TABLE IF NOT EXISTS doms_data_temp (
+              id uuid,
+              execution_id uuid,
+              value_id text,
+              primary_value_id text,
+              is_primary boolean,
+              x decimal,
+              y decimal,
+              source_dataset text,
+              measurement_time timestamp,
+              platform text,
+              device text,
+              measurement_values_json text,
+              depth decimal,
+              file_url text,
+              PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
+            );
+            """
+
+            log.info('Creating temp data table')
+
+            session.execute(cql)
+
+            def move_table(src_t, dst_t, can_delete=True):
+                move_rows = []
+
+                cql = f"""
+                SELECT * FROM {src_t};
+                """
+
+                data_rows = session.execute(cql)
+
+                insert_cql = f"""
+                INSERT INTO {dst_t}
+                    (id, execution_id, value_id, primary_value_id, x, y, 
source_dataset, measurement_time, platform, device, measurement_values_json, 
is_primary, depth, file_url)
+               VALUES
+                    (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """
+
+                insert_statement = session.prepare(insert_cql)
+
+                n_moved = 0
+
+                def do_move(rows):
+                    remaining_rows = rows
+                    failed = []
+                    futures = []
+
+                    log.info(f'Inserting {len(rows):,} rows to {dst_t}')
+
+                    while len(remaining_rows) > 0:
+                        for entry in rows:
+                            futures.append((entry, 
session.execute_async(insert_statement, entry)))
+
+                        for entry, future in futures:
+                            try:
+                                future.result()
+                            except Exception:
+                                failed.append(entry)
+
+                        if len(failed) > 0:
+                            remaining_rows = failed
+                            failed = []
+
+                            log.warning(f'Need to retry 
{len(remaining_rows):,} inserts')
+                        else:
+                            remaining_rows = []
+
+                        return len(rows)
+
+                for row in data_rows:
+                    pvid = row.primary_value_id
+
+                    if pvid is None:
+                        pvid = 'PRIMARY'
+
+                    move_rows.append(
+                        (
+                            row.id,
+                            row.execution_id,
+                            row.value_id,
+                            pvid,
+                            row.x,
+                            row.y,
+                            row.source_dataset,
+                            row.measurement_time,
+                            row.platform,
+                            row.device,
+                            row.measurement_values_json,
+                            row.is_primary,
+                            row.depth,
+                            row.file_url
+                        )
+                    )
+
+                    if len(move_rows) >= BATCH_SIZE:
+                        n_moved += do_move(move_rows)
+                        log.info(f'Moved {n_moved:,} rows so far')
+                        move_rows = []
+
+                if len(move_rows) > 0:
+                    n_moved += do_move(move_rows)
+                    log.info(f'Moved {n_moved:,} rows so far')
+
+            log.info('Copying data to temp table')
+
+            move_table('doms_data', 'doms_data_temp')
+
+            if args.action == 'move':
+                cql = """
+                DROP TABLE doms_data;
+                """
+
+                log.info('Dropping old table')
+
+                session.execute(cql)
+
+                cql = """
+                            CREATE TABLE doms_data (
+                              id uuid,
+                              execution_id uuid,
+                              value_id text,
+                              primary_value_id text,
+                              is_primary boolean,
+                              x decimal,
+                              y decimal,
+                              source_dataset text,
+                              measurement_time timestamp,
+                              platform text,
+                              device text,
+                              measurement_values_json text,
+                              depth decimal,
+                              file_url text,
+                              PRIMARY KEY ((execution_id, is_primary), 
primary_value_id, id)
+                            );
+                            """
+
+                log.info('Creating data table with corrected schema')
+
+                session.execute(cql)
+
+                log.info('Copying data back')
+
+                move_table('doms_data_temp', 'doms_data', False)
+
+                cql = """
+                        DROP TABLE doms_data_temp;
+                        """
+
+                log.info('Dropping temp table')
+
+                session.execute(cql)
+
+            log.info('Disconnecting from Cassandra')
+            session.shutdown()
+
+        log.info('Done')
+    except NoHostAvailable as ne:
+        log.exception(ne)
+        exit(1)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tools/update-doms-data-schema/update.py 
b/tools/doms-data-tools/update_values_type.py
similarity index 90%
rename from tools/update-doms-data-schema/update.py
rename to tools/doms-data-tools/update_values_type.py
index 1f08b12..c5e5ada 100644
--- a/tools/update-doms-data-schema/update.py
+++ b/tools/doms-data-tools/update_values_type.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import argparse
 import configparser
 import decimal

Reply via email to