Changeset: 233ce5ea7032 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=233ce5ea7032 Added Files: clients/iotapi/src/Streams/streams.py clients/iotapi/src/WebSockets/__init__.py clients/iotapi/src/WebSockets/websockets.py Modified Files: clients/iotapi/requirements.txt clients/iotapi/src/Settings/filesystem.py clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streampolling.py clients/iotapi/src/Streams/streamscontext.py clients/iotapi/src/Utilities/customthreading.py clients/iotclient/requirements.txt clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Streams/streams.py clients/iotclient/src/Streams/streamscontext.py clients/iotclient/src/Streams/streamscreator.py Branch: iot Log Message:
Added file watching diffs (truncated from 451 to 300 lines): diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt --- a/clients/iotapi/requirements.txt +++ b/clients/iotapi/requirements.txt @@ -1,7 +1,8 @@ git+https://github.com/dpallot/simple-websocket-server.git +pymonetdb==0.1.1 python-dateutil==2.5.3 pytz==2016.4 -pymonetdb==0.1.1 -tzlocal==1.2.2 Sphinx==1.4.1 sphinx-rtd-theme==0.1.9 +tzlocal==1.2.2 +watchdog==0.8.3 diff --git a/clients/iotapi/src/Settings/filesystem.py b/clients/iotapi/src/Settings/filesystem.py --- a/clients/iotapi/src/Settings/filesystem.py +++ b/clients/iotapi/src/Settings/filesystem.py @@ -5,24 +5,35 @@ import os from iotlogger import add_log BASKETS_BASE_DIRECTORY = "baskets" -CONFIG_FILE_DEFAULT_NAME = "config.json" if sys.platform in ("linux", "linux2", "darwin"): - filesystem_location = '/etc/iotcollector' + Filesystem_Location = '/etc/iotcollector' elif sys.platform == "win32": - filesystem_location = os.path.join(os.path.dirname(__file__), os.pardir) + Filesystem_Location = os.path.join(os.path.dirname(__file__), os.pardir) + +Baskets_Location = None def set_filesystem_location(new_location): - global filesystem_location - filesystem_location = new_location + global Filesystem_Location + Filesystem_Location = new_location def init_file_system(): + global Baskets_Location + try: - if not os.path.exists(filesystem_location): - os.makedirs(filesystem_location) + Baskets_Location = os.path.join(Filesystem_Location, BASKETS_BASE_DIRECTORY) + if not os.path.exists(Baskets_Location): + os.makedirs(Baskets_Location) + + if not os.path.exists(Filesystem_Location): + os.makedirs(Filesystem_Location) except (Exception, OSError) as ex: print >> sys.stdout, ex add_log(50, ex) sys.exit(1) + + +def get_baskets_base_location(): + return Baskets_Location diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ 
b/clients/iotapi/src/Streams/datatypes.py @@ -1,4 +1,3 @@ -import itertools import struct from abc import ABCMeta, abstractmethod from datetime import date, time, datetime @@ -27,25 +26,31 @@ class StreamDataType(object): def __init__(self, **kwargs): self._column_name = kwargs['name'] # name of the column self._data_type = kwargs['type'] # SQL name of the type - self._location = kwargs['location'] + '.tail' # Location of the file + # self._location = kwargs['location'] + '.tail' # Location of the file + + def is_file_mode_binary(self): + return True @abstractmethod def read_next_batch(self, file_pointer, count): return [] - def to_json_representation(self): - return {'name': self._column_name, 'type': self._data_type} + def fetch_new_tuples(self, count): + file_pointer = open(self._location, 'rb') class TextType(StreamDataType): - """Covers: TEXT, STRING, CLOB and CHARACTER LARGE OBJECT""" + """Covers: CHAR, VARCHAR, CLOB""" def __init__(self, **kwargs): super(TextType, self).__init__(**kwargs) self._nullable_constant = NIL_STRING + def is_file_mode_binary(self): + return False + def read_next_batch(self, file_pointer, count): - array = list(itertools.islice(file_pointer, count)) + array = file_pointer.readlines() return map(lambda x: None if x == self._nullable_constant else x[:-1], array) @@ -62,7 +67,7 @@ class BooleanType(StreamDataType): class SmallIntegerType(StreamDataType): - """Covers: TINYINT, SMALLINT, INT[EGER], BIGINT""" + """Covers: TINYINT, SMALLINT, INTEGER, BIGINT""" def __init__(self, **kwargs): super(SmallIntegerType, self).__init__(**kwargs) @@ -98,7 +103,7 @@ class HugeIntegerType(StreamDataType): class FloatType(StreamDataType): - """Covers: REAL, FLOAT and DOUBLE""" + """Covers: REAL, DOUBLE""" def __init__(self, **kwargs): super(FloatType, self).__init__(**kwargs) @@ -112,7 +117,7 @@ class FloatType(StreamDataType): class DecimalType(StreamDataType): - """Covers: DECIMAL and NUMERIC""" + """Covers: DECIMAL""" def __init__(self, **kwargs): 
super(DecimalType, self).__init__(**kwargs) diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -23,7 +23,7 @@ def init_stream_polling_thread(interval) # elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, elem[4] is location, elem[5] is typewidth def stream_polling(): - array = fetch_streams() + array = fetch_streams() # TODO check whenever stream's columns are updated for key, group in groupby(array, lambda x: x[0] + '.' + x[1]): if not Streams_context.is_stream_in_context(key): columns = {} @@ -34,4 +34,4 @@ def stream_polling(): new_column = reflection_class(kwargs) columns[elem[2]] = new_column - Streams_context.add_stream(key, DataCellStream(elem[0], elem[1], columns)) + Streams_context.add_stream(key, DataCellStream(key, columns)) diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/src/Streams/streams.py @@ -0,0 +1,36 @@ +import os + +from Settings.filesystem import get_baskets_base_location +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + + +class StreamBasketsHandler(FileSystemEventHandler): + def __init__(self, stream): + super(StreamBasketsHandler, self).__init__() + self._stream = stream + + def on_created(self, event): # whenever a basket directory is created, notify + if isinstance(event, 'DirCreatedEvent'): + self._stream.read_new_tuples(event.src_path) + + +class DataCellStream(object): + """Representation of a stream""" + + def __init__(self, schema_name, stream_name, columns): + self._schema_name = schema_name # name of the schema + self._stream_name = stream_name # name of the stream + self._columns = columns # dictionary of name -> data_types + self._base_path = os.path.join(get_baskets_base_location(), schema_name, stream_name) + 
self._observer = Observer() + self._observer.schedule(StreamBasketsHandler(stream=self), self._base_path, recursive=False) + self._observer.start() + + def read_new_tuples(self, path): + for key, column in self._columns.iteritems(): + next_file_name = os.path.join(path, key) + open_string = 'r' + if not column.is_file_mode_binary(): + open_string += 'u' + file_pointer = open(next_file_name, open_string) diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py --- a/clients/iotapi/src/Streams/streamscontext.py +++ b/clients/iotapi/src/Streams/streamscontext.py @@ -1,15 +1,6 @@ import collections -class DataCellStream(object): - """Representation of a stream""" - - def __init__(self, schema_name, stream_name, columns): - self._schema_name = schema_name # name of the schema - self._stream_name = stream_name # name of the stream - self._columns = columns # dictionary of name -> data_types - - class IOTStreams(object): """Stream's context""" diff --git a/clients/iotapi/src/Utilities/customthreading.py b/clients/iotapi/src/Utilities/customthreading.py --- a/clients/iotapi/src/Utilities/customthreading.py +++ b/clients/iotapi/src/Utilities/customthreading.py @@ -20,12 +20,13 @@ class StoppableThread(Thread): class PeriodicalThread(StoppableThread): """Thread working with a timed interval basis""" - def __init__(self, interval, worker_func, *args, **kwargs): + def __init__(self, interval, worker_func, func_args=None, *args, **kwargs): super(PeriodicalThread, self).__init__(*args, **kwargs) self._interval = interval # in seconds self._worker_func = worker_func # function/method to execute periodically + self._worker_func_args = func_args def run(self): while not self.stop_event.is_set(): - self._worker_func() + self._worker_func(self._worker_func_args) time.sleep(self._interval) diff --git a/clients/iotapi/src/WebSockets/__init__.py b/clients/iotapi/src/WebSockets/__init__.py new file mode 100644 diff --git 
a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/src/WebSockets/websockets.py @@ -0,0 +1,36 @@ +import sys + +from Settings.iotlogger import add_log +from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket + +WebSocketServer = None +clients = [] # this probably won't scale + + +class IOTAPI(WebSocket): + def handleMessage(self): + for client in clients: + client.sendMessage(self.data) + + def handleConnected(self): + clients.append(self) + add_log(20, 'Client connected: ' + self.address[0]) + + def handleClose(self): + clients.remove(self) + add_log(20, 'Client disconnected: ' + self.address[0]) + + +def init_websockets(host, port): + global WebSocketServer + try: + WebSocketServer = SimpleWebSocketServer(host, port, IOTAPI) + WebSocketServer.serveforever() + except (Exception, OSError) as ex: + print >> sys.stdout, ex + add_log(50, ex) + sys.exit(1) + + +def terminate_websockets(): + WebSocketServer.close() diff --git a/clients/iotclient/requirements.txt b/clients/iotclient/requirements.txt --- a/clients/iotclient/requirements.txt +++ b/clients/iotclient/requirements.txt @@ -1,10 +1,10 @@ Flask-RESTful==0.3.5 jsonschema==2.5.1 -rfc3987==1.3.5 -strict-rfc3339==0.6 +pymonetdb==0.1.1 python-dateutil==2.5.3 pytz==2016.4 -pymonetdb==0.1.1 -tzlocal==1.2.2 +rfc3987==1.3.5 Sphinx==1.4.1 sphinx-rtd-theme==0.1.9 +strict-rfc3339==0.6 +tzlocal==1.2.2 diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py --- a/clients/iotclient/src/Settings/filesystem.py +++ b/clients/iotclient/src/Settings/filesystem.py @@ -9,9 +9,9 @@ BASKETS_BASE_DIRECTORY = "baskets" CONFIG_FILE_DEFAULT_NAME = "config.json" if sys.platform in ("linux", "linux2", "darwin"): - filesystem_location = '/etc/iotcollector' + Filesystem_Location = '/etc/iotcollector' elif sys.platform == "win32": _______________________________________________ checkin-list 
mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list
