Changeset: d2138293ca3a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d2138293ca3a
Modified Files:
    clients/iotapi/src/Streams/streampolling.py
    clients/iotclient/src/Settings/mapiconnection.py
    clients/iotclient/src/Streams/streampolling.py
    clients/iotclient/src/Streams/streams.py
    clients/iotclient/src/Streams/streamscontext.py
    clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
Added stream context merge and type checking while reading created streams from the database diffs (truncated from 565 to 300 lines): diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -27,10 +27,10 @@ def init_stream_polling_thread(interval) # elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, elem[4] is type_digits # elem[5] is type_scale elem[6] is default value elem[7] is nullable def stream_polling(): - current_streams = Streams_Context.get_existing_streams() + array = fetch_streams() # TODO check whenever stream's columns are updated retained_streams = [] new_streams = {} - array = fetch_streams() # TODO check whenever stream's columns are updated + current_streams = Streams_Context.get_existing_streams() if array is not None: for key, group in groupby(array, lambda x: Streams_Context.get_context_entry_name(x[0], x[1])): @@ -48,5 +48,4 @@ def stream_polling(): else: retained_streams.append(key) - retained_streams_final = [key for key in current_streams if key in retained_streams] - Streams_Context.merge_context(retained_streams_final, new_streams) + Streams_Context.merge_context(retained_streams, new_streams) diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -1,5 +1,5 @@ +import pymonetdb import sys -import pymonetdb from Settings.iotlogger import add_log from Streams.streamscontext import IOTStreams @@ -49,11 +49,11 @@ def mapi_get_webserver_streams(): try: Connection.execute("START TRANSACTION") cursor = Connection.cursor() - sql_string = """SELECT tables."id", schemas."name" AS schema, tables."name" AS table, extras."has_hostname", - extras."base", extras."interval", extras."unit" FROM (SELECT "id", 
"name", "schema_id" FROM sys.tables - WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas ON - (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", "has_hostname", "base", "interval", "unit" - FROM iot.webserverstreams) AS extras ON (tables."id"=extras."table_id")""".replace('\n', ' ') + sql_string = """SELECT tables."id", schemas."name" AS schema, tables."name" AS table, extras."base", + extras."interval", extras."unit" FROM (SELECT "id", "name", "schema_id" FROM sys.tables WHERE type=4) + AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") + LEFT JOIN (SELECT "table_id", "has_hostname", "base", "interval", "unit" FROM iot.webserverstreams) + AS extras ON (tables."id"=extras."table_id")""".replace('\n', ' ') cursor.execute(sql_string) tables = cursor.fetchall() @@ -72,7 +72,7 @@ def mapi_get_webserver_streams(): return tables, columns except BaseException as ex: add_log(50, ex) - return [], [] + raise def mapi_create_stream(stream): @@ -98,7 +98,7 @@ def mapi_create_stream(stream): "'"])) # get the created table id table_id = str(cursor.fetchall()[0][0]) cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (", table_id, flush_statement, ")"])) - cursor.execute('SELECT id, "name" FROM sys.columns WHERE table_id=' + table_id) + cursor.execute("SELECT id, \"name\" FROM sys.columns WHERE table_id=" + table_id) columns = cursor.fetchall() inserts = [] @@ -114,6 +114,7 @@ def mapi_create_stream(stream): stream.set_delete_ids(table_id, colums_ids) except BaseException as ex: add_log(50, ex) + raise def mapi_delete_stream(schema, stream, stream_id, columns_ids): @@ -125,6 +126,7 @@ def mapi_delete_stream(schema, stream, s Connection.commit() except BaseException as ex: add_log(50, ex) + raise def mapi_flush_baskets(schema, stream, baskets): diff --git a/clients/iotclient/src/Streams/streampolling.py b/clients/iotclient/src/Streams/streampolling.py --- 
a/clients/iotclient/src/Streams/streampolling.py +++ b/clients/iotclient/src/Streams/streampolling.py @@ -1,7 +1,8 @@ from collections import OrderedDict, defaultdict from jsonschema import Draft4Validator, FormatChecker from datatypes import * -from streams import TupleBasedStream, TimeBasedStream, AutoFlushedStream +from streams import TupleBasedStream, TimeBasedStream, AutoFlushedStream, IMPLICIT_TIMESTAMP_COLUMN_NAME,\ + HOST_IDENTIFIER_COLUMN_NAME from streamscontext import Streams_Context from Settings.iotlogger import add_log from Settings.mapiconnection import mapi_get_webserver_streams @@ -24,7 +25,11 @@ Switcher = [{'types': [UNBOUNDED_TEXT_TY {'types': [TIMESTAMP_WITH_TIMEZONE_TYPE_INTERNAL], 'class': 'TimestampWithTimeZoneType'}, {'types': [URL_TYPE], 'class': 'URLType'}, {'types': [INET_TYPE], 'class': 'INetType'}, - {'types': [UUID_TYPE], 'class': 'UUIDType'}] + {'types': [UUID_TYPE], 'class': 'UUIDType'}, + {'types': [INET6_TYPE], 'class': 'INetSixType'}, + {'types': [MAC_TYPE], 'class': 'MACType'}, + {'types': [REGEX_TYPE], 'class': 'RegexType'}, + {'types': [ENUM_TYPE], 'class': 'EnumType'}] INTEGER_TYPES = SMALL_INTEGERS_TYPES FLOATING_POINT_TYPES = FLOATING_POINT_PRECISION_TYPES + DECIMAL_TYPE @@ -44,104 +49,127 @@ def init_stream_polling_thread(interval) def stream_polling(): - streams = OrderedDict() # dictionary of schema_name + '.' + stream_name -> DataCellStream - # for tables [0] -> id, [1] -> schema, [2] -> name, [3] -> has_hostname, [4] -> base, [5] -> interval, [6] -> unit + retained_streams = [] + new_streams = {} # dictionary of schema_name + '.' + stream_name -> DataCellStream + # for tables [0] -> id, [1] -> schema, [2] -> name, [3] -> base, [4] -> interval, [5] -> unit # FLUSHING_STREAMS = {1: 'TupleBasedStream', 2: 'TimeBasedStream', 3: 'AutoFlushedStream'} Important!!! 
# SPECIAL_TYPES = {1: 'MACType', 2: 'RegexType', 3: 'EnumType', 4: 'INetSixType'} # for columns [0] -> id, [1] -> table_id, [2] -> name, [3] -> type, [4] -> type_digits, [5] -> type_scale, # [6] -> default, [7] -> is_null, [8] -> special, [9] -> validation1, [10] -> validation2 - tables, columns = mapi_get_webserver_streams() - - grouped_columns = defaultdict(list) + tables, columns = mapi_get_webserver_streams() # TODO check whenever stream's columns are updated + grouped_columns = defaultdict(list) # group the columns to the respective tables for entry in columns: grouped_columns[entry[1]].append(entry) + current_streams = Streams_Context.get_existing_streams() # array of concatenated names + for entry in tables: - retrieved_columns = grouped_columns[entry[0]] - built_columns = {} # dictionary of name -> data_types - for column in retrieved_columns: - kwargs_dic = {'name': column[2], 'default': column[6], 'nullable': column[7]} + try: + next_concatenated_name = Streams_Context.get_context_entry_name(entry[1], entry[2]) + if next_concatenated_name not in current_streams: + retrieved_columns = grouped_columns[entry[0]] + built_columns = {} # dictionary of name -> data_types + valid_table = True + has_timestamp = False + has_hostname = False + for column in retrieved_columns: + if column[2] == IMPLICIT_TIMESTAMP_COLUMN_NAME: + has_timestamp = True + elif column[2] == HOST_IDENTIFIER_COLUMN_NAME: + has_hostname = True + else: + kwargs_dic = {'name': column[2], 'default': column[6], 'nullable': column[7]} + next_switch = column[8] + if next_switch == 1: # MACType + kwargs_dic['type'] = MAC_TYPE + elif next_switch == 2: # RegexType + kwargs_dic['type'] = REGEX_TYPE + kwargs_dic['regex'] = column[9] + elif next_switch == 3: # EnumType + kwargs_dic['type'] = ENUM_TYPE + kwargs_dic['values'] = column[9].split(ENUM_TYPE_SEPARATOR) + elif next_switch == 4: # INetSixType + kwargs_dic['type'] = INET6_TYPE + else: + next_switch = column[3] + kwargs_dic['type'] = next_switch + 
if next_switch in BOUNDED_TEXT_TYPES: + kwargs_dic['limit'] = column[4] + elif next_switch in INTEGER_TYPES: + if column[6] is not None: + kwargs_dic['default'] = int(column[6]) + if column[10] is not None: + kwargs_dic['minimum'] = int(column[10]) + if column[11] is not None: + kwargs_dic['maximum'] = int(column[11]) + elif next_switch in FLOATING_POINT_TYPES: + if column[6] is not None: + kwargs_dic['default'] = float(column[6]) + if column[10] is not None: + kwargs_dic['minimum'] = float(column[10]) + if column[11] is not None: + kwargs_dic['maximum'] = float(column[11]) + if next_switch == DECIMAL_TYPE: + kwargs_dic['precision'] = column[4] + kwargs_dic['scale'] = column[5] + elif next_switch in DATETIME_TYPES: + if column[10] is not None: + kwargs_dic['minimum'] = column[10] + if column[11] is not None: + kwargs_dic['maximum'] = column[11] - next_switch = column[8] - if next_switch == 1: # MACType - kwargs_dic['type'] = MAC_TYPE - elif next_switch == 2: # RegexType - kwargs_dic['type'] = REGEX_TYPE - kwargs_dic['regex'] = column[9] - elif next_switch == 3: # EnumType - kwargs_dic['type'] = ENUM_TYPE - kwargs_dic['values'] = column[9].split(ENUM_TYPE_SEPARATOR) - elif next_switch == 4: # INetSixType - kwargs_dic['type'] = INET6_TYPE + valid_type = False + for variable in Switcher: # allocate the proper type wrapper + if kwargs_dic['type'] in variable['types']: + reflection_class = globals()[variable['class']] # import everything from datatypes!!! 
+ built_columns[kwargs_dic['name']] = reflection_class(**column) + valid_type = True + break + if not valid_type: + valid_table = False + break + if not valid_table: + continue + properties = OrderedDict() + req_fields = [] + + for key, value in built_columns.iteritems(): + value.add_json_schema_entry(properties) # append new properties entry + if not value.is_nullable() and value.get_default_value() is None: # check if it's required or not + req_fields.append(key) + + json_schema = Draft4Validator({ + "title": "JSON schema to validate inserts in stream " + entry[1] + '.' + entry[2], + "description": "Validate the inserted properties", + "$schema": "http://json-schema.org/draft-04/schema#", + "id": "http://monetdb.com/schemas/iot_create.json", "type": "array", "minItems": 1, + "items": {"type": "object", "properties": properties, "required": req_fields, + "additionalProperties": False} + }, format_checker=FormatChecker()) + + columns_ids = ','.join(map(lambda x: str(x[0]), retrieved_columns)) + + if entry[3] == 1: # TupleBasedStream + new_stream = TupleBasedStream(schema_name=entry[1], stream_name=entry[2], columns=built_columns, + validation_schema=json_schema, has_timestamp=has_timestamp, + has_hostname=has_hostname, table_id=str(entry[0]), + columns_ids=columns_ids, interval=int(entry[4])) + elif entry[3] == 2: # TimeBasedStream + new_stream = TimeBasedStream(schema_name=entry[1], stream_name=entry[2], columns=built_columns, + validation_schema=json_schema, has_timestamp=has_timestamp, + has_hostname=has_hostname, table_id=str(entry[0]), + columns_ids=columns_ids, interval=int(entry[4]), time_unit=entry[5]) + else: # AutoFlushedStream + new_stream = AutoFlushedStream(schema_name=entry[1], stream_name=entry[2], columns=built_columns, + validation_schema=json_schema, has_timestamp=has_timestamp, + has_hostname=has_hostname, table_id=str(entry[0]), + columns_ids=columns_ids) + new_streams[next_concatenated_name] = new_stream else: - next_switch = column[3] - 
kwargs_dic['type'] = next_switch - if next_switch in BOUNDED_TEXT_TYPES: - kwargs_dic['limit'] = column[4] - elif next_switch in INTEGER_TYPES: - if column[6] is not None: - kwargs_dic['default'] = int(column[6]) - if column[10] is not None: - kwargs_dic['minimum'] = int(column[10]) - if column[11] is not None: - kwargs_dic['maximum'] = int(column[11]) - elif next_switch in FLOATING_POINT_TYPES: - if column[6] is not None: - kwargs_dic['default'] = float(column[6]) - if column[10] is not None: - kwargs_dic['minimum'] = float(column[10]) - if column[11] is not None: - kwargs_dic['maximum'] = float(column[11]) - if next_switch == DECIMAL_TYPE: - kwargs_dic['precision'] = column[4] - kwargs_dic['scale'] = column[5] - elif next_switch in DATETIME_TYPES: - if column[10] is not None: - kwargs_dic['minimum'] = column[10] - if column[11] is not None: - kwargs_dic['maximum'] = column[11] + retained_streams.append(next_concatenated_name) + except BaseException as ex: + add_log(50, ex) + continue - for variable in Switcher: # allocate the proper type wrapper - if kwargs_dic['type'] in variable['types']: - try: - reflection_class = globals()[variable['class']] # import everything from datatypes!!! - built_columns[kwargs_dic['name']] = reflection_class(**column) # pass the json entry as kwargs - except BaseException as ex: - add_log(50, ex) - break - - properties = OrderedDict() - req_fields = [] - - for key, value in built_columns.iteritems(): - value.add_json_schema_entry(properties) # append new properties entry - if not value.is_nullable() and value.get_default_value() is None: # check if it's required or not - req_fields.append(key) - _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list