Changeset: 233ce5ea7032 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=233ce5ea7032
Added Files:
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/WebSockets/__init__.py
        clients/iotapi/src/WebSockets/websockets.py
Modified Files:
        clients/iotapi/requirements.txt
        clients/iotapi/src/Settings/filesystem.py
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotapi/src/Utilities/customthreading.py
        clients/iotclient/requirements.txt
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:

Added file watching


diffs (truncated from 451 to 300 lines):

diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt
--- a/clients/iotapi/requirements.txt
+++ b/clients/iotapi/requirements.txt
@@ -1,7 +1,8 @@
 git+https://github.com/dpallot/simple-websocket-server.git
+pymonetdb==0.1.1
 python-dateutil==2.5.3
 pytz==2016.4
-pymonetdb==0.1.1
-tzlocal==1.2.2
 Sphinx==1.4.1
 sphinx-rtd-theme==0.1.9
+tzlocal==1.2.2
+watchdog==0.8.3
diff --git a/clients/iotapi/src/Settings/filesystem.py 
b/clients/iotapi/src/Settings/filesystem.py
--- a/clients/iotapi/src/Settings/filesystem.py
+++ b/clients/iotapi/src/Settings/filesystem.py
@@ -5,24 +5,35 @@ import os
 from iotlogger import add_log
 
 BASKETS_BASE_DIRECTORY = "baskets"
-CONFIG_FILE_DEFAULT_NAME = "config.json"
 
 if sys.platform in ("linux", "linux2", "darwin"):
-    filesystem_location = '/etc/iotcollector'
+    Filesystem_Location = '/etc/iotcollector'
 elif sys.platform == "win32":
-    filesystem_location = os.path.join(os.path.dirname(__file__), os.pardir)
+    Filesystem_Location = os.path.join(os.path.dirname(__file__), os.pardir)
+
+Baskets_Location = None
 
 
 def set_filesystem_location(new_location):
-    global filesystem_location
-    filesystem_location = new_location
+    global Filesystem_Location
+    Filesystem_Location = new_location
 
 
 def init_file_system():
+    global Baskets_Location
+
     try:
-        if not os.path.exists(filesystem_location):
-            os.makedirs(filesystem_location)
+        Baskets_Location = os.path.join(Filesystem_Location, 
BASKETS_BASE_DIRECTORY)
+        if not os.path.exists(Baskets_Location):
+            os.makedirs(Baskets_Location)
+
+        if not os.path.exists(Filesystem_Location):
+            os.makedirs(Filesystem_Location)
     except (Exception, OSError) as ex:
         print >> sys.stdout, ex
         add_log(50, ex)
         sys.exit(1)
+
+
+def get_baskets_base_location():
+    return Baskets_Location
diff --git a/clients/iotapi/src/Streams/datatypes.py 
b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -1,4 +1,3 @@
-import itertools
 import struct
 from abc import ABCMeta, abstractmethod
 from datetime import date, time, datetime
@@ -27,25 +26,31 @@ class StreamDataType(object):
     def __init__(self, **kwargs):
         self._column_name = kwargs['name']  # name of the column
         self._data_type = kwargs['type']  # SQL name of the type
-        self._location = kwargs['location'] + '.tail'  # Location of the file
+        # self._location = kwargs['location'] + '.tail'  # Location of the file
+
+    def is_file_mode_binary(self):
+        return True
 
     @abstractmethod
     def read_next_batch(self, file_pointer, count):
         return []
 
-    def to_json_representation(self):
-        return {'name': self._column_name, 'type': self._data_type}
+    def fetch_new_tuples(self, count):
+        file_pointer = open(self._location, 'rb')
 
 
 class TextType(StreamDataType):
-    """Covers: TEXT, STRING, CLOB and CHARACTER LARGE OBJECT"""
+    """Covers: CHAR, VARCHAR, CLOB"""
 
     def __init__(self, **kwargs):
         super(TextType, self).__init__(**kwargs)
         self._nullable_constant = NIL_STRING
 
+    def is_file_mode_binary(self):
+        return False
+
     def read_next_batch(self, file_pointer, count):
-        array = list(itertools.islice(file_pointer, count))
+        array = file_pointer.readlines()
         return map(lambda x: None if x == self._nullable_constant else x[:-1], 
array)
 
 
@@ -62,7 +67,7 @@ class BooleanType(StreamDataType):
 
 
 class SmallIntegerType(StreamDataType):
-    """Covers: TINYINT, SMALLINT, INT[EGER], BIGINT"""
+    """Covers: TINYINT, SMALLINT, INTEGER, BIGINT"""
 
     def __init__(self, **kwargs):
         super(SmallIntegerType, self).__init__(**kwargs)
@@ -98,7 +103,7 @@ class HugeIntegerType(StreamDataType):
 
 
 class FloatType(StreamDataType):
-    """Covers: REAL, FLOAT and DOUBLE"""
+    """Covers: REAL, DOUBLE"""
 
     def __init__(self, **kwargs):
         super(FloatType, self).__init__(**kwargs)
@@ -112,7 +117,7 @@ class FloatType(StreamDataType):
 
 
 class DecimalType(StreamDataType):
-    """Covers: DECIMAL and NUMERIC"""
+    """Covers: DECIMAL"""
 
     def __init__(self, **kwargs):
         super(DecimalType, self).__init__(**kwargs)
diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -23,7 +23,7 @@ def init_stream_polling_thread(interval)
 
 # elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, 
elem[4] is location, elem[5] is typewidth
 def stream_polling():
-    array = fetch_streams()
+    array = fetch_streams()  # TODO check whenever stream's columns are updated
     for key, group in groupby(array, lambda x: x[0] + '.' + x[1]):
         if not Streams_context.is_stream_in_context(key):
             columns = {}
@@ -34,4 +34,4 @@ def stream_polling():
                 new_column = reflection_class(kwargs)
                 columns[elem[2]] = new_column
 
-            Streams_context.add_stream(key, DataCellStream(elem[0], elem[1], 
columns))
+            Streams_context.add_stream(key, DataCellStream(key, columns))
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/src/Streams/streams.py
@@ -0,0 +1,36 @@
+import os
+
+from Settings.filesystem import get_baskets_base_location
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+
+class StreamBasketsHandler(FileSystemEventHandler):
+    def __init__(self, stream):
+        super(StreamBasketsHandler, self).__init__()
+        self._stream = stream
+
+    def on_created(self, event):  # whenever a basket directory is created, 
notify
+        if isinstance(event, 'DirCreatedEvent'):
+            self._stream.read_new_tuples(event.src_path)
+
+
+class DataCellStream(object):
+    """Representation of a stream"""
+
+    def __init__(self, schema_name, stream_name, columns):
+        self._schema_name = schema_name  # name of the schema
+        self._stream_name = stream_name  # name of the stream
+        self._columns = columns  # dictionary of name -> data_types
+        self._base_path = os.path.join(get_baskets_base_location(), 
schema_name, stream_name)
+        self._observer = Observer()
+        self._observer.schedule(StreamBasketsHandler(stream=self), 
self._base_path, recursive=False)
+        self._observer.start()
+
+    def read_new_tuples(self, path):
+        for key, column in self._columns.iteritems():
+            next_file_name = os.path.join(path, key)
+            open_string = 'r'
+            if not column.is_file_mode_binary():
+                open_string += 'u'
+            file_pointer = open(next_file_name, open_string)
diff --git a/clients/iotapi/src/Streams/streamscontext.py 
b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -1,15 +1,6 @@
 import collections
 
 
-class DataCellStream(object):
-    """Representation of a stream"""
-
-    def __init__(self, schema_name, stream_name, columns):
-        self._schema_name = schema_name  # name of the schema
-        self._stream_name = stream_name  # name of the stream
-        self._columns = columns  # dictionary of name -> data_types
-
-
 class IOTStreams(object):
     """Stream's context"""
 
diff --git a/clients/iotapi/src/Utilities/customthreading.py 
b/clients/iotapi/src/Utilities/customthreading.py
--- a/clients/iotapi/src/Utilities/customthreading.py
+++ b/clients/iotapi/src/Utilities/customthreading.py
@@ -20,12 +20,13 @@ class StoppableThread(Thread):
 class PeriodicalThread(StoppableThread):
     """Thread working with a timed interval basis"""
 
-    def __init__(self, interval, worker_func, *args, **kwargs):
+    def __init__(self, interval, worker_func, func_args=None, *args, **kwargs):
         super(PeriodicalThread, self).__init__(*args, **kwargs)
         self._interval = interval  # in seconds
         self._worker_func = worker_func  # function/method to execute 
periodically
+        self._worker_func_args = func_args
 
     def run(self):
         while not self.stop_event.is_set():
-            self._worker_func()
+            self._worker_func(self._worker_func_args)
             time.sleep(self._interval)
diff --git a/clients/iotapi/src/WebSockets/__init__.py 
b/clients/iotapi/src/WebSockets/__init__.py
new file mode 100644
diff --git a/clients/iotapi/src/WebSockets/websockets.py 
b/clients/iotapi/src/WebSockets/websockets.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -0,0 +1,36 @@
+import sys
+
+from Settings.iotlogger import add_log
+from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
+
+WebSocketServer = None
+clients = []  # this probably won't scale
+
+
+class IOTAPI(WebSocket):
+    def handleMessage(self):
+        for client in clients:
+            client.sendMessage(self.data)
+
+    def handleConnected(self):
+        clients.append(self)
+        add_log(20, 'Client connected: ' + self.address[0])
+
+    def handleClose(self):
+        clients.remove(self)
+        add_log(20, 'Client disconnected: ' + self.address[0])
+
+
+def init_websockets(host, port):
+    global WebSocketServer
+    try:
+        WebSocketServer = SimpleWebSocketServer(host, port, IOTAPI)
+        WebSocketServer.serveforever()
+    except (Exception, OSError) as ex:
+        print >> sys.stdout, ex
+        add_log(50, ex)
+        sys.exit(1)
+
+
+def terminate_websockets():
+    WebSocketServer.close()
diff --git a/clients/iotclient/requirements.txt 
b/clients/iotclient/requirements.txt
--- a/clients/iotclient/requirements.txt
+++ b/clients/iotclient/requirements.txt
@@ -1,10 +1,10 @@
 Flask-RESTful==0.3.5
 jsonschema==2.5.1
-rfc3987==1.3.5
-strict-rfc3339==0.6
+pymonetdb==0.1.1
 python-dateutil==2.5.3
 pytz==2016.4
-pymonetdb==0.1.1
-tzlocal==1.2.2
+rfc3987==1.3.5
 Sphinx==1.4.1
 sphinx-rtd-theme==0.1.9
+strict-rfc3339==0.6
+tzlocal==1.2.2
diff --git a/clients/iotclient/src/Settings/filesystem.py 
b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -9,9 +9,9 @@ BASKETS_BASE_DIRECTORY = "baskets"
 CONFIG_FILE_DEFAULT_NAME = "config.json"
 
 if sys.platform in ("linux", "linux2", "darwin"):
-    filesystem_location = '/etc/iotcollector'
+    Filesystem_Location = '/etc/iotcollector'
 elif sys.platform == "win32":
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to