Changeset: d2138293ca3a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d2138293ca3a
Modified Files:
        clients/iotapi/src/Streams/streampolling.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:

Added stream context merge and type checking while reading created streams from the database
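
The polling cycle on both clients now computes the retained and new streams in a single pass and hands both to Streams_Context.merge_context. The sketch below illustrates the intended merge semantics; merge_context's body is not visible in the truncated diff, so its replacement logic here is an assumption:

    # Minimal sketch of the new polling/merge cycle (Python 2, as in the clients).
    # merge_context's body is assumed: keep the retained streams, add the newly
    # created ones, and drop everything else from the context.
    def merge_context(context, retained_streams, new_streams):
        kept = dict((key, stream) for key, stream in context.items()
                    if key in retained_streams)
        kept.update(new_streams)  # newly created streams join the context
        return kept

    def stream_polling(fetch_streams, context):
        retained_streams = []  # streams already known and still in the database
        new_streams = {}       # schema_name + '.' + stream_name -> stream object
        for key, metadata in fetch_streams():
            if key in context:
                retained_streams.append(key)   # known stream: retain it
            else:
                new_streams[key] = metadata    # new stream: wrap and register it
        return merge_context(context, retained_streams, new_streams)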


diffs (truncated from 565 to 300 lines):

diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -27,10 +27,10 @@ def init_stream_polling_thread(interval)
 # elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, elem[4] is type_digits
 # elem[5] is type_scale elem[6] is default value elem[7] is nullable
 def stream_polling():
-    current_streams = Streams_Context.get_existing_streams()
+    array = fetch_streams()  # TODO check whenever stream's columns are updated
     retained_streams = []
     new_streams = {}
-    array = fetch_streams()  # TODO check whenever stream's columns are updated
+    current_streams = Streams_Context.get_existing_streams()
 
     if array is not None:
         for key, group in groupby(array, lambda x: Streams_Context.get_context_entry_name(x[0], x[1])):
@@ -48,5 +48,4 @@ def stream_polling():
             else:
                 retained_streams.append(key)
 
-    retained_streams_final = [key for key in current_streams if key in retained_streams]
-    Streams_Context.merge_context(retained_streams_final, new_streams)
+    Streams_Context.merge_context(retained_streams, new_streams)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -1,5 +1,5 @@
+import pymonetdb
 import sys
-import pymonetdb
 
 from Settings.iotlogger import add_log
 from Streams.streamscontext import IOTStreams
@@ -49,11 +49,11 @@ def mapi_get_webserver_streams():
     try:
         Connection.execute("START TRANSACTION")
         cursor = Connection.cursor()
-        sql_string = """SELECT tables."id", schemas."name" AS schema, 
tables."name" AS table, extras."has_hostname",
-            extras."base", extras."interval", extras."unit" FROM (SELECT "id", 
"name", "schema_id" FROM sys.tables
-            WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" FROM 
sys.schemas) AS schemas ON
-            (tables."schema_id"=schemas."id") LEFT JOIN (SELECT "table_id", 
"has_hostname", "base", "interval", "unit"
-            FROM iot.webserverstreams) AS extras ON 
(tables."id"=extras."table_id")""".replace('\n', ' ')
+        sql_string = """SELECT tables."id", schemas."name" AS schema, 
tables."name" AS table, extras."base",
+            extras."interval", extras."unit" FROM (SELECT "id", "name", 
"schema_id" FROM sys.tables WHERE type=4)
+            AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS 
schemas ON (tables."schema_id"=schemas."id")
+            LEFT JOIN (SELECT "table_id", "has_hostname", "base", "interval", 
"unit" FROM iot.webserverstreams)
+            AS extras ON (tables."id"=extras."table_id")""".replace('\n', ' ')
         cursor.execute(sql_string)
         tables = cursor.fetchall()
 
@@ -72,7 +72,7 @@ def mapi_get_webserver_streams():
         return tables, columns
     except BaseException as ex:
         add_log(50, ex)
-        return [], []
+        raise
 
 
 def mapi_create_stream(stream):
@@ -98,7 +98,7 @@ def mapi_create_stream(stream):
                                 "'"]))  # get the created table id
         table_id = str(cursor.fetchall()[0][0])
         cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (", 
table_id, flush_statement, ")"]))
-        cursor.execute('SELECT id, "name" FROM sys.columns WHERE table_id=' + table_id)
+        cursor.execute("SELECT id, \"name\" FROM sys.columns WHERE table_id=" + table_id)
         columns = cursor.fetchall()
 
         inserts = []
@@ -114,6 +114,7 @@ def mapi_create_stream(stream):
         stream.set_delete_ids(table_id, colums_ids)
     except BaseException as ex:
         add_log(50, ex)
+        raise
 
 
 def mapi_delete_stream(schema, stream, stream_id, columns_ids):
@@ -125,6 +126,7 @@ def mapi_delete_stream(schema, stream, s
         Connection.commit()
     except BaseException as ex:
         add_log(50, ex)
+        raise
 
 
 def mapi_flush_baskets(schema, stream, baskets):
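
A recurring change above: the mapiconnection.py helpers now log and re-raise instead of swallowing errors (or returning empty results), so the polling loop can catch the exception and skip the offending stream. The pattern in isolation, with a hypothetical helper name (mapi_fetch is made up for illustration):

    # Log-and-reraise: record the failure at severity 50, then propagate it
    # so the caller decides how to recover.
    from Settings.iotlogger import add_log

    def mapi_fetch(cursor, sql_string):
        try:
            cursor.execute(sql_string)
            return cursor.fetchall()
        except BaseException as ex:
            add_log(50, ex)
            raise
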
diff --git a/clients/iotclient/src/Streams/streampolling.py b/clients/iotclient/src/Streams/streampolling.py
--- a/clients/iotclient/src/Streams/streampolling.py
+++ b/clients/iotclient/src/Streams/streampolling.py
@@ -1,7 +1,8 @@
 from collections import OrderedDict, defaultdict
 from jsonschema import Draft4Validator, FormatChecker
 from datatypes import *
-from streams import TupleBasedStream, TimeBasedStream, AutoFlushedStream
+from streams import TupleBasedStream, TimeBasedStream, AutoFlushedStream, IMPLICIT_TIMESTAMP_COLUMN_NAME,\
+    HOST_IDENTIFIER_COLUMN_NAME
 from streamscontext import Streams_Context
 from Settings.iotlogger import add_log
 from Settings.mapiconnection import mapi_get_webserver_streams
@@ -24,7 +25,11 @@ Switcher = [{'types': [UNBOUNDED_TEXT_TY
             {'types': [TIMESTAMP_WITH_TIMEZONE_TYPE_INTERNAL], 'class': 'TimestampWithTimeZoneType'},
             {'types': [URL_TYPE], 'class': 'URLType'},
             {'types': [INET_TYPE], 'class': 'INetType'},
-            {'types': [UUID_TYPE], 'class': 'UUIDType'}]
+            {'types': [UUID_TYPE], 'class': 'UUIDType'},
+            {'types': [INET6_TYPE], 'class': 'INetSixType'},
+            {'types': [MAC_TYPE], 'class': 'MACType'},
+            {'types': [REGEX_TYPE], 'class': 'RegexType'},
+            {'types': [ENUM_TYPE], 'class': 'EnumType'}]
 
 INTEGER_TYPES = SMALL_INTEGERS_TYPES
 FLOATING_POINT_TYPES = FLOATING_POINT_PRECISION_TYPES + DECIMAL_TYPE
@@ -44,104 +49,127 @@ def init_stream_polling_thread(interval)
 
 
 def stream_polling():
-    streams = OrderedDict()  # dictionary of schema_name + '.' + stream_name -> DataCellStream
-    # for tables [0] -> id, [1] -> schema, [2] -> name, [3] -> has_hostname, [4] -> base, [5] -> interval, [6] -> unit
+    retained_streams = []
+    new_streams = {}  # dictionary of schema_name + '.' + stream_name -> DataCellStream
+    # for tables [0] -> id, [1] -> schema, [2] -> name, [3] -> base, [4] -> interval, [5] -> unit
     # FLUSHING_STREAMS = {1: 'TupleBasedStream', 2: 'TimeBasedStream', 3: 'AutoFlushedStream'}  Important!!!
     # SPECIAL_TYPES = {1: 'MACType', 2: 'RegexType', 3: 'EnumType', 4: 'INetSixType'}
 
     # for columns [0] -> id, [1] -> table_id, [2] -> name, [3] -> type, [4] -> type_digits, [5] -> type_scale,
     # [6] -> default, [7] -> is_null, [8] -> special, [9] -> validation1, [10] -> validation2
-    tables, columns = mapi_get_webserver_streams()
-
-    grouped_columns = defaultdict(list)
+    tables, columns = mapi_get_webserver_streams()  # TODO check whenever stream's columns are updated
+    grouped_columns = defaultdict(list)  # group the columns to the respective tables
     for entry in columns:
         grouped_columns[entry[1]].append(entry)
 
+    current_streams = Streams_Context.get_existing_streams()  # array of concatenated names
+
     for entry in tables:
-        retrieved_columns = grouped_columns[entry[0]]
-        built_columns = {}  # dictionary of name -> data_types
-        for column in retrieved_columns:
-            kwargs_dic = {'name': column[2], 'default': column[6], 'nullable': column[7]}
+        try:
+            next_concatenated_name = Streams_Context.get_context_entry_name(entry[1], entry[2])
+            if next_concatenated_name not in current_streams:
+                retrieved_columns = grouped_columns[entry[0]]
+                built_columns = {}  # dictionary of name -> data_types
+                valid_table = True
+                has_timestamp = False
+                has_hostname = False
+                for column in retrieved_columns:
+                    if column[2] == IMPLICIT_TIMESTAMP_COLUMN_NAME:
+                        has_timestamp = True
+                    elif column[2] == HOST_IDENTIFIER_COLUMN_NAME:
+                        has_hostname = True
+                    else:
+                        kwargs_dic = {'name': column[2], 'default': column[6], 'nullable': column[7]}
+                        next_switch = column[8]
+                        if next_switch == 1:  # MACType
+                            kwargs_dic['type'] = MAC_TYPE
+                        elif next_switch == 2:  # RegexType
+                            kwargs_dic['type'] = REGEX_TYPE
+                            kwargs_dic['regex'] = column[9]
+                        elif next_switch == 3:  # EnumType
+                            kwargs_dic['type'] = ENUM_TYPE
+                            kwargs_dic['values'] = column[9].split(ENUM_TYPE_SEPARATOR)
+                        elif next_switch == 4:  # INetSixType
+                            kwargs_dic['type'] = INET6_TYPE
+                        else:
+                            next_switch = column[3]
+                            kwargs_dic['type'] = next_switch
+                            if next_switch in BOUNDED_TEXT_TYPES:
+                                kwargs_dic['limit'] = column[4]
+                            elif next_switch in INTEGER_TYPES:
+                                if column[6] is not None:
+                                    kwargs_dic['default'] = int(column[6])
+                                if column[10] is not None:
+                                    kwargs_dic['minimum'] = int(column[10])
+                                if column[11] is not None:
+                                    kwargs_dic['maximum'] = int(column[11])
+                            elif next_switch in FLOATING_POINT_TYPES:
+                                if column[6] is not None:
+                                    kwargs_dic['default'] = float(column[6])
+                                if column[10] is not None:
+                                    kwargs_dic['minimum'] = float(column[10])
+                                if column[11] is not None:
+                                    kwargs_dic['maximum'] = float(column[11])
+                                if next_switch == DECIMAL_TYPE:
+                                    kwargs_dic['precision'] = column[4]
+                                    kwargs_dic['scale'] = column[5]
+                            elif next_switch in DATETIME_TYPES:
+                                if column[10] is not None:
+                                    kwargs_dic['minimum'] = column[10]
+                                if column[11] is not None:
+                                    kwargs_dic['maximum'] = column[11]
 
-            next_switch = column[8]
-            if next_switch == 1:  # MACType
-                kwargs_dic['type'] = MAC_TYPE
-            elif next_switch == 2:  # RegexType
-                kwargs_dic['type'] = REGEX_TYPE
-                kwargs_dic['regex'] = column[9]
-            elif next_switch == 3:  # EnumType
-                kwargs_dic['type'] = ENUM_TYPE
-                kwargs_dic['values'] = column[9].split(ENUM_TYPE_SEPARATOR)
-            elif next_switch == 4:  # INetSixType
-                kwargs_dic['type'] = INET6_TYPE
+                        valid_type = False
+                        for variable in Switcher:  # allocate the proper type wrapper
+                            if kwargs_dic['type'] in variable['types']:
+                                reflection_class = globals()[variable['class']]  # import everything from datatypes!!!
+                                built_columns[kwargs_dic['name']] = reflection_class(**kwargs_dic)  # pass the collected kwargs
+                                valid_type = True
+                                break
+                        if not valid_type:
+                            valid_table = False
+                            break
+                if not valid_table:
+                    continue
+                properties = OrderedDict()
+                req_fields = []
+
+                for key, value in built_columns.iteritems():
+                    value.add_json_schema_entry(properties)  # append new properties entry
+                    if not value.is_nullable() and value.get_default_value() is None:  # check if it's required or not
+                        req_fields.append(key)
+
+                json_schema = Draft4Validator({
+                    "title": "JSON schema to validate inserts in stream " + 
entry[1] + '.' + entry[2],
+                    "description": "Validate the inserted properties",
+                    "$schema": "http://json-schema.org/draft-04/schema#";,
+                    "id": "http://monetdb.com/schemas/iot_create.json";, 
"type": "array", "minItems": 1,
+                    "items": {"type": "object", "properties": properties, 
"required": req_fields,
+                              "additionalProperties": False}
+                }, format_checker=FormatChecker())
+
+                columns_ids = ','.join(map(lambda x: str(x[0]), retrieved_columns))
+
+                if entry[3] == 1:  # TupleBasedStream
+                    new_stream = TupleBasedStream(schema_name=entry[1], stream_name=entry[2], columns=built_columns,
+                                                  validation_schema=json_schema, has_timestamp=has_timestamp,
+                                                  has_hostname=has_hostname, table_id=str(entry[0]),
+                                                  columns_ids=columns_ids, interval=int(entry[4]))
+                elif entry[3] == 2:  # TimeBasedStream
+                    new_stream = TimeBasedStream(schema_name=entry[1], stream_name=entry[2], columns=built_columns,
+                                                 validation_schema=json_schema, has_timestamp=has_timestamp,
+                                                 has_hostname=has_hostname, table_id=str(entry[0]),
+                                                 columns_ids=columns_ids, interval=int(entry[4]), time_unit=entry[5])
+                else:  # AutoFlushedStream
+                    new_stream = AutoFlushedStream(schema_name=entry[1], stream_name=entry[2], columns=built_columns,
+                                                   validation_schema=json_schema, has_timestamp=has_timestamp,
+                                                   has_hostname=has_hostname, table_id=str(entry[0]),
+                                                   columns_ids=columns_ids)
+                new_streams[next_concatenated_name] = new_stream
             else:
-                next_switch = column[3]
-                kwargs_dic['type'] = next_switch
-                if next_switch in BOUNDED_TEXT_TYPES:
-                    kwargs_dic['limit'] = column[4]
-                elif next_switch in INTEGER_TYPES:
-                    if column[6] is not None:
-                        kwargs_dic['default'] = int(column[6])
-                    if column[10] is not None:
-                        kwargs_dic['minimum'] = int(column[10])
-                    if column[11] is not None:
-                        kwargs_dic['maximum'] = int(column[11])
-                elif next_switch in FLOATING_POINT_TYPES:
-                    if column[6] is not None:
-                        kwargs_dic['default'] = float(column[6])
-                    if column[10] is not None:
-                        kwargs_dic['minimum'] = float(column[10])
-                    if column[11] is not None:
-                        kwargs_dic['maximum'] = float(column[11])
-                    if next_switch == DECIMAL_TYPE:
-                        kwargs_dic['precision'] = column[4]
-                        kwargs_dic['scale'] = column[5]
-                elif next_switch in DATETIME_TYPES:
-                    if column[10] is not None:
-                        kwargs_dic['minimum'] = column[10]
-                    if column[11] is not None:
-                        kwargs_dic['maximum'] = column[11]
+                retained_streams.append(next_concatenated_name)
+        except BaseException as ex:
+            add_log(50, ex)
+            continue
 
-            for variable in Switcher:  # allocate the proper type wrapper
-                if kwargs_dic['type'] in variable['types']:
-                    try:
-                        reflection_class = globals()[variable['class']]  # import everything from datatypes!!!
-                        built_columns[kwargs_dic['name']] = reflection_class(**column)  # pass the json entry as kwargs
-                    except BaseException as ex:
-                        add_log(50, ex)
-                    break
-
-        properties = OrderedDict()
-        req_fields = []
-
-        for key, value in built_columns.iteritems():
-            value.add_json_schema_entry(properties)  # append new properties entry
-            if not value.is_nullable() and value.get_default_value() is None:  # check if it's required or not
-                req_fields.append(key)
-
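
The type-checking half of the change funnels every column through the Switcher table and discards a whole table as soon as one column's type has no wrapper class. A condensed sketch of that dispatch; the type constants and wrapper classes normally come from the datatypes module, which is outside this diff, so the stubs below are placeholders:

    # Stand-ins for the datatypes constants and classes (not shown in this diff).
    MAC_TYPE, UUID_TYPE = 'mac', 'uuid'

    class MACType(object):
        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class UUIDType(object):
        def __init__(self, **kwargs):
            self.kwargs = kwargs

    Switcher = [{'types': [UUID_TYPE], 'class': 'UUIDType'},
                {'types': [MAC_TYPE], 'class': 'MACType'}]

    def resolve_column(kwargs_dic):
        for variable in Switcher:  # allocate the proper type wrapper
            if kwargs_dic['type'] in variable['types']:
                reflection_class = globals()[variable['class']]
                return reflection_class(**kwargs_dic)
        return None  # no wrapper found: caller marks the whole table invalid

For example, resolve_column({'type': 'mac', 'name': 'sensor'}) returns a MACType instance, while an unknown type code yields None, mirroring the valid_type/valid_table flags in the diff above.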