Changeset: 6781d09b7830 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6781d09b7830 Added Files: clients/iotclient/src/Streams/streamscreator.py Removed Files: clients/iotclient/src/Streams/semanticvalidation.py Modified Files: clients/iotclient/requirements.txt clients/iotclient/src/Flask/app.py clients/iotclient/src/Flask/restresources.py clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Settings/iotlogger.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/flushing.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Utilities/filecreator.py clients/iotclient/src/main.py Branch: iot Log Message:
Added stream information serialization using a JSON config file diffs (truncated from 710 to 300 lines): diff --git a/clients/iotclient/requirements.txt b/clients/iotclient/requirements.txt --- a/clients/iotclient/requirements.txt +++ b/clients/iotclient/requirements.txt @@ -4,5 +4,5 @@ rfc3987==1.3.5 strict-rfc3339==0.6 python-dateutil==2.5.2 pytz==2016.3 -pymonetdb==1.0 +pymonetdb==0.1.1 tzlocal==1.2.2 diff --git a/clients/iotclient/src/Flask/app.py b/clients/iotclient/src/Flask/app.py --- a/clients/iotclient/src/Flask/app.py +++ b/clients/iotclient/src/Flask/app.py @@ -1,7 +1,7 @@ from flask import Flask from flask_restful import Api -from restresources import StreamInput, StreamsInfo, StreamsHandling +from restresources import StreamInput, StreamsInfo, StreamsHandling # , ServerHandler def start_flask_iot_app(host, port): @@ -21,4 +21,5 @@ def start_flask_admin_app(host, port): admin_api.add_resource(StreamsInfo, '/streams') admin_api.add_resource(StreamsHandling, '/context') + # admin_api.add_resource(ServerHandler, '/server') admin_app.run(host=host, port=port, threaded=True) diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py --- a/clients/iotclient/src/Flask/restresources.py +++ b/clients/iotclient/src/Flask/restresources.py @@ -9,8 +9,18 @@ from tzlocal import get_localzone from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA from Streams.streamscontext import IOTStreamsException, IOTStreams -Stream_context = IOTStreams() -local_tz = get_localzone() # for the correction of dates we must add the system's timezone +Streams_Context = None +Create_Streams_Validator = None +Delete_Streams_Validator = None +Local_Timezone = None + + +def init_rest_resources(): + global Streams_Context, Create_Streams_Validator, Delete_Streams_Validator, Local_Timezone + Local_Timezone = get_localzone() # for the correction of dates we must add the system's timezone + Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker()) + Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker()) + Streams_Context = IOTStreams() class StreamInput(Resource): @@ -18,22 +28,22 @@ class StreamInput(Resource): def get(self, schema_name, stream_name): # check a single stream data try: # check if stream exists, if not return 404 - stream = Stream_context.get_existing_stream(schema_name, stream_name) - except IOTStreamsException as ex: + stream = Streams_Context.get_existing_stream(schema_name, stream_name) + except BaseException as ex: return ex.message, 404 - return stream.get_data_dictionary(), 200 + return stream.get_data_dictionary(include_number_tuples=True), 200 def post(self, schema_name, stream_name): # add data to a stream - current_stamp = datetime.datetime.now(pytz.utc).astimezone(local_tz).isoformat() + current_stamp = datetime.datetime.now(pytz.utc).astimezone(Local_Timezone).isoformat() try: # check if stream exists, if not return 404 - stream = Stream_context.get_existing_stream(schema_name, stream_name) - except IOTStreamsException as ex: + stream = Streams_Context.get_existing_stream(schema_name, stream_name) + except BaseException as ex: return ex.message, 404 try: # validate and insert data, if not return 400 stream.validate_and_insert(json.loads(request.data), current_stamp) - except Exception as ex: + except BaseException as ex: return ex.message, 400 return '', 201 # all ok, return 201 @@ -42,24 +52,21 @@ class StreamsInfo(Resource): """Collect all streams information""" def get(self): # get all streams data - return Stream_context.get_streams_data(), 200 + return Streams_Context.get_streams_data(), 200 class StreamsHandling(Resource): """Admin class for creating/deleting streams""" - CREATE_STREAMS_VALIDATOR = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker()) - DELETE_STREAMS_VALIDATOR = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker()) - def __init__(self): super(StreamsHandling, self).__init__() def post(self): try: schema_to_validate = json.loads(request.data) - StreamsHandling.CREATE_STREAMS_VALIDATOR.validate(schema_to_validate) - Stream_context.add_new_stream(schema_to_validate) - except Exception as ex: + Create_Streams_Validator.validate(schema_to_validate) + Streams_Context.add_new_stream(schema_to_validate) + except BaseException as ex: return ex.message, 400 else: return '', 201 @@ -67,12 +74,12 @@ class StreamsHandling(Resource): def delete(self): try: schema_to_validate = json.loads(request.data) - StreamsHandling.DELETE_STREAMS_VALIDATOR.validate(schema_to_validate) - except Exception as ex: + Delete_Streams_Validator.validate(schema_to_validate) + except BaseException as ex: return ex.message, 400 try: # check if stream exists, if not return 404 - Stream_context.delete_existing_stream(schema_to_validate) - except IOTStreamsException as ex: + Streams_Context.delete_existing_stream(schema_to_validate) + except BaseException as ex: return ex.message, 404 return '', 204 diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py --- a/clients/iotclient/src/Settings/filesystem.py +++ b/clients/iotclient/src/Settings/filesystem.py @@ -1,9 +1,12 @@ import os import sys -baskets_base_location = None +from Utilities.filecreator import create_file_if_not_exists + BASKETS_BASE_DIRECTORY = "baskets" - +CONFIG_FILE_DEFAULT_NAME = "config.json" +Baskets_Base_Location = None +Config_File_Location = None if sys.platform in ("linux", "linux2", "darwin"): filesystem_location = '/etc/iotcollector' @@ -13,29 +16,30 @@ elif sys.platform == "win32": def set_filesystem_location(new_location): global filesystem_location + filesystem_location = new_location + + +def init_file_system(new_configfile_location=None): + global Baskets_Base_Location, Config_File_Location try: - if os.path.isdir(new_location): - filesystem_location = new_location + Baskets_Base_Location = os.path.join(filesystem_location, BASKETS_BASE_DIRECTORY) + if not os.path.exists(Baskets_Base_Location): + os.makedirs(Baskets_Base_Location) + + if new_configfile_location is not None: + Config_File_Location = create_file_if_not_exists(new_configfile_location, hidden=False, init_text='[]') else: - print >> sys.stderr, "The provided filesystem doesn't exist!" - sys.exit(1) + Config_File_Location = create_file_if_not_exists( + os.path.join(filesystem_location, CONFIG_FILE_DEFAULT_NAME), hidden=False, init_text='[]') except (Exception, OSError) as ex: print >> sys.stderr, ex sys.exit(1) -def init_file_system(): - global baskets_base_location +def get_baskets_base_location(): + return Baskets_Base_Location - try: - baskets_base_location = os.path.join(filesystem_location, BASKETS_BASE_DIRECTORY) - if not os.path.exists(baskets_base_location): - os.makedirs(baskets_base_location) - except (Exception, OSError) as ex: - print >> sys.stderr, ex - sys.exit(1) - -def get_baskets_base_location(): - return baskets_base_location +def get_configfile_location(): + return Config_File_Location diff --git a/clients/iotclient/src/Settings/iotlogger.py b/clients/iotclient/src/Settings/iotlogger.py --- a/clients/iotclient/src/Settings/iotlogger.py +++ b/clients/iotclient/src/Settings/iotlogger.py @@ -32,3 +32,7 @@ def init_logging(): except (Exception, OSError) as ex: print >> sys.stderr, ex sys.exit(1) + + +def add_log(lvl, message, *args, **kwargs): + logger.log(lvl, message, *args, **kwargs) diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py --- a/clients/iotclient/src/Settings/mapiconnection.py +++ b/clients/iotclient/src/Settings/mapiconnection.py @@ -10,11 +10,11 @@ def init_monetdb_connection(hostname, po user_password = getpass.getpass(prompt='User password:') - try: + try: # the autocommit is set to true so each statement will be independent Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password, - database=database) - print >> sys.stdout, 'User %s connection successful to the database %s' % (user_name, database) - except Exception as ex: + database=database, autocommit=True) + print >> sys.stdout, 'User %s connected successfully to database %s' % (user_name, database) + except BaseException as ex: print >> sys.stderr, ex.message sys.exit(1) @@ -33,4 +33,6 @@ def mapi_create_stream(schema, stream, c def mapi_flush_baskets(schema, stream, baskets): - Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"", stream, "\",\"", baskets, "\");"])) + # this procedure does not work yet. Have to check it with Martin + # Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"", stream, "\",\"", baskets, "\");"])) + pass diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -90,7 +90,7 @@ class StreamDataType(object): return self.pack_parsed_values(extracted_values, counter, parameters) def to_json_representation(self): # get a json representation of the data type while checking the stream's info - json_data = {'type': self._data_type, 'nullable': self._is_nullable} + json_data = {'name': self._column_name, 'type': self._data_type, 'nullable': self._is_nullable} if self._default_value is not None: json_data['default'] = self._default_value return json_data diff --git a/clients/iotclient/src/Streams/flushing.py b/clients/iotclient/src/Streams/flushing.py --- a/clients/iotclient/src/Streams/flushing.py +++ b/clients/iotclient/src/Streams/flushing.py @@ -4,6 +4,8 @@ from threading import Thread, Event class StoppableThread(Thread): + """Stoppable Thread""" + def __init__(self): Thread.__init__(self) self.stop_event = Event() @@ -17,10 +19,12 @@ class StoppableThread(Thread): class IntervalTimer(StoppableThread): + """Thread working with a timed interval basis""" + def __init__(self, interval, worker_func): super(IntervalTimer, self).__init__() self._interval = interval # in seconds - self._worker_func = worker_func + self._worker_func = worker_func # function/method to execute periodically def run(self): while not self.stop_event.is_set(): @@ -30,6 +34,7 @@ class IntervalTimer(StoppableThread): class StreamFlushingMethod(object): """Base class for flushing""" + __metaclass__ = ABCMeta def __init__(self): @@ -41,6 +46,8 @@ class StreamFlushingMethod(object): class TimeBasedFlushing(StreamFlushingMethod): + """Time based flushing""" + def __init__(self, interval, time_unit): super(TimeBasedFlushing, self).__init__() self._interval = interval @@ -61,13 +68,15 @@ class TimeBasedFlushing(StreamFlushingMe self._local_thread.stop() def get_dictionary_info(self): - return {'method': 'time', 'unit': self._time_unit, 'interval': self._interval} + return {'base': 'time', 'unit': self._time_unit, 'interval': self._interval} class TupleBasedFlushing(StreamFlushingMethod): + """Tuple based flushing""" + def __init__(self, limit): super(TupleBasedFlushing, self).__init__() self.limit = limit _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list