Changeset: d2138293ca3a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d2138293ca3a
Modified Files:
clients/iotapi/src/Streams/streampolling.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/streampolling.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscontext.py
clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
Added stream context merge and type checking while reading created streams from
the database
diffs (truncated from 565 to 300 lines):
diff --git a/clients/iotapi/src/Streams/streampolling.py
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -27,10 +27,10 @@ def init_stream_polling_thread(interval)
# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type,
elem[4] is type_digits
# elem[5] is type_scale elem[6] is default value elem[7] is nullable
def stream_polling():
- current_streams = Streams_Context.get_existing_streams()
+ array = fetch_streams() # TODO check whenever stream's columns are updated
retained_streams = []
new_streams = {}
- array = fetch_streams() # TODO check whenever stream's columns are updated
+ current_streams = Streams_Context.get_existing_streams()
if array is not None:
for key, group in groupby(array, lambda x:
Streams_Context.get_context_entry_name(x[0], x[1])):
@@ -48,5 +48,4 @@ def stream_polling():
else:
retained_streams.append(key)
- retained_streams_final = [key for key in current_streams if key in
retained_streams]
- Streams_Context.merge_context(retained_streams_final, new_streams)
+ Streams_Context.merge_context(retained_streams, new_streams)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -1,5 +1,5 @@
+import pymonetdb
import sys
-import pymonetdb
from Settings.iotlogger import add_log
from Streams.streamscontext import IOTStreams
@@ -49,11 +49,11 @@ def mapi_get_webserver_streams():
try:
Connection.execute("START TRANSACTION")
cursor = Connection.cursor()
- sql_string = """SELECT tables."id", schemas."name" AS schema,
tables."name" AS table, extras."has_hostname",
- extras."base", extras."interval", extras."unit" FROM (SELECT "id",
"name", "schema_id" FROM sys.tables
- WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM
sys.schemas) AS schemas ON
- (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id",
"has_hostname", "base", "interval", "unit"
- FROM iot.webserverstreams) AS extras ON
(tables."id"=extras."table_id")""".replace('\n', ' ')
+ sql_string = """SELECT tables."id", schemas."name" AS schema,
tables."name" AS table, extras."base",
+ extras."interval", extras."unit" FROM (SELECT "id", "name",
"schema_id" FROM sys.tables WHERE type=4)
+ AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS
schemas ON (tables."schema_id"=schemas."id")
+ LEFT JOIN (SELECT "table_id", "has_hostname", "base", "interval",
"unit" FROM iot.webserverstreams)
+ AS extras ON (tables."id"=extras."table_id")""".replace('\n', ' ')
cursor.execute(sql_string)
tables = cursor.fetchall()
@@ -72,7 +72,7 @@ def mapi_get_webserver_streams():
return tables, columns
except BaseException as ex:
add_log(50, ex)
- return [], []
+ raise
def mapi_create_stream(stream):
@@ -98,7 +98,7 @@ def mapi_create_stream(stream):
"'"])) # get the created table id
table_id = str(cursor.fetchall()[0][0])
cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (",
table_id, flush_statement, ")"]))
- cursor.execute('SELECT id, "name" FROM sys.columns WHERE table_id=' +
table_id)
+ cursor.execute("SELECT id, \"name\" FROM sys.columns WHERE table_id="
+ table_id)
columns = cursor.fetchall()
inserts = []
@@ -114,6 +114,7 @@ def mapi_create_stream(stream):
stream.set_delete_ids(table_id, colums_ids)
except BaseException as ex:
add_log(50, ex)
+ raise
def mapi_delete_stream(schema, stream, stream_id, columns_ids):
@@ -125,6 +126,7 @@ def mapi_delete_stream(schema, stream, s
Connection.commit()
except BaseException as ex:
add_log(50, ex)
+ raise
def mapi_flush_baskets(schema, stream, baskets):
diff --git a/clients/iotclient/src/Streams/streampolling.py
b/clients/iotclient/src/Streams/streampolling.py
--- a/clients/iotclient/src/Streams/streampolling.py
+++ b/clients/iotclient/src/Streams/streampolling.py
@@ -1,7 +1,8 @@
from collections import OrderedDict, defaultdict
from jsonschema import Draft4Validator, FormatChecker
from datatypes import *
-from streams import TupleBasedStream, TimeBasedStream, AutoFlushedStream
+from streams import TupleBasedStream, TimeBasedStream, AutoFlushedStream,
IMPLICIT_TIMESTAMP_COLUMN_NAME,\
+ HOST_IDENTIFIER_COLUMN_NAME
from streamscontext import Streams_Context
from Settings.iotlogger import add_log
from Settings.mapiconnection import mapi_get_webserver_streams
@@ -24,7 +25,11 @@ Switcher = [{'types': [UNBOUNDED_TEXT_TY
{'types': [TIMESTAMP_WITH_TIMEZONE_TYPE_INTERNAL], 'class':
'TimestampWithTimeZoneType'},
{'types': [URL_TYPE], 'class': 'URLType'},
{'types': [INET_TYPE], 'class': 'INetType'},
- {'types': [UUID_TYPE], 'class': 'UUIDType'}]
+ {'types': [UUID_TYPE], 'class': 'UUIDType'},
+ {'types': [INET6_TYPE], 'class': 'INetSixType'},
+ {'types': [MAC_TYPE], 'class': 'MACType'},
+ {'types': [REGEX_TYPE], 'class': 'RegexType'},
+ {'types': [ENUM_TYPE], 'class': 'EnumType'}]
INTEGER_TYPES = SMALL_INTEGERS_TYPES
FLOATING_POINT_TYPES = FLOATING_POINT_PRECISION_TYPES + DECIMAL_TYPE
@@ -44,104 +49,127 @@ def init_stream_polling_thread(interval)
def stream_polling():
- streams = OrderedDict() # dictionary of schema_name + '.' + stream_name
-> DataCellStream
- # for tables [0] -> id, [1] -> schema, [2] -> name, [3] -> has_hostname,
[4] -> base, [5] -> interval, [6] -> unit
+ retained_streams = []
+ new_streams = {} # dictionary of schema_name + '.' + stream_name ->
DataCellStream
+ # for tables [0] -> id, [1] -> schema, [2] -> name, [3] -> base, [4] ->
interval, [5] -> unit
# FLUSHING_STREAMS = {1: 'TupleBasedStream', 2: 'TimeBasedStream', 3:
'AutoFlushedStream'} Important!!!
# SPECIAL_TYPES = {1: 'MACType', 2: 'RegexType', 3: 'EnumType', 4:
'INetSixType'}
# for columns [0] -> id, [1] -> table_id, [2] -> name, [3] -> type, [4] ->
type_digits, [5] -> type_scale,
# [6] -> default, [7] -> is_null, [8] -> special, [9] -> validation1, [10]
-> validation2
- tables, columns = mapi_get_webserver_streams()
-
- grouped_columns = defaultdict(list)
+ tables, columns = mapi_get_webserver_streams() # TODO check whenever
stream's columns are updated
+ grouped_columns = defaultdict(list) # group the columns to the respective
tables
for entry in columns:
grouped_columns[entry[1]].append(entry)
+ current_streams = Streams_Context.get_existing_streams() # array of
concatenated names
+
for entry in tables:
- retrieved_columns = grouped_columns[entry[0]]
- built_columns = {} # dictionary of name -> data_types
- for column in retrieved_columns:
- kwargs_dic = {'name': column[2], 'default': column[6], 'nullable':
column[7]}
+ try:
+ next_concatenated_name =
Streams_Context.get_context_entry_name(entry[1], entry[2])
+ if next_concatenated_name not in current_streams:
+ retrieved_columns = grouped_columns[entry[0]]
+ built_columns = {} # dictionary of name -> data_types
+ valid_table = True
+ has_timestamp = False
+ has_hostname = False
+ for column in retrieved_columns:
+ if column[2] == IMPLICIT_TIMESTAMP_COLUMN_NAME:
+ has_timestamp = True
+ elif column[2] == HOST_IDENTIFIER_COLUMN_NAME:
+ has_hostname = True
+ else:
+ kwargs_dic = {'name': column[2], 'default': column[6],
'nullable': column[7]}
+ next_switch = column[8]
+ if next_switch == 1: # MACType
+ kwargs_dic['type'] = MAC_TYPE
+ elif next_switch == 2: # RegexType
+ kwargs_dic['type'] = REGEX_TYPE
+ kwargs_dic['regex'] = column[9]
+ elif next_switch == 3: # EnumType
+ kwargs_dic['type'] = ENUM_TYPE
+ kwargs_dic['values'] =
column[9].split(ENUM_TYPE_SEPARATOR)
+ elif next_switch == 4: # INetSixType
+ kwargs_dic['type'] = INET6_TYPE
+ else:
+ next_switch = column[3]
+ kwargs_dic['type'] = next_switch
+ if next_switch in BOUNDED_TEXT_TYPES:
+ kwargs_dic['limit'] = column[4]
+ elif next_switch in INTEGER_TYPES:
+ if column[6] is not None:
+ kwargs_dic['default'] = int(column[6])
+ if column[10] is not None:
+ kwargs_dic['minimum'] = int(column[10])
+ if column[11] is not None:
+ kwargs_dic['maximum'] = int(column[11])
+ elif next_switch in FLOATING_POINT_TYPES:
+ if column[6] is not None:
+ kwargs_dic['default'] = float(column[6])
+ if column[10] is not None:
+ kwargs_dic['minimum'] = float(column[10])
+ if column[11] is not None:
+ kwargs_dic['maximum'] = float(column[11])
+ if next_switch == DECIMAL_TYPE:
+ kwargs_dic['precision'] = column[4]
+ kwargs_dic['scale'] = column[5]
+ elif next_switch in DATETIME_TYPES:
+ if column[10] is not None:
+ kwargs_dic['minimum'] = column[10]
+ if column[11] is not None:
+ kwargs_dic['maximum'] = column[11]
- next_switch = column[8]
- if next_switch == 1: # MACType
- kwargs_dic['type'] = MAC_TYPE
- elif next_switch == 2: # RegexType
- kwargs_dic['type'] = REGEX_TYPE
- kwargs_dic['regex'] = column[9]
- elif next_switch == 3: # EnumType
- kwargs_dic['type'] = ENUM_TYPE
- kwargs_dic['values'] = column[9].split(ENUM_TYPE_SEPARATOR)
- elif next_switch == 4: # INetSixType
- kwargs_dic['type'] = INET6_TYPE
+ valid_type = False
+ for variable in Switcher: # allocate the proper type
wrapper
+ if kwargs_dic['type'] in variable['types']:
+ reflection_class =
globals()[variable['class']] # import everything from datatypes!!!
+ built_columns[kwargs_dic['name']] =
reflection_class(**column)
+ valid_type = True
+ break
+ if not valid_type:
+ valid_table = False
+ break
+ if not valid_table:
+ continue
+ properties = OrderedDict()
+ req_fields = []
+
+ for key, value in built_columns.iteritems():
+ value.add_json_schema_entry(properties) # append new
properties entry
+ if not value.is_nullable() and value.get_default_value()
is None: # check if it's required or not
+ req_fields.append(key)
+
+ json_schema = Draft4Validator({
+ "title": "JSON schema to validate inserts in stream " +
entry[1] + '.' + entry[2],
+ "description": "Validate the inserted properties",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "id": "http://monetdb.com/schemas/iot_create.json",
"type": "array", "minItems": 1,
+ "items": {"type": "object", "properties": properties,
"required": req_fields,
+ "additionalProperties": False}
+ }, format_checker=FormatChecker())
+
+ columns_ids = ','.join(map(lambda x: str(x[0]),
retrieved_columns))
+
+ if entry[3] == 1: # TupleBasedStream
+ new_stream = TupleBasedStream(schema_name=entry[1],
stream_name=entry[2], columns=built_columns,
+
validation_schema=json_schema, has_timestamp=has_timestamp,
+ has_hostname=has_hostname,
table_id=str(entry[0]),
+ columns_ids=columns_ids,
interval=int(entry[4]))
+ elif entry[3] == 2: # TimeBasedStream
+ new_stream = TimeBasedStream(schema_name=entry[1],
stream_name=entry[2], columns=built_columns,
+
validation_schema=json_schema, has_timestamp=has_timestamp,
+ has_hostname=has_hostname,
table_id=str(entry[0]),
+ columns_ids=columns_ids,
interval=int(entry[4]), time_unit=entry[5])
+ else: # AutoFlushedStream
+ new_stream = AutoFlushedStream(schema_name=entry[1],
stream_name=entry[2], columns=built_columns,
+
validation_schema=json_schema, has_timestamp=has_timestamp,
+ has_hostname=has_hostname,
table_id=str(entry[0]),
+ columns_ids=columns_ids)
+ new_streams[next_concatenated_name] = new_stream
else:
- next_switch = column[3]
- kwargs_dic['type'] = next_switch
- if next_switch in BOUNDED_TEXT_TYPES:
- kwargs_dic['limit'] = column[4]
- elif next_switch in INTEGER_TYPES:
- if column[6] is not None:
- kwargs_dic['default'] = int(column[6])
- if column[10] is not None:
- kwargs_dic['minimum'] = int(column[10])
- if column[11] is not None:
- kwargs_dic['maximum'] = int(column[11])
- elif next_switch in FLOATING_POINT_TYPES:
- if column[6] is not None:
- kwargs_dic['default'] = float(column[6])
- if column[10] is not None:
- kwargs_dic['minimum'] = float(column[10])
- if column[11] is not None:
- kwargs_dic['maximum'] = float(column[11])
- if next_switch == DECIMAL_TYPE:
- kwargs_dic['precision'] = column[4]
- kwargs_dic['scale'] = column[5]
- elif next_switch in DATETIME_TYPES:
- if column[10] is not None:
- kwargs_dic['minimum'] = column[10]
- if column[11] is not None:
- kwargs_dic['maximum'] = column[11]
+ retained_streams.append(next_concatenated_name)
+ except BaseException as ex:
+ add_log(50, ex)
+ continue
- for variable in Switcher: # allocate the proper type wrapper
- if kwargs_dic['type'] in variable['types']:
- try:
- reflection_class = globals()[variable['class']] #
import everything from datatypes!!!
- built_columns[kwargs_dic['name']] =
reflection_class(**column) # pass the json entry as kwargs
- except BaseException as ex:
- add_log(50, ex)
- break
-
- properties = OrderedDict()
- req_fields = []
-
- for key, value in built_columns.iteritems():
- value.add_json_schema_entry(properties) # append new properties
entry
- if not value.is_nullable() and value.get_default_value() is None:
# check if it's required or not
- req_fields.append(key)
-
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list