Changeset: 5b90b8e6494a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5b90b8e6494a
Modified Files:
clients/iotapi/src/Settings/mapiconnection.py
clients/iotapi/src/Streams/streams.py
clients/iotapi/src/Streams/streamscontext.py
clients/iotapi/src/WebSockets/jsonschemas.py
clients/iotapi/src/WebSockets/websockets.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/datatypes.py
clients/iotclient/src/Streams/jsonschemas.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscontext.py
clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
Finished stream reading
diffs (truncated from 661 to 300 lines):
diff --git a/clients/iotapi/src/Settings/mapiconnection.py
b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -47,4 +47,3 @@ def fetch_streams():
except BaseException as ex:
print >> sys.stdout, ex
add_log(50, ex)
- sys.exit(1)
diff --git a/clients/iotapi/src/Streams/streams.py
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -28,8 +28,8 @@ class StreamBasketsHandler(FileSystemEve
def on_created(self, event): # whenever a basket directory is created,
notify to subscribed clients
if isinstance(event, 'DirCreatedEvent'):
basket_string = os.path.basename(os.path.normpath(event.src_path))
- self._stream.append_basket(basket_string)
- notify_clients(self._stream.get_schema_name(),
self._stream.get_stream_name())
+ count = self._stream.append_basket(basket_string)
+ notify_clients(self._stream.get_schema_name(),
self._stream.get_stream_name(), count)
def on_deleted(self, event):
if isinstance(event, 'DirDeletedEvent'):
@@ -49,6 +49,7 @@ class IOTStream(object):
for name in os.listdir(self._base_path):
self.append_basket(name)
self._lock = RWLock()
+
self._observer = Observer()
self._observer.schedule(StreamBasketsHandler(stream=self),
self._base_path, recursive=False)
self._observer.start()
@@ -66,6 +67,8 @@ class IOTStream(object):
self._lock.acquire_write()
self._baskets[int(path)] = count
self._lock.release()
+ return count
+ return 0
def delete_basket(self, path):
if represents_int(path):
@@ -126,7 +129,6 @@ class IOTStream(object):
offset = 0
current_basket_number += 1
- # TODO check if this is viable, it could be 1000 tuples!!!!
- keys = results.keys()
+ keys = results.keys() # TODO check if this is viable for many tuples!!
tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k
in keys))]
return {'total': read_tuples, 'tuples': tuples}
diff --git a/clients/iotapi/src/Streams/streamscontext.py
b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -5,7 +5,7 @@ from WebSockets.websockets import desubs
class IOTStreams(object):
- """Stream's context"""
+ """Streams context"""
@classmethod
def get_context_entry_name(cls, schema_name, stream_name):
diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py
b/clients/iotapi/src/WebSockets/jsonschemas.py
--- a/clients/iotapi/src/WebSockets/jsonschemas.py
+++ b/clients/iotapi/src/WebSockets/jsonschemas.py
@@ -1,6 +1,11 @@
-PUBSUB_STREAMS_SCHEMA = {
+SUBSCRIBE_OPTS = ["sub", "subscribe"]
+UNSUBSCRIBE_OPTS = ["unsub", "unsubscribe"]
+CONCAT_SUB_OPTS = SUBSCRIBE_OPTS + UNSUBSCRIBE_OPTS
+READ_OPTS = ["read"]
+
+CLIENTS_INPUTS_SCHEMA = {
"title": "JSON schema publish/subscribe streams",
- "description": "Validate data inserted",
+ "description": "Validate clients inputs",
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
@@ -8,7 +13,7 @@ PUBSUB_STREAMS_SCHEMA = {
"properties": {
"schema": {"type": "string"},
"stream": {"type": "string"},
- "action": {"type": "string", "enum": ["sub", "subscribe", "desub",
"desubscribe"]},
+ "action": {"type": "string", "enum": CONCAT_SUB_OPTS},
},
"required": ["schema", "stream", "action"],
"additionalProperties": False
@@ -22,7 +27,7 @@ PUBSUB_STREAMS_SCHEMA = {
"properties": {
"schema": {"type": "string"},
"stream": {"type": "string"},
- "action": {"type": "string", "enum": ["read"]},
+ "action": {"type": "string", "enum": READ_OPTS},
"basket": {"type": "integer", "minimum": 1, "default": 1},
"limit": {"type": "integer", "minimum": 0, "default": 0},
"offset": {"type": "integer", "minimum": 0, "default": 0}
diff --git a/clients/iotapi/src/WebSockets/websockets.py
b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -7,24 +7,23 @@ from Streams.streamscontext import Strea
from Utilities.readwritelock import RWLock
from jsonschema import Draft4Validator, FormatChecker
-from jsonschemas import PUBSUB_STREAMS_SCHEMA
+from jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS,
UNSUBSCRIBE_OPTS, READ_OPTS
-Client_Messages_Validator = Draft4Validator(PUBSUB_STREAMS_SCHEMA,
format_checker=FormatChecker())
+Client_Messages_Validator = Draft4Validator(CLIENTS_INPUTS_SCHEMA,
format_checker=FormatChecker())
WebSocketServer = None
WebClients = [] # TODO this probably won't scale for many
WebClientsLock = RWLock()
-def notify_stream_inserts_to_clients(schema_name, stream_name):
+def notify_stream_inserts_to_clients(schema_name, stream_name, count):
concatenated_name = IOTStreams.get_context_entry_name(schema_name,
stream_name)
- json_message = json.dumps({'notification': {'schema': schema_name,
'stream': stream_name}})
WebClientsLock.acquire_read()
for client in WebClients:
- client.send_notification_message(concatenated_name, json_message)
+ client.send_notification_message(concatenated_name, schema_name,
stream_name, count)
WebClientsLock.release()
-def desubscribe_removed_streams(concatenated_names):
+def unsubscribe_removed_streams(concatenated_names):
WebClientsLock.acquire_read()
for name in concatenated_names:
for client in WebClients:
@@ -35,85 +34,93 @@ def desubscribe_removed_streams(concaten
class IOTAPI(WebSocket):
- def __init__(self):
- super(IOTAPI, self).__init__()
+ """Client WebSocket"""
+
+ def __init__(self, server, sock, address):
+ super(IOTAPI, self).__init__(server, sock, address)
self._subscriptions = {} # dictionary of schema + '.' + stream ->
IOTStream
self._locker = RWLock()
- def handleMessage(self):
+ def sendMessage(self, message): # overriden
+ super(IOTAPI, self).sendMessage(json.dumps(message)) # send JSON
Strings to clients
+
+ def handleConnected(self): # overriden
+ WebClientsLock.acquire_write()
+ WebClients.append(self)
+ WebClientsLock.release()
+ add_log(20, 'Client connected: ' + self.address[0])
+
+ def handleClose(self): # overriden
+ WebClientsLock.acquire_write()
+ WebClients.remove(self)
+ WebClientsLock.release()
+ add_log(20, 'Client disconnected: ' + self.address[0])
+
+ def handleMessage(self): # overriden
if self.opcode != 0x1: # TEXT frame
self.sendMessage({"error": "Only TEXT frames allowed!"})
try:
input_schema = json.loads(self.data)
Client_Messages_Validator.validate(input_schema)
+ concatenated_name =
IOTStreams.get_context_entry_name(input_schema['schema'],
input_schema['stream'])
- if input_schema['action'] in ("sub", "subscribe"):
- concatenated_name =
IOTStreams.get_context_entry_name(input_schema['schema'],
input_schema['stream'])
- self.subscribe(self, concatenated_name)
- elif input_schema['action'] in ("desub", "desubscribe"):
- concatenated_name =
IOTStreams.get_context_entry_name(input_schema['schema'],
input_schema['stream'])
- self.desubscribe(self, concatenated_name)
+ if input_schema['action'] in SUBSCRIBE_OPTS:
+ self.subscribe(concatenated_name)
+ elif input_schema['action'] in UNSUBSCRIBE_OPTS:
+ self.unsubscribe(concatenated_name)
+ elif input_schema['action'] in READ_OPTS:
+ self.read_stream_batch(concatenated_name,
int(input_schema['basket']), int(input_schema['limit']),
+ int(input_schema['offset']))
except BaseException as ex:
+ self.sendMessage({"error": ex})
add_log(50, ex)
- self.sendMessage(json.dumps({"error": ex}))
-
- def handleConnected(self):
- WebClientsLock.acquire_write()
- WebClients.append(self)
- WebClientsLock.release()
- add_log(20, 'Client connected: ' + self.address[0])
-
- def handleClose(self):
- WebClientsLock.acquire_write()
- WebClients.remove(self)
- WebClientsLock.release()
- add_log(20, 'Client disconnected: ' + self.address[0])
def subscribe(self, concatenated_name):
- try:
- stream = Streams_context.get_existing_stream(concatenated_name)
- except:
- raise
+ stream = Streams_context.get_existing_stream(concatenated_name)
self._locker.acquire_write()
self._subscriptions[concatenated_name] = stream
self._locker.release()
- self.sendMessage(json.dumps({"subscribed": "Subscribed to " +
concatenated_name}))
+ self.sendMessage({"subscribed": "Subscribed to " + concatenated_name})
add_log(20, ''.join(['Client ', self.address[0], 'subscribed stream ',
concatenated_name]))
- def desubscribe(self, concatenated_name):
+ def unsubscribe(self, concatenated_name):
self._locker.acquire_write()
if concatenated_name not in self._subscriptions:
self._locker.release()
- self.sendMessage(json.dumps({"error": "Stream " +
concatenated_name + " not present in subscriptions!"}))
+ self.sendMessage({"error": "Stream " + concatenated_name + " not
present in subscriptions!"})
else:
del self._subscriptions[concatenated_name]
self._locker.release()
- self.sendMessage(json.dumps({"desubscribed": "Desubscribed to " +
concatenated_name}))
- add_log(20, ''.join(['Client ', self.address[0], 'desubscribed
stream ', concatenated_name]))
+ self.sendMessage({"unsubscribed": "Unsubscribed to " +
concatenated_name})
+ add_log(20, ''.join(['Client ', self.address[0], ' unsubscribed
stream ', concatenated_name]))
def remove_subscribed_stream(self, concatenated_name):
self._locker.acquire_write()
if concatenated_name in self._subscriptions:
del self._subscriptions[concatenated_name]
self._locker.release()
- self.sendMessage(json.dumps({"removed": "Stream removed from context:
" + concatenated_name}))
+ self.sendMessage({"removed": "Stream removed from context: " +
concatenated_name})
- def send_notification_message(self, concatenated_name, json_message):
+ def send_notification_message(self, concatenated_name, schema_name,
stream_name, count):
self._locker.acquire_read()
if concatenated_name in self._subscriptions:
self._locker.release()
- self.sendMessage(json_message)
+ self.sendMessage({'notification': {'schema': schema_name,
'stream': stream_name, 'tuples': count}})
add_log(20, ''.join(['Stream notification sent to client ',
self.address[0]]))
else:
self._locker.release()
+ def read_stream_batch(self, concatenated_name, basket_number, limit,
offset):
+ stream = Streams_context.get_existing_stream(concatenated_name)
+ self.sendMessage(stream.read_tuples(basket_number, limit, offset))
+
def init_websockets(host, port):
global WebSocketServer
try:
WebSocketServer = SimpleWebSocketServer(host, port, IOTAPI)
WebSocketServer.serveforever()
- except (Exception, OSError) as ex:
+ except (BaseException, OSError) as ex:
print >> sys.stdout, ex
add_log(50, ex)
sys.exit(1)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -1,7 +1,7 @@
+import getpass
import sys
+
import pymonetdb
-import getpass
-
from Settings.iotlogger import add_log
Connection = None
@@ -37,12 +37,11 @@ def mapi_create_stream(schema, stream, c
except:
pass
- try: # attempt to create te stream table
+ try: # attempt to create the stream table
Connection.execute("SET SCHEMA " + schema + ";")
Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (",
columns, ");"]))
except BaseException as ex:
add_log(40, ex)
- pass
def mapi_flush_baskets(schema, stream, baskets):
@@ -55,4 +54,3 @@ def mapi_flush_baskets(schema, stream, b
Connection.execute(''.join(["CALL iot.basket('", schema, "','",
stream, "','", baskets, "');"]))
except BaseException as ex:
add_log(40, ex)
- pass
diff --git a/clients/iotclient/src/Streams/datatypes.py
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -2,21 +2,17 @@ import copy
import datetime
import itertools
import math
+import re
import struct
+from abc import ABCMeta, abstractmethod
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list