Changeset: 82ab5c86a247 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=82ab5c86a247
Modified Files:
clients/iotapi/src/Settings/mapiconnection.py
clients/iotapi/src/Streams/streams.py
clients/iotapi/src/WebSockets/websockets.py
clients/iotclient/src/Flask/restresources.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/datatypes.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscontext.py
clients/iotclient/src/main.py
Branch: iot
Log Message:
Several bugfixes
diffs (truncated from 301 to 300 lines):
diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -33,7 +33,7 @@ def fetch_streams():
try: # TODO paginate results?
cursor = Connection.cursor()
sql_string = """SELECT storage."schema", storage."table",
storage."column", storage."type", storage."typewidth"
- FROM (SELECT "schema", "table", "column", "type" FROM sys.storage)
AS storage
+ FROM (SELECT "schema", "table", "column", "type", "typewidth" FROM
sys.storage) AS storage
INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON
(storage."table"=tables."name")
INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON
(storage."schema"=schemas."name");"""\
.replace('\n', ' ')
diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -4,7 +4,7 @@ import struct
from datatypes import LITTLE_ENDIAN_ALIGNMENT
from Settings.filesystem import get_baskets_base_location
from Utilities.readwritelock import RWLock
-from WebSockets.websockets import notify_clients
+from WebSockets.websockets import notify_stream_inserts_to_clients
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@@ -30,7 +30,7 @@ class StreamBasketsHandler(FileSystemEve
if isinstance(event, 'DirCreatedEvent'):
basket_string = os.path.basename(os.path.normpath(event.src_path))
count = self._stream.append_basket(basket_string)
- notify_clients(self._stream.get_schema_name(),
self._stream.get_stream_name(), count)
+ notify_stream_inserts_to_clients(self._stream.get_schema_name(),
self._stream.get_stream_name(), count)
def on_deleted(self, event):
if isinstance(event, 'DirDeletedEvent'):
diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -3,7 +3,6 @@ import sys
from Settings.iotlogger import add_log
from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
-from Streams.streamscontext import Streams_context, IOTStreams
from Utilities.readwritelock import RWLock
from jsonschema import Draft4Validator, FormatChecker
@@ -15,14 +14,6 @@ WebClients = [] # TODO this probably wo
WebClientsLock = RWLock()
-def notify_stream_inserts_to_clients(schema_name, stream_name, count):
- concatenated_name = IOTStreams.get_context_entry_name(schema_name,
stream_name)
- WebClientsLock.acquire_read()
- for client in WebClients:
- client.send_notification_message(concatenated_name, schema_name,
stream_name, count)
- WebClientsLock.release()
-
-
def unsubscribe_removed_streams(concatenated_names):
WebClientsLock.acquire_read()
for name in concatenated_names:
@@ -32,6 +23,16 @@ def unsubscribe_removed_streams(concaten
for name in concatenated_names:
add_log(20, ''.join(['Stream ', name, ' removed']))
+from Streams.streamscontext import Streams_context, IOTStreams # avoid
circular dependency
+
+
+def notify_stream_inserts_to_clients(schema_name, stream_name, count):
+ concatenated_name = IOTStreams.get_context_entry_name(schema_name,
stream_name)
+ WebClientsLock.acquire_read()
+ for client in WebClients:
+ client.send_notification_message(concatenated_name, schema_name,
stream_name, count)
+ WebClientsLock.release()
+
class IOTAPI(WebSocket):
"""Client WebSocket"""
diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -8,7 +8,7 @@ from flask_restful import Resource
from jsonschema import Draft4Validator, FormatChecker
from tzlocal import get_localzone
from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA
-from Streams.streamscontext import IOTStreams
+from Streams.streamscontext import IOTStreamsContext
from Settings.iotlogger import add_log
Streams_Context = None
@@ -23,7 +23,7 @@ def init_rest_resources():
Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA,
format_checker=FormatChecker())
Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA,
format_checker=FormatChecker())
try:
- Streams_Context = IOTStreams()
+ Streams_Context = IOTStreamsContext()
except BaseException as ex:
print >> sys.stdout, ex
add_log(50, ex)
@@ -80,6 +80,8 @@ class StreamsHandling(Resource):
add_log(50, ex)
return ex, 400
else:
+ add_log(20, ''.join['The stream ', schema_to_validate['schema'],
'.', schema_to_validate['stream'],
+ ' was created'])
return 'The stream was created with success!', 201
def delete(self):
@@ -95,4 +97,6 @@ class StreamsHandling(Resource):
except BaseException as ex:
add_log(50, ex)
return ex, 404
+ add_log(20, ''.join['The stream ', schema_to_validate['schema'], '.',
schema_to_validate['stream'],
+ ' was deleted'])
return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -1,4 +1,3 @@
-import getpass
import sys
import pymonetdb
@@ -7,13 +6,11 @@ from Settings.iotlogger import add_log
Connection = None
-def init_monetdb_connection(hostname, port, user_name, database):
+def init_monetdb_connection(hostname, port, user_name, connection_password,
database):
global Connection
- user_password = getpass.getpass(prompt='Insert password for user ' +
user_name + ':')
-
try: # the autocommit is set to true so each statement will be independent
- Connection = pymonetdb.connect(hostname=hostname, port=port,
username=user_name, password=user_password,
+ Connection = pymonetdb.connect(hostname=hostname, port=port,
username=user_name, password=connection_password,
database=database, autocommit=True)
log_message = 'User %s connected successfully to database %s' %
(user_name, database)
print >> sys.stdout, log_message
diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -4,11 +4,9 @@ import itertools
import math
import re
import struct
+
+from dateutil import parser
from abc import ABCMeta, abstractmethod
-
-import dateutil
-from dateutil import parser
-
from jsonschemas import UUID_REGEX, MAC_ADDRESS_REGEX, TIME_REGEX, IPV4_REGEX
# The null constants might change from system to system due to different CPU's
limits
@@ -704,7 +702,7 @@ class TimestampType(BaseDateTimeType):
schema[self._column_name]['format'] = 'date-time'
def parse_entry(self, entry):
- parsed_timestamp = dateutil.parser.parse(entry)
+ parsed_timestamp = parser.parse(entry)
if not self._has_timezone:
parsed_timestamp = parsed_timestamp.replace(tzinfo=None)
return parsed_timestamp
diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -120,8 +120,8 @@ class BaseIOTStream(object):
def flush_baskets(self, last=False): # the monitor has to be acquired in
write mode before running this method!!!
# write the tuple count in the basket
- basket_counter_file_pointer =
open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+")
- basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT
+ "1i", self._tuples_in_per_basket))
+ basket_counter_file_pointer =
open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b")
+ basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT
+ "i", self._tuples_in_per_basket))
basket_counter_file_pointer.flush()
basket_counter_file_pointer.close()
mapi_flush_baskets(self._schema_name, self._stream_name,
self._current_base_path)
@@ -227,16 +227,19 @@ class TupleBasedStream(BaseIOTStream):
def validate_and_insert(self, new_data, timestamp):
super(TupleBasedStream, self).validate_and_insert(new_data, timestamp)
+ flag = False
self._monitor.acquire_write()
try:
if self._tuples_in_per_basket >= self._limit:
self.flush_baskets(last=False)
+ flag = True
except BaseException as ex:
self._monitor.release()
add_log(50, ex)
else:
self._monitor.release()
- add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name,
self._stream_name))
+ if flag:
+ add_log(20, 'Flushed stream %s.%s baskets' %
(self._schema_name, self._stream_name))
class TimeBasedStream(BaseIOTStream):
@@ -258,16 +261,19 @@ class TimeBasedStream(BaseIOTStream):
return {'base': 'time', 'unit': self._time_unit, 'interval':
self._interval}
def time_based_flush(self):
+ flag = False
self._monitor.acquire_write()
try:
- if self._tuples_in_per_basket > 0:
+ if self._tuples_in_per_basket > 0: # flush only when there are
tuples in the baskets
self.flush_baskets(last=False)
+ flag = True
except BaseException as ex:
self._monitor.release()
add_log(50, ex)
else:
self._monitor.release()
- add_log(20, 'Flushed stream %s.%s baskets' % (self._schema_name,
self._stream_name))
+ if flag:
+ add_log(20, 'Flushed stream %s.%s baskets' %
(self._schema_name, self._stream_name))
def start_stream(self):
self._local_thread.start() # start the time based flush on another
thread
diff --git a/clients/iotclient/src/Streams/streamscontext.py b/clients/iotclient/src/Streams/streamscontext.py
--- a/clients/iotclient/src/Streams/streamscontext.py
+++ b/clients/iotclient/src/Streams/streamscontext.py
@@ -1,10 +1,11 @@
import json
+import collections
from Settings.filesystem import get_configfile_location
from Utilities.readwritelock import RWLock
-
+from jsonschema import Draft4Validator, FormatChecker
from jsonschemas import CONFIG_FILE_SCHEMA
-from streamscreator import *
+from streamscreator import validate_schema_and_create_stream
Config_File_Location = None
Config_File_Validator = None
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -1,4 +1,5 @@
import getopt
+import getpass
import signal
import sys
import time
@@ -21,14 +22,27 @@ def signal_handler(signal, frame):
subprocess.terminate()
-def start_process(admin_host, admin_port, app_host, app_port):
+def start_process(admin_host, admin_port, app_host, app_port, host_identifier,
new_configfile_location,
+ connection_hostname, connection_port, connection_user,
connection_password, connection_database):
+ # WARNING The initiation order must be this!!!
+ init_logging() # init logging context
+ init_file_system(host_identifier, new_configfile_location) # init
filesystem
+ init_streams_hosts() # init hostname column for streams
+ # init mapi connection
+ init_monetdb_connection(connection_hostname, connection_port,
connection_user, connection_password,
+ connection_database)
+ init_streams_context() # init streams context
+ init_rest_resources() # init validators for RESTful requests
+
thread1 = Thread(target=start_flask_admin_app, args=(admin_host,
admin_port))
thread2 = Thread(target=start_flask_iot_app, args=(app_host, app_port))
thread1.start()
time.sleep(1) # problem while handling Flask's loggers, so it is used
this sleep
thread2.start()
+ add_log(20, 'Started IOT Stream Server')
thread1.join()
thread2.join()
+ add_log(20, 'Stopped IOT Stream Server')
def main(argv):
@@ -93,21 +107,13 @@ def main(argv):
if not use_host_identifier: # in case of the user sets the
host_identifier but not the use_host_identifier flag
host_identifier = None
- # WARNING The initiation order must be this!!!
- init_logging() # init logging context
- init_file_system(host_identifier, new_configfile_location) # init
filesystem
- init_streams_hosts() # init hostname column for streams
- # init mapi connection
- init_monetdb_connection(connection_hostname, connection_port,
connection_user, connection_database)
- init_streams_context() # init streams context
- init_rest_resources() # init validators for RESTful requests
-
- subprocess = Process(target=start_process, args=(admin_host, admin_port,
app_host, app_port))
+ connection_password = getpass.getpass(prompt='Insert password for user ' +
connection_user + ':')
+ subprocess = Process(target=start_process, args=(admin_host, admin_port,
app_host, app_port, host_identifier,
+ new_configfile_location,
connection_hostname, connection_port,
+ connection_user,
connection_password, connection_database))
subprocess.start()
- add_log(20, 'Started IOT Stream Server')
signal.signal(signal.SIGINT, signal_handler)
subprocess.join()
- add_log(20, 'Stopped IOT Stream Server')
if __name__ == "__main__":
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list