This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new eca2bac SDAP-487 - Changes to doms_data schema to improve result
fetch speed (#275)
eca2bac is described below
commit eca2bac44bead7d13a348dfbaa354b1e272baa87
Author: Riley Kuttruff <[email protected]>
AuthorDate: Wed Sep 13 14:48:28 2023 -0700
SDAP-487 - Changes to doms_data schema to improve result fetch speed (#275)
* Renamed doms-ops dir and existing script
* Altered doms_data primary key to support primary id in WHERE clause
* Narrow secondary data query(ies) by pvid
* Conversion script
* Improved fetch
Benchmark ~27m to ~6m for 661 p & 2,116,205 s
* Corrected primary insert params
Expected `None` to set `is_primary` to true. Since it cannot be `None` as
part of the PK, it now uses the value `PRIMARY`
* Update analysis/webservice/algorithms/doms/ResultsStorage.py
Co-authored-by: Stepheny Perez <[email protected]>
* Addressing some comments
* Descriptive docstring + cleanup
---------
Co-authored-by: rileykk <[email protected]>
Co-authored-by: Stepheny Perez <[email protected]>
---
.../algorithms/doms/DomsInitialization.py | 2 +-
.../webservice/algorithms/doms/ResultsStorage.py | 43 +--
tools/doms-data-tools/update_doms_data_pk.py | 288 +++++++++++++++++++++
.../update_values_type.py} | 16 ++
4 files changed, 330 insertions(+), 19 deletions(-)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py
b/analysis/webservice/algorithms/doms/DomsInitialization.py
index 74a2aa6..43627b1 100644
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ b/analysis/webservice/algorithms/doms/DomsInitialization.py
@@ -166,7 +166,7 @@ class DomsInitializer:
measurement_values_json text,
depth decimal,
file_url text,
- PRIMARY KEY (execution_id, is_primary, id)
+ PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
);
"""
session.execute(cql)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py
b/analysis/webservice/algorithms/doms/ResultsStorage.py
index 48a89a0..39db27b 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -16,23 +16,23 @@
import configparser
import json
import logging
-from time import sleep
-import math
import uuid
from datetime import datetime
+from time import sleep
-import numpy as np
import pkg_resources
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
+from cassandra.concurrent import execute_concurrent_with_args
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
-from cassandra.query import BatchStatement, SimpleStatement
from pytz import UTC
from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
from webservice.webmodel import NexusProcessingException
BATCH_SIZE = 1024
+logger = logging.getLogger(__name__)
+
class ResultInsertException(IOError):
pass
@@ -192,7 +192,9 @@ class ResultsStorage(AbstractResultsContainer):
inserts = []
for result in results:
- inserts.extend(self.__prepare_result(execution_id, None, result,
insertStatement))
+ # 'PRIMARY' arg because primary values can no longer have a null primary_value_id
+ # Secondary matches are prepped recursively from this call
+ inserts.extend(self.__prepare_result(execution_id, 'PRIMARY',
result, insertStatement))
for i in range(5):
success, failed_entries = self.__insert_result_batches(inserts,
insertStatement)
@@ -261,7 +263,7 @@ class ResultsStorage(AbstractResultsContainer):
result["platform"] if "platform" in result else None,
result["device"] if "device" in result else None,
json.dumps(data, cls=DomsEncoder),
- 1 if primaryId is None else 0,
+ 1 if primaryId == 'PRIMARY' else 0,
result["depth"],
result['fileurl']
)
@@ -275,8 +277,6 @@ class ResultsStorage(AbstractResultsContainer):
return params_list
-
-
class ResultsRetrieval(AbstractResultsContainer):
def __init__(self, config=None):
AbstractResultsContainer.__init__(self, config)
@@ -297,15 +297,22 @@ class ResultsRetrieval(AbstractResultsContainer):
return data
def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False):
- cql = "SELECT * FROM doms_data where execution_id = %s and is_primary
= false"
- rows = self._session.execute(cql, (id,))
+ cql = f"SELECT * FROM doms_data where execution_id = {str(id)} and
is_primary = false and primary_value_id = ?"
+ statement = self._session.prepare(cql)
- for row in rows:
- entry = self.__rowToDataEntry(row, trim_data=trim_data)
- if row.primary_value_id in dataMap:
- if not "matches" in dataMap[row.primary_value_id]:
- dataMap[row.primary_value_id]["matches"] = []
- dataMap[row.primary_value_id]["matches"].append(entry)
+ primary_ids = list(dataMap.keys())
+
+ logger.info(f'Getting secondary data for {len(primary_ids)} primaries
of {str(id)}')
+
+ for (success, rows) in execute_concurrent_with_args(
+ self._session, statement, [(i,) for i in primary_ids],
concurrency=50, results_generator=True
+ ):
+ for row in rows:
+ entry = self.__rowToDataEntry(row, trim_data=trim_data)
+ if row.primary_value_id in dataMap:
+ if not "matches" in dataMap[row.primary_value_id]:
+ dataMap[row.primary_value_id]["matches"] = []
+ dataMap[row.primary_value_id]["matches"].append(entry)
def __retrievePrimaryData(self, id, trim_data=False, page_num=2,
page_size=10):
cql = "SELECT * FROM doms_data where execution_id = %s and is_primary
= true limit %s"
@@ -361,7 +368,7 @@ class ResultsRetrieval(AbstractResultsContainer):
}
return stats
- raise Exception("Execution not found with id '%s'" % id)
+ raise NexusProcessingException(reason=f'No stats found for id
{str(id)}', code=404)
def retrieveParams(self, id):
cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
@@ -387,7 +394,7 @@ class ResultsRetrieval(AbstractResultsContainer):
}
return params
- raise Exception("Execution not found with id '%s'" % id)
+ raise NexusProcessingException(reason=f'No params found for id
{str(id)}', code=404)
def retrieveExecution(self, execution_id):
"""
diff --git a/tools/doms-data-tools/update_doms_data_pk.py
b/tools/doms-data-tools/update_doms_data_pk.py
new file mode 100644
index 0000000..ed8dbe5
--- /dev/null
+++ b/tools/doms-data-tools/update_doms_data_pk.py
@@ -0,0 +1,288 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Script to transition doms.doms_data table to new schema with a primary key
that will
+enable faster execution retrieval for large matchOnce=false matchups. Due to
the nature
+of Cassandra, this will necessitate creating a temporary table, copying the
data over,
+dropping the old table, recreating the table with the adjusted schema, copying
the data
+back, and dropping the temporary table. This script does those steps, with the
added
+option to stop after the initial copy for testing purposes.
+"""
+
+import argparse
+import logging
+import sys
+
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile,
EXEC_PROFILE_DEFAULT
+from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
+
+BATCH_SIZE = 10000
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s',
+ stream=sys.stdout
+)
+log = logging.getLogger(__name__)
+
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument(
+ '-u', '--cassandra-username',
+ help='The username used to connect to Cassandra.',
+ dest='username',
+ required=False,
+ default='cassandra',
+ metavar='USERNAME'
+ )
+
+ parser.add_argument(
+ '-p', '--cassandra-password',
+ dest='password',
+ help='The password used to connect to Cassandra.',
+ required=False,
+ default='cassandra',
+ metavar='PASSWORD'
+ )
+
+ parser.add_argument(
+ '--cassandra',
+ help='The hostname(s) or IP(s) of the Cassandra server(s).',
+ required=False,
+ default=['localhost'],
+ dest='hosts',
+ nargs='+',
+ metavar=('localhost', '127.0.0.101')
+ )
+
+ parser.add_argument(
+ '--cassandraPort',
+ help='The port used to connect to Cassandra.',
+ dest='port',
+ required=False,
+ default=9042,
+ type=int
+ )
+
+ parser.add_argument(
+ '--action',
+ help='Copy or move',
+ dest='action',
+ required=False,
+ default='move',
+ choices=['move', 'copy']
+ )
+
+ args = parser.parse_args()
+
+ log.info('Connecting to Cassandra cluster')
+
+ dc_policy = RoundRobinPolicy()
+ token_policy = TokenAwarePolicy(dc_policy)
+
+ if args.username and args.password:
+ auth_provider = PlainTextAuthProvider(username=args.username,
password=args.password)
+ else:
+ auth_provider = None
+
+ contact_points = []
+
+ for host_list in args.hosts:
+ contact_points.extend(host_list.split(','))
+
+ try:
+ with Cluster(contact_points,
+ port=int(args.port),
+ execution_profiles={
+ EXEC_PROFILE_DEFAULT: ExecutionProfile(
+ load_balancing_policy=token_policy,
+ request_timeout=60.0,
+ )
+ },
+ protocol_version=3,
+ auth_provider=auth_provider) as cluster:
+
+ session = cluster.connect('doms')
+
+ log.info('Connected successfully to Cassandra')
+
+ cql = """
+ CREATE TABLE IF NOT EXISTS doms_data_temp (
+ id uuid,
+ execution_id uuid,
+ value_id text,
+ primary_value_id text,
+ is_primary boolean,
+ x decimal,
+ y decimal,
+ source_dataset text,
+ measurement_time timestamp,
+ platform text,
+ device text,
+ measurement_values_json text,
+ depth decimal,
+ file_url text,
+ PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
+ );
+ """
+
+ log.info('Creating temp data table')
+
+ session.execute(cql)
+
+ def move_table(src_t, dst_t, can_delete=True):
+ move_rows = []
+
+ cql = f"""
+ SELECT * FROM {src_t};
+ """
+
+ data_rows = session.execute(cql)
+
+ insert_cql = f"""
+ INSERT INTO {dst_t}
+ (id, execution_id, value_id, primary_value_id, x, y,
source_dataset, measurement_time, platform, device, measurement_values_json,
is_primary, depth, file_url)
+ VALUES
+ (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """
+
+ insert_statement = session.prepare(insert_cql)
+
+ n_moved = 0
+
+ def do_move(rows):
+ remaining_rows = rows
+ failed = []
+ futures = []
+
+ log.info(f'Inserting {len(rows):,} rows to {dst_t}')
+
+ while len(remaining_rows) > 0:
+ futures = []
+ for entry in remaining_rows:
+ futures.append((entry, session.execute_async(insert_statement, entry)))
+
+ for entry, future in futures:
+ try:
+ future.result()
+ except Exception:
+ failed.append(entry)
+
+ if len(failed) > 0:
+ remaining_rows = failed
+ failed = []
+
+ log.warning(f'Need to retry
{len(remaining_rows):,} inserts')
+ else:
+ remaining_rows = []
+
+ return len(rows)
+
+ for row in data_rows:
+ pvid = row.primary_value_id
+
+ if pvid is None:
+ pvid = 'PRIMARY'
+
+ move_rows.append(
+ (
+ row.id,
+ row.execution_id,
+ row.value_id,
+ pvid,
+ row.x,
+ row.y,
+ row.source_dataset,
+ row.measurement_time,
+ row.platform,
+ row.device,
+ row.measurement_values_json,
+ row.is_primary,
+ row.depth,
+ row.file_url
+ )
+ )
+
+ if len(move_rows) >= BATCH_SIZE:
+ n_moved += do_move(move_rows)
+ log.info(f'Moved {n_moved:,} rows so far')
+ move_rows = []
+
+ if len(move_rows) > 0:
+ n_moved += do_move(move_rows)
+ log.info(f'Moved {n_moved:,} rows so far')
+
+ log.info('Copying data to temp table')
+
+ move_table('doms_data', 'doms_data_temp')
+
+ if args.action == 'move':
+ cql = """
+ DROP TABLE doms_data;
+ """
+
+ log.info('Dropping old table')
+
+ session.execute(cql)
+
+ cql = """
+ CREATE TABLE doms_data (
+ id uuid,
+ execution_id uuid,
+ value_id text,
+ primary_value_id text,
+ is_primary boolean,
+ x decimal,
+ y decimal,
+ source_dataset text,
+ measurement_time timestamp,
+ platform text,
+ device text,
+ measurement_values_json text,
+ depth decimal,
+ file_url text,
+ PRIMARY KEY ((execution_id, is_primary),
primary_value_id, id)
+ );
+ """
+
+ log.info('Creating data table with corrected schema')
+
+ session.execute(cql)
+
+ log.info('Copying data back')
+
+ move_table('doms_data_temp', 'doms_data', False)
+
+ cql = """
+ DROP TABLE doms_data_temp;
+ """
+
+ log.info('Dropping temp table')
+
+ session.execute(cql)
+
+ log.info('Disconnecting from Cassandra')
+ session.shutdown()
+
+ log.info('Done')
+ except NoHostAvailable as ne:
+ log.exception(ne)
+ exit(1)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tools/update-doms-data-schema/update.py
b/tools/doms-data-tools/update_values_type.py
similarity index 90%
rename from tools/update-doms-data-schema/update.py
rename to tools/doms-data-tools/update_values_type.py
index 1f08b12..c5e5ada 100644
--- a/tools/update-doms-data-schema/update.py
+++ b/tools/doms-data-tools/update_values_type.py
@@ -1,3 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
import argparse
import configparser
import decimal