Changeset: b53bfefff4b3 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b53bfefff4b3
Modified Files:
clients/iotapi/src/Streams/streampolling.py
clients/iotapi/src/Streams/streams.py
clients/iotapi/src/WebSockets/websockets.py
clients/iotclient/src/Streams/datatypes.py
clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
Corrected basket searching
diffs (243 lines):
diff --git a/clients/iotapi/src/Streams/streampolling.py
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -5,15 +5,17 @@ from Utilities.customthreading import Pe
from streamscontext import Streams_context, DataCellStream
-SWITCHER = [{'types': ['clob', 'char', 'varchar'], 'class': 'TextType'},
- {'types': ['boolean'], 'class': 'BooleanType'},
+SWITCHER = [{'types': ['clob', 'char', 'varchar', 'url'], 'class': 'TextType'},
{'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class':
'SmallIntegerType'},
{'types': ['hugeint'], 'class': 'HugeIntegerType'},
{'types': ['real', 'double'], 'class': 'FloatType'},
{'types': ['decimal'], 'class': 'DecimalType'},
+ {'types': ['boolean'], 'class': 'BooleanType'},
{'types': ['date'], 'class': 'DateType'},
{'types': ['time'], 'class': 'TimeType'},
- {'types': ['timestamp'], 'class': 'TimestampType'}]
+ {'types': ['timestamp'], 'class': 'TimestampType'},
+ {'types': ['inet'], 'class': 'INetType'},
+ {'types': ['uuid'], 'class': 'UUIDType'}]
def init_stream_polling_thread(interval):
diff --git a/clients/iotapi/src/Streams/streams.py
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -2,12 +2,14 @@ import struct
import os
from Settings.filesystem import get_baskets_base_location
+from Utilities.readwritelock import RWLock
from WebSockets.websockets import notify_clients
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
BASKETS_COUNT_FILE = 'count'
+
def represents_int(s):
try:
int(s)
@@ -26,56 +28,95 @@ class StreamBasketsHandler(FileSystemEve
def on_created(self, event): # whenever a basket directory is created,
notify to subscribed clients
if isinstance(event, 'DirCreatedEvent'):
basket_string = os.path.basename(os.path.normpath(event.src_path))
- self._stream.baskets.append_basket(basket_string)
- notify_clients(self._stream.schema_name, self._stream.stream_name)
+ self._stream.append_basket(basket_string)
+ notify_clients(self._stream.get_schema_name(),
self._stream.get_stream_name())
+ def on_deleted(self, event):
+ if isinstance(event, 'DirDeletedEvent'):
+ basket_string = os.path.basename(os.path.normpath(event.src_path))
+ self._stream.delete_basket(basket_string)
-class DataCellStream(object):
+
+class IOTStream(object):
"""Representation of a stream"""
def __init__(self, schema_name, stream_name, columns):
- self.schema_name = schema_name # name of the schema
- self.stream_name = stream_name # name of the stream
+ self._schema_name = schema_name # name of the schema
+ self._stream_name = stream_name # name of the stream
self._columns = columns # dictionary of name -> data_types
self._base_path = os.path.join(get_baskets_base_location(),
schema_name, stream_name)
- self.baskets = {} # dictionary of basket_number -> total_tuples
+ self._baskets = {} # dictionary of basket_number -> total_tuples
for name in os.listdir(self._base_path):
self.append_basket(name)
+ self._lock = RWLock()
self._observer = Observer()
self._observer.schedule(StreamBasketsHandler(stream=self),
self._base_path, recursive=False)
self._observer.start()
+ def get_schema_name(self):
+ return self._schema_name
+
+ def get_stream_name(self):
+ return self._stream_name
+
def append_basket(self, path):
if represents_int(path):
with open(os.path.join(self._base_path, path)) as f:
count = struct.unpack('i', f.read(4))[0]
- self.baskets[int(path)] = count
+ self._lock.acquire_write()
+ self._baskets[int(path)] = count
+ self._lock.release()
- # TODO add delete basket!!!!
+ def delete_basket(self, path):
+ if represents_int(path):
+ number = int(path)
+ self._lock.acquire_write()
+ if number in self._baskets:
+ del self._baskets[number]
+ self._lock.release()
+
+ def get_next_basket_number_tuple(self, basket_number):
+ self._lock.acquire_read()
+ if basket_number in self._baskets:
+ self._lock.release()
+ return basket_number, self._baskets[basket_number]
+ else:
+ filtered = filter(lambda x: x > basket_number,
self._baskets.keys())
+ if len(filtered) > 0:
+ min_basket_number = min(filtered)
+ min_basket_tuples = self._baskets[min_basket_number]
+ self._lock.release()
+ return min_basket_number, min_basket_tuples
+ else:
+ self._lock.release()
+ return None, None
def read_tuples(self, basket_number, limit, offset):
results = {column: [] for column in self._columns.keys()}
- current_basket = int(basket_number)
+ current_basket_number = int(basket_number)
read_tuples = 0
+ skipped_tuples = 0
finished = False
while True:
- if current_basket not in self.baskets:
+ current_basket_number, current_tuple_number =
self.get_next_basket_number_tuple(current_basket_number)
+ if current_basket_number is None:
finished = True
break
- offset -= self.baskets[current_basket]
- if offset < 0:
+ if skipped_tuples + current_tuple_number > offset:
+ offset = offset - skipped_tuples
break
- current_basket += 1
+ skipped_tuples += current_tuple_number
+ current_basket_number += 1
if not finished:
- offset = abs(offset)
+ while True:
+ current_basket_number, current_tuple_number =
self.get_next_basket_number_tuple(current_basket_number)
+ if current_basket_number is None or read_tuples >= limit:
+ break
- while True:
- if current_basket not in self.baskets:
- break
- next_path = os.path.join(self._base_path, str(current_basket))
- next_read_size = min(self.baskets[current_basket], limit)
+ next_path = os.path.join(self._base_path,
str(current_basket_number))
+ next_read_size = min(limit - read_tuples,
current_tuple_number) - offset
for key, column in self._columns.iteritems():
next_file_name = os.path.join(next_path, key)
@@ -85,12 +126,11 @@ class DataCellStream(object):
file_pointer = open(next_file_name, open_string)
results[key].append(column.read_next_batch(file_pointer,
offset, next_read_size))
+ read_tuples += next_read_size
offset = 0
- current_basket += 1
- read_tuples += next_read_size
- limit -= self.baskets[current_basket]
- if limit <= 0:
- break
+ current_basket_number += 1
# TODO check if this is viable, it could be 1000 tuples!!!!
- return {'total': read_tuples, 'tuples': zip(*results)} # TODO not
done this way!!!
+ keys = results.keys()
+ tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k
in keys))]
+ return {'total': read_tuples, 'tuples': tuples}
diff --git a/clients/iotapi/src/WebSockets/websockets.py
b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -37,7 +37,7 @@ def desubscribe_removed_streams(concaten
class IOTAPI(WebSocket):
def __init__(self):
super(IOTAPI, self).__init__()
- self._subscriptions = {}
+ self._subscriptions = {} # dictionary of schema + '.' + stream ->
IOTStream
self._locker = RWLock()
def handleMessage(self):
diff --git a/clients/iotclient/src/Streams/datatypes.py
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -259,14 +259,14 @@ class EnumType(TextType):
return ''.join(array)
-class INetSix(TextType):
+class INetSixType(TextType):
"""Covers: Inet6"""
def __init__(self, **kwargs):
- super(INetSix, self).__init__(**kwargs)
+ super(INetSixType, self).__init__(**kwargs)
def add_json_schema_entry(self, schema):
- super(INetSix, self).add_json_schema_entry(schema)
+ super(INetSixType, self).add_json_schema_entry(schema)
schema[self._column_name]['format'] = 'ipv6'
#
http://stackoverflow.com/questions/166132/maximum-length-of-the-textual-representation-of-an-ipv6-address
@@ -274,17 +274,17 @@ class INetSix(TextType):
array[2] = 'char(45)'
-class INet(StreamDataType):
+class INetType(StreamDataType):
"""Covers: Inet"""
def __init__(self, **kwargs):
- super(INet, self).__init__(**kwargs)
+ super(INetType, self).__init__(**kwargs)
def get_nullable_constant(self):
return "0" # has to trick because it is impossible to get a null
value from a valid IPv4 address in MonetDB
def add_json_schema_entry(self, schema):
- super(INet, self).add_json_schema_entry(schema)
+ super(INetType, self).add_json_schema_entry(schema)
schema[self._column_name]['pattern'] = IPV4_REGEX
def process_next_value(self, entry, counter, parameters, errors):
diff --git a/clients/iotclient/src/Streams/streamscreator.py
b/clients/iotclient/src/Streams/streamscreator.py
--- a/clients/iotclient/src/Streams/streamscreator.py
+++ b/clients/iotclient/src/Streams/streamscreator.py
@@ -15,8 +15,8 @@ SWITCHER = [{'types': ['text', 'string',
{'types': ['uuid'], 'class': 'UUIDType'},
{'types': ['mac'], 'class': 'MACType'},
{'types': ['url'], 'class': 'URLType'},
- {'types': ['inet'], 'class': 'INet'},
- {'types': ['inet6'], 'class': 'INetSix'},
+ {'types': ['inet'], 'class': 'INetType'},
+ {'types': ['inet6'], 'class': 'INetSixType'},
{'types': ['regex'], 'class': 'RegexType'},
{'types': ['char', 'character', 'varchar', 'character varying'],
'class': 'LimitedTextType'},
{'types': ['enum'], 'class': 'EnumType'},
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list