Changeset: 0dbef0f0ddf6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0dbef0f0ddf6
Modified Files:
clients/iotapi/src/Streams/datatypes.py
clients/iotapi/src/Streams/streams.py
clients/iotapi/src/Streams/streamscontext.py
clients/iotclient/src/Flask/restresources.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/datatypes.py
clients/iotclient/src/Streams/jsonschemas.py
clients/iotclient/src/Streams/streampolling.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscontext.py
clients/iotclient/src/Streams/streamscreator.py
clients/iotclient/src/Utilities/customthreading.py
sql/backends/monet5/iot/50_iot.sql
Branch: iot
Log Message:
Several bugfixes while reading from the database
diffs (truncated from 1062 to 300 lines):
diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -1,6 +1,7 @@
import struct
from abc import ABCMeta, abstractmethod
+from collections import OrderedDict
from datetime import date, time, datetime
from dateutil.relativedelta import relativedelta
@@ -57,10 +58,8 @@ class StreamDataType(object):
return results
def to_json_representation(self): # get a json representation of the data
type while checking the stream's info
- dic = {'name': self._column_name, 'type': self._data_type, 'nullable':
self._is_nullable}
- if self._default_value is not None:
- dic['default'] = self._default_value
- return dic
+ return OrderedDict((('name', self._column_name), ('type',
self._data_type),
+ ('default', self._default_value), ('nullable',
self._is_nullable)))
class TextType(StreamDataType):
@@ -329,7 +328,7 @@ class TimeType(StreamDataType): # Store
return results
-class TimestampType(StreamDataType): # it's represented with the two integers
from time and date
+class TimestampType(StreamDataType): # It is represented with the two
integers from time and date
"""Covers: TIMESTAMP"""
def __init__(self, *args):
diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -65,8 +65,8 @@ class IOTStream:
return self._stream_name
def get_data_dictionary(self):
- dic = OrderedDict({'schema': self._schema_name, 'stream':
self._stream_name,
- 'columns': [value.to_json_representation() for
value in self._columns.values()]})
+ dic = OrderedDict((('schema', self._schema_name), ('stream',
self._stream_name),
+ ('columns', [value.to_json_representation() for
value in self._columns.values()])))
self._baskets_lock.acquire_read()
count = len(self._baskets)
listing = [{'number': k, 'count': v} for k, v in self._baskets.items()]
@@ -146,4 +146,5 @@ class IOTStream:
keys = results.keys() # TODO check if this is viable for many tuples!!
tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k
in keys))]
- return {'schema': self._schema_name, 'stream': self._stream_name,
'count': read_tuples, 'tuples': tuples}
+ return OrderedDict((('schema', self._schema_name), ('stream',
self._stream_name), ('count', read_tuples),
+ ('tuples', tuples)))
diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -1,3 +1,4 @@
+from collections import OrderedDict
from Utilities.readwritelock import RWLock
from WebSockets.websockets import unsubscribe_removed_streams
@@ -39,8 +40,8 @@ class IOTStreams:
def get_streams_data(self):
self._locker.acquire_read()
- res = {'streams_count': len(self._context),
- 'streams_listing': [value.get_data_dictionary() for value in
self._context.values()]}
+ res = OrderedDict((('streams_count', len(self._context)),
+ ('streams_listing', [value.get_data_dictionary()
for value in self._context.values()])))
self._locker.release()
return res
diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -2,7 +2,7 @@ from datetime import datetime
from flask import request
from flask_restful import Resource
from json import loads
-from jsonschema import Draft4Validator, FormatChecker
+from jsonschema import Draft4Validator, FormatChecker, ValidationError
from pytz import utc
from tzlocal import get_localzone
from Settings.iotlogger import add_log
@@ -30,7 +30,7 @@ class StreamInput(Resource):
stream = get_streams_context().get_existing_stream(schema_name,
stream_name)
except BaseException as ex:
add_log(50, ex)
- return ex, 404
+ return ex.message, 404
return stream.get_data_dictionary(), 200
def post(self, schema_name, stream_name): # add data to a stream
@@ -40,13 +40,13 @@ class StreamInput(Resource):
stream = get_streams_context().get_existing_stream(schema_name,
stream_name)
except BaseException as ex:
add_log(50, ex)
- return ex, 404
+ return ex.message, 404
try: # validate and insert data, if not return 400
stream.validate_and_insert(loads(request.data), current_stamp)
- except BaseException as ex:
+ except (ValidationError, BaseException) as ex:
add_log(50, ex)
- return ex, 400
+ return ex.message, 400
return 'The insertions were made with success!', 201
@@ -71,9 +71,9 @@ class StreamsHandling(Resource):
schema_to_validate = loads(request.data)
Create_Streams_Validator.validate(schema_to_validate)
get_streams_context().add_new_stream(schema_to_validate)
- except BaseException as ex:
+ except (ValidationError, BaseException) as ex:
add_log(50, ex)
- return ex, 400
+ return ex.message, 400
add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.',
schema_to_validate['stream'],
' was created']))
return 'The stream was created with success!', 201
@@ -84,13 +84,13 @@ class StreamsHandling(Resource):
Delete_Streams_Validator.validate(schema_to_validate)
except BaseException as ex:
add_log(50, ex)
- return ex, 400
+ return ex.message, 400
try: # check if stream exists, if not return 404
get_streams_context().delete_existing_stream(schema_to_validate)
except BaseException as ex:
add_log(50, ex)
- return ex, 404
+ return ex.message, 404
add_log(20, ''.join(['The stream ', schema_to_validate['schema'], '.',
schema_to_validate['stream'],
' was deleted']))
return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -26,7 +26,7 @@ def mapi_get_webserver_streams(connectio
extras."interval", extras."unit" FROM (SELECT "id", "name",
"schema_id" FROM sys.tables WHERE type=4)
AS tables INNER JOIN (SELECT "id", "name" FROM sys.schemas) AS
schemas ON (tables."schema_id"=schemas."id")
LEFT JOIN (SELECT "table_id", "base", "interval", "unit" FROM
iot.webserverstreams) AS extras
- ON (tables."id"=extras."table_id")""".replace('\n', ' ')
+ ON (tables."id"=extras."table_id") ORDER BY tables."id"
""".replace('\n', ' ')
cursor.execute(sql_string)
tables = cursor.fetchall()
@@ -34,10 +34,10 @@ def mapi_get_webserver_streams(connectio
sql_string = """SELECT columns."id", columns."table_id",
columns."name" AS column, columns."type",
columns."type_digits", columns."type_scale", columns."default",
columns."null", extras."special",
extras."validation1", extras."validation2" FROM (SELECT "id",
"table_id", "name", "type", "type_digits",
- "type_scale", "default", "null" FROM sys.columns) AS columns INNER
JOIN (SELECT "id" FROM sys.tables
- WHERE type=4) AS tables ON (tables."id"=columns."table_id") LEFT
JOIN (SELECT "column_id", "special",
- "validation1", "validation2" FROM iot.webservercolumns) AS extras
ON (columns."id"=extras."column_id")"""\
- .replace('\n', ' ')
+ "type_scale", "default", "null", "number" FROM sys.columns) AS
columns INNER JOIN (SELECT "id"
+ FROM sys.tables WHERE type=4) AS tables ON
(tables."id"=columns."table_id") LEFT JOIN
+ (SELECT "column_id", "special", "validation1", "validation2" FROM
iot.webservercolumns) AS extras
+ ON (columns."id"=extras."column_id") ORDER BY columns."table_id",
columns."number" """.replace('\n', ' ')
cursor.execute(sql_string)
columns = cursor.fetchall()
@@ -45,11 +45,13 @@ def mapi_get_webserver_streams(connectio
return tables, columns
except BaseException as ex:
add_log(50, ex)
+ connection.rollback()
raise
def mapi_create_stream(connection, concatenated_name, stream):
schema = stream.get_schema_name()
+ table = stream.get_stream_name()
flush_statement = stream.get_webserverstreams_sql_statement()
columns_dictionary = stream.get_columns_extra_sql_statements() #
dictionary of column_name -> partial SQL statement
@@ -58,13 +60,13 @@ def mapi_create_stream(connection, conca
connection.execute("CREATE SCHEMA " + schema)
connection.commit()
except:
- pass
+ connection.rollback()
connection.execute(''.join(["CREATE STREAM TABLE ", concatenated_name,
" (", stream.get_sql_create_statement(),
")"]))
cursor = connection.cursor()
cursor.execute("SELECT id FROM sys.schemas WHERE \"name\"='" + schema
+ "'")
schema_id = str(cursor.fetchall()[0][0])
- cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=",
schema_id, " AND \"name\"='", stream,
+ cursor.execute(''.join(["SELECT id FROM sys.tables WHERE schema_id=",
schema_id, " AND \"name\"='", table,
"'"])) # get the created table id
table_id = str(cursor.fetchall()[0][0])
cursor.execute(''.join(["INSERT INTO iot.webserverstreams VALUES (",
table_id, flush_statement, ")"]))
@@ -72,18 +74,19 @@ def mapi_create_stream(connection, conca
columns = cursor.fetchall()
inserts = []
- colums_ids = ','.join(map(lambda x: str(x[0]), columns))
+ columns_ids = ','.join(map(lambda x: str(x[0]), columns))
for key, value in columns_dictionary.iteritems():
for entry in columns: # the imp_timestamp and host identifier are
also fetched!!
if entry[1] == key: # check for column's name
- inserts.append(''.join(['(', entry[0], value, ')'])) #
append the sql statement
+ inserts.append(''.join(['(', str(entry[0]), value, ')']))
# append the sql statement
break
cursor.execute("INSERT INTO iot.webservercolumns VALUES " +
','.join(inserts))
connection.commit()
- stream.set_delete_ids(table_id, colums_ids)
+ stream.set_delete_ids(table_id, columns_ids)
except BaseException as ex:
add_log(50, ex)
+ connection.rollback()
raise
@@ -95,6 +98,7 @@ def mapi_delete_stream(connection, conca
connection.commit()
except BaseException as ex:
add_log(50, ex)
+ connection.rollback()
raise
@@ -104,3 +108,4 @@ def mapi_flush_baskets(connection, schem
connection.commit()
except BaseException as ex:
add_log(40, ex)
+ connection.rollback()
diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -1,6 +1,7 @@
import struct
from abc import ABCMeta, abstractmethod
+from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
from dateutil import parser
@@ -39,7 +40,7 @@ class StreamDataType:
self._data_type = kwargs['type'] # SQL name of the type
self._is_nullable = kwargs.get('nullable', True) # boolean
if 'default' in kwargs and kwargs['default'] is not None:
- self._default_value = self.set_default_value(kwargs['default'])
+ self._default_value = self.process_default_value(kwargs['default'])
else:
self._default_value = None
@@ -50,8 +51,8 @@ class StreamDataType:
def get_nullable_constant(self): # get the nullable constant if the
column is nullable
return None
- def set_default_value(self, default_value): # set the default value
representation in the data type
- self._default_value = default_value
+ def process_default_value(self, default_value): # process the default
value representation in the data type
+ return default_value
def get_default_value(self): # get the default value representation in
the data type
return self._default_value
@@ -88,21 +89,19 @@ class StreamDataType:
return self.pack_parsed_values(extracted_values, counter, parameters)
def to_json_representation(self): # get a json representation of the data
type while checking the stream's info
- json_data = {'name': self._column_name, 'type': self._data_type,
'nullable': self._is_nullable}
- if self._default_value is not None:
- json_data['default'] = self._default_value
- return json_data
+ return OrderedDict((('name', self._column_name), ('type',
self._data_type),
+ ('default', self._default_value), ('nullable',
self._is_nullable)))
def process_sql_parameters(self, array): # get other possible parameters
such as a limit, minimum and maximum
pass
def create_stream_sql(self): # get column creation statement on SQL
array = [self._column_name, " ", self._data_type]
- self.process_sql_parameters(array) # add extra parameters to the SQL
statement
if self._default_value is not None:
array.extend([" DEFAULT '", str(self._default_value), "'"])
if not self._is_nullable:
array.append(" NOT NULL")
+ self.process_sql_parameters(array) # add extra parameters to the SQL
statement
return ''.join(array)
def get_extra_sql_statement(self): # data to iot.webservervalidation
@@ -174,14 +173,14 @@ class RegexType(TextType):
"""Covers: Regex"""
def __init__(self, **kwargs):
- super(RegexType, self).__init__(**kwargs)
self._regex = compile(kwargs['regex'])
self._regex_text = kwargs['regex']
+ super(RegexType, self).__init__(**kwargs)
- def set_default_value(self, default_value):
+ def process_default_value(self, default_value):
if self._regex.match(default_value) is None:
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list