Changeset: 6781d09b7830 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6781d09b7830
Added Files:
        clients/iotclient/src/Streams/streamscreator.py
Removed Files:
        clients/iotclient/src/Streams/semanticvalidation.py
Modified Files:
        clients/iotclient/requirements.txt
        clients/iotclient/src/Flask/app.py
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/iotlogger.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/flushing.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Utilities/filecreator.py
        clients/iotclient/src/main.py
Branch: iot
Log Message:

Added stream information serialization using a JSON config file


diffs (truncated from 710 to 300 lines):

diff --git a/clients/iotclient/requirements.txt b/clients/iotclient/requirements.txt
--- a/clients/iotclient/requirements.txt
+++ b/clients/iotclient/requirements.txt
@@ -4,5 +4,5 @@ rfc3987==1.3.5
 strict-rfc3339==0.6
 python-dateutil==2.5.2
 pytz==2016.3
-pymonetdb==1.0
+pymonetdb==0.1.1
 tzlocal==1.2.2
diff --git a/clients/iotclient/src/Flask/app.py b/clients/iotclient/src/Flask/app.py
--- a/clients/iotclient/src/Flask/app.py
+++ b/clients/iotclient/src/Flask/app.py
@@ -1,7 +1,7 @@
 from flask import Flask
 from flask_restful import Api
 
-from restresources import StreamInput, StreamsInfo, StreamsHandling
+from restresources import StreamInput, StreamsInfo, StreamsHandling  # , ServerHandler
 
 
 def start_flask_iot_app(host, port):
@@ -21,4 +21,5 @@ def start_flask_admin_app(host, port):
 
     admin_api.add_resource(StreamsInfo, '/streams')
     admin_api.add_resource(StreamsHandling, '/context')
+    #  admin_api.add_resource(ServerHandler, '/server')
     admin_app.run(host=host, port=port, threaded=True)
diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -9,8 +9,18 @@ from tzlocal import get_localzone
 from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA
 from Streams.streamscontext import IOTStreamsException, IOTStreams
 
-Stream_context = IOTStreams()
-local_tz = get_localzone()  # for the correction of dates we must add the system's timezone
+Streams_Context = None
+Create_Streams_Validator = None
+Delete_Streams_Validator = None
+Local_Timezone = None
+
+
+def init_rest_resources():
+    global Streams_Context, Create_Streams_Validator, Delete_Streams_Validator, Local_Timezone
+    Local_Timezone = get_localzone()  # for the correction of dates we must add the system's timezone
+    Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker())
+    Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker())
+    Streams_Context = IOTStreams()
 
 
 class StreamInput(Resource):
@@ -18,22 +28,22 @@ class StreamInput(Resource):
 
     def get(self, schema_name, stream_name):  # check a single stream data
         try:  # check if stream exists, if not return 404
-            stream = Stream_context.get_existing_stream(schema_name, stream_name)
-        except IOTStreamsException as ex:
+            stream = Streams_Context.get_existing_stream(schema_name, stream_name)
+        except BaseException as ex:
             return ex.message, 404
-        return stream.get_data_dictionary(), 200
+        return stream.get_data_dictionary(include_number_tuples=True), 200
 
     def post(self, schema_name, stream_name):  # add data to a stream
-        current_stamp = datetime.datetime.now(pytz.utc).astimezone(local_tz).isoformat()
+        current_stamp = datetime.datetime.now(pytz.utc).astimezone(Local_Timezone).isoformat()
 
         try:  # check if stream exists, if not return 404
-            stream = Stream_context.get_existing_stream(schema_name, stream_name)
-        except IOTStreamsException as ex:
+            stream = Streams_Context.get_existing_stream(schema_name, stream_name)
+        except BaseException as ex:
             return ex.message, 404
 
         try:  # validate and insert data, if not return 400
             stream.validate_and_insert(json.loads(request.data), current_stamp)
-        except Exception as ex:
+        except BaseException as ex:
             return ex.message, 400
         return '', 201  # all ok, return 201
 
@@ -42,24 +52,21 @@ class StreamsInfo(Resource):
     """Collect all streams information"""
 
     def get(self):  # get all streams data
-        return Stream_context.get_streams_data(), 200
+        return Streams_Context.get_streams_data(), 200
 
 
 class StreamsHandling(Resource):
     """Admin class for creating/deleting streams"""
 
-    CREATE_STREAMS_VALIDATOR = Draft4Validator(CREATE_STREAMS_SCHEMA, format_checker=FormatChecker())
-    DELETE_STREAMS_VALIDATOR = Draft4Validator(DELETE_STREAMS_SCHEMA, format_checker=FormatChecker())
-
     def __init__(self):
         super(StreamsHandling, self).__init__()
 
     def post(self):
         try:
             schema_to_validate = json.loads(request.data)
-            StreamsHandling.CREATE_STREAMS_VALIDATOR.validate(schema_to_validate)
-            Stream_context.add_new_stream(schema_to_validate)
-        except Exception as ex:
+            Create_Streams_Validator.validate(schema_to_validate)
+            Streams_Context.add_new_stream(schema_to_validate)
+        except BaseException as ex:
             return ex.message, 400
         else:
             return '', 201
@@ -67,12 +74,12 @@ class StreamsHandling(Resource):
     def delete(self):
         try:
             schema_to_validate = json.loads(request.data)
-            StreamsHandling.DELETE_STREAMS_VALIDATOR.validate(schema_to_validate)
-        except Exception as ex:
+            Delete_Streams_Validator.validate(schema_to_validate)
+        except BaseException as ex:
             return ex.message, 400
 
         try:  # check if stream exists, if not return 404
-            Stream_context.delete_existing_stream(schema_to_validate)
-        except IOTStreamsException as ex:
+            Streams_Context.delete_existing_stream(schema_to_validate)
+        except BaseException as ex:
             return ex.message, 404
         return '', 204
diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -1,9 +1,12 @@
 import os
 import sys
 
-baskets_base_location = None
+from Utilities.filecreator import create_file_if_not_exists
+
 BASKETS_BASE_DIRECTORY = "baskets"
-
+CONFIG_FILE_DEFAULT_NAME = "config.json"
+Baskets_Base_Location = None
+Config_File_Location = None
 
 if sys.platform in ("linux", "linux2", "darwin"):
     filesystem_location = '/etc/iotcollector'
@@ -13,29 +16,30 @@ elif sys.platform == "win32":
 
 def set_filesystem_location(new_location):
     global filesystem_location
+    filesystem_location = new_location
+
+
+def init_file_system(new_configfile_location=None):
+    global Baskets_Base_Location, Config_File_Location
 
     try:
-        if os.path.isdir(new_location):
-            filesystem_location = new_location
+        Baskets_Base_Location = os.path.join(filesystem_location, BASKETS_BASE_DIRECTORY)
+        if not os.path.exists(Baskets_Base_Location):
+            os.makedirs(Baskets_Base_Location)
+
+        if new_configfile_location is not None:
+            Config_File_Location = create_file_if_not_exists(new_configfile_location, hidden=False, init_text='[]')
         else:
-            print >> sys.stderr, "The provided filesystem doesn't exist!"
-            sys.exit(1)
+            Config_File_Location = create_file_if_not_exists(
+                os.path.join(filesystem_location, CONFIG_FILE_DEFAULT_NAME), hidden=False, init_text='[]')
     except (Exception, OSError) as ex:
         print >> sys.stderr, ex
         sys.exit(1)
 
 
-def init_file_system():
-    global baskets_base_location
+def get_baskets_base_location():
+    return Baskets_Base_Location
 
-    try:
-        baskets_base_location = os.path.join(filesystem_location, BASKETS_BASE_DIRECTORY)
-        if not os.path.exists(baskets_base_location):
-            os.makedirs(baskets_base_location)
-    except (Exception, OSError) as ex:
-        print >> sys.stderr, ex
-        sys.exit(1)
 
-
-def get_baskets_base_location():
-    return baskets_base_location
+def get_configfile_location():
+    return Config_File_Location
diff --git a/clients/iotclient/src/Settings/iotlogger.py b/clients/iotclient/src/Settings/iotlogger.py
--- a/clients/iotclient/src/Settings/iotlogger.py
+++ b/clients/iotclient/src/Settings/iotlogger.py
@@ -32,3 +32,7 @@ def init_logging():
     except (Exception, OSError) as ex:
         print >> sys.stderr, ex
         sys.exit(1)
+
+
+def add_log(lvl, message, *args, **kwargs):
+    logger.log(lvl, message, *args, **kwargs)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -10,11 +10,11 @@ def init_monetdb_connection(hostname, po
 
     user_password = getpass.getpass(prompt='User password:')
 
-    try:
+    try:  # the autocommit is set to true so each statement will be independent
         Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password,
-                                       database=database)
-        print >> sys.stdout, 'User %s connection successful to the database %s' % (user_name, database)
-    except Exception as ex:
+                                       database=database, autocommit=True)
+        print >> sys.stdout, 'User %s connected successfully to database %s' % (user_name, database)
+    except BaseException as ex:
         print >> sys.stderr, ex.message
         sys.exit(1)
 
@@ -33,4 +33,6 @@ def mapi_create_stream(schema, stream, c
 
 
 def mapi_flush_baskets(schema, stream, baskets):
-    Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"", stream, "\",\"", baskets, "\");"]))
+    # this procedure does not work yet. Have to check it with Martin
+    # Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"", stream, "\",\"", baskets, "\");"]))
+    pass
diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -90,7 +90,7 @@ class StreamDataType(object):
         return self.pack_parsed_values(extracted_values, counter, parameters)
 
     def to_json_representation(self):  # get a json representation of the data type while checking the stream's info
-        json_data = {'type': self._data_type, 'nullable': self._is_nullable}
+        json_data = {'name': self._column_name, 'type': self._data_type, 'nullable': self._is_nullable}
         if self._default_value is not None:
             json_data['default'] = self._default_value
         return json_data
diff --git a/clients/iotclient/src/Streams/flushing.py b/clients/iotclient/src/Streams/flushing.py
--- a/clients/iotclient/src/Streams/flushing.py
+++ b/clients/iotclient/src/Streams/flushing.py
@@ -4,6 +4,8 @@ from threading import Thread, Event
 
 
 class StoppableThread(Thread):
+    """Stoppable Thread"""
+
     def __init__(self):
         Thread.__init__(self)
         self.stop_event = Event()
@@ -17,10 +19,12 @@ class StoppableThread(Thread):
 
 
 class IntervalTimer(StoppableThread):
+    """Thread working with a timed interval basis"""
+
     def __init__(self, interval, worker_func):
         super(IntervalTimer, self).__init__()
         self._interval = interval  # in seconds
-        self._worker_func = worker_func
+        self._worker_func = worker_func  # function/method to execute periodically
 
     def run(self):
         while not self.stop_event.is_set():
@@ -30,6 +34,7 @@ class IntervalTimer(StoppableThread):
 
 class StreamFlushingMethod(object):
     """Base class for flushing"""
+
     __metaclass__ = ABCMeta
 
     def __init__(self):
@@ -41,6 +46,8 @@ class StreamFlushingMethod(object):
 
 
 class TimeBasedFlushing(StreamFlushingMethod):
+    """Time based flushing"""
+
     def __init__(self, interval, time_unit):
         super(TimeBasedFlushing, self).__init__()
         self._interval = interval
@@ -61,13 +68,15 @@ class TimeBasedFlushing(StreamFlushingMe
         self._local_thread.stop()
 
     def get_dictionary_info(self):
-        return {'method': 'time', 'unit': self._time_unit, 'interval': self._interval}
+        return {'base': 'time', 'unit': self._time_unit, 'interval': self._interval}
 
 
 class TupleBasedFlushing(StreamFlushingMethod):
+    """Tuple based flushing"""
+
     def __init__(self, limit):
         super(TupleBasedFlushing, self).__init__()
         self.limit = limit
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to