Changeset: dae88d156163 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=dae88d156163
Modified Files:
clients/iotclient/documentation/conf.py
clients/iotclient/src/Flask/app.py
clients/iotclient/src/Flask/restresources.py
clients/iotclient/src/Settings/filesystem.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscreator.py
clients/iotclient/src/main.py
Branch: iot
Log Message:
Made important corrections
diffs (truncated from 302 to 300 lines):
diff --git a/clients/iotclient/documentation/conf.py b/clients/iotclient/documentation/conf.py
--- a/clients/iotclient/documentation/conf.py
+++ b/clients/iotclient/documentation/conf.py
@@ -12,8 +12,8 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
-import sys
-import os
+# import sys
+# import os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
@@ -110,7 +110,7 @@ pygments_style = 'sphinx'
# If true, keep warnings as "system message" paragraphs in the built documents.
#keep_warnings = False
-# If true, `todo` and `todoList` produce output, else they produce nothing.
+# If true, `ttodo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True
diff --git a/clients/iotclient/src/Flask/app.py b/clients/iotclient/src/Flask/app.py
--- a/clients/iotclient/src/Flask/app.py
+++ b/clients/iotclient/src/Flask/app.py
@@ -1,6 +1,5 @@
from flask import Flask
from flask_restful import Api
-
from restresources import StreamInput, StreamsInfo, StreamsHandling
diff --git a/clients/iotclient/src/Flask/restresources.py b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -8,38 +8,37 @@ from flask_restful import Resource
from jsonschema import Draft4Validator, FormatChecker
from tzlocal import get_localzone
from Streams.jsonschemas import CREATE_STREAMS_SCHEMA, DELETE_STREAMS_SCHEMA
-from Streams.streamscontext import IOTStreamsException, IOTStreams
+from Streams.streamscontext import IOTStreams
from Settings.iotlogger import add_log
Streams_Context = None
Create_Streams_Validator = None
Delete_Streams_Validator = None
-Local_Timezone = None
+Local_Timezone = get_localzone() # for the correction of dates we must add
the system's timezone
def init_rest_resources():
global Streams_Context, Create_Streams_Validator,
Delete_Streams_Validator, Local_Timezone
- Local_Timezone = get_localzone() # for the correction of dates we must
add the system's timezone
Create_Streams_Validator = Draft4Validator(CREATE_STREAMS_SCHEMA,
format_checker=FormatChecker())
Delete_Streams_Validator = Draft4Validator(DELETE_STREAMS_SCHEMA,
format_checker=FormatChecker())
try:
Streams_Context = IOTStreams()
except BaseException as ex:
print >> sys.stdout, ex
- add_log(50, ex.message)
+ add_log(50, ex)
sys.exit(1)
-class StreamInput(Resource): # TODO these operations are not atomic!!!
+class StreamInput(Resource):
"""RESTful API for stream's input"""
def get(self, schema_name, stream_name): # check a single stream data
try: # check if stream exists, if not return 404
stream = Streams_Context.get_existing_stream(schema_name,
stream_name)
except BaseException as ex:
- add_log(50, ex.message)
- return ex.message, 404
+ add_log(50, ex)
+ return ex, 404
return stream.get_data_dictionary(include_number_tuples=True), 200
def post(self, schema_name, stream_name): # add data to a stream
@@ -48,14 +47,14 @@ class StreamInput(Resource): # TODO the
try: # check if stream exists, if not return 404
stream = Streams_Context.get_existing_stream(schema_name,
stream_name)
except BaseException as ex:
- add_log(50, ex.message)
- return ex.message, 404
+ add_log(50, ex)
+ return ex, 404
try: # validate and insert data, if not return 400
stream.validate_and_insert(json.loads(request.data), current_stamp)
except BaseException as ex:
- add_log(50, ex.message)
- return ex.message, 400
+ add_log(50, ex)
+ return ex, 400
return 'The insertions were made with success!', 201
@@ -78,8 +77,8 @@ class StreamsHandling(Resource):
Create_Streams_Validator.validate(schema_to_validate)
Streams_Context.add_new_stream(schema_to_validate)
except BaseException as ex:
- add_log(50, ex.message)
- return ex.message, 400
+ add_log(50, ex)
+ return ex, 400
else:
return 'The stream was created with success!', 201
@@ -88,12 +87,12 @@ class StreamsHandling(Resource):
schema_to_validate = json.loads(request.data)
Delete_Streams_Validator.validate(schema_to_validate)
except BaseException as ex:
- add_log(50, ex.message)
- return ex.message, 400
+ add_log(50, ex)
+ return ex, 400
try: # check if stream exists, if not return 404
Streams_Context.delete_existing_stream(schema_to_validate)
except BaseException as ex:
- add_log(50, ex.message)
- return ex.message, 404
+ add_log(50, ex)
+ return ex, 404
return 'The stream was deleted with success!', 204
diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -39,7 +39,7 @@ def init_file_system(host_identifier=Non
Host_Identifier = host_identifier
except (Exception, OSError) as ex:
print >> sys.stdout, ex
- add_log(50, ex.message)
+ add_log(50, ex)
sys.exit(1)
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -22,8 +22,8 @@ def init_monetdb_connection(hostname, po
print >> sys.stdout, log_message
add_log(20, log_message)
except BaseException as ex:
- print >> sys.stdout, ex.message
- add_log(50, ex.message)
+ print >> sys.stdout, ex
+ add_log(50, ex)
sys.exit(1)
@@ -38,15 +38,21 @@ def mapi_create_stream(schema, stream, c
pass
try: # attempt to create te stream table
- Connection.execute(''.join(["CREATE STREAM TABLE ", schema, ".",
stream, " (", columns, ");"]))
+ Connection.execute("SET SCHEMA " + schema + ";")
+ Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (",
columns, ");"]))
except BaseException as ex:
- add_log(40, ex.message)
+ add_log(40, ex)
pass
def mapi_flush_baskets(schema, stream, baskets):
try:
- Connection.execute(''.join(["CALL iot.push(\"", schema, "\",\"",
stream, "\",\"", baskets, "\");"]))
+ Connection.execute("SET SCHEMA iot;")
+ except:
+ pass
+
+ try:
+ Connection.execute(''.join(["CALL iot.basket('", schema, "','",
stream, "','", baskets, "');"]))
except BaseException as ex:
- add_log(40, ex.message)
+ add_log(40, ex)
pass
diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -38,8 +38,8 @@ def init_streams_hosts():
class StreamException(Exception):
"""Exception fired when the validation of a stream insert fails"""
- def __init__(self, messages):
- self.message = messages # dictionary of column -> list of error
messages
+ def __init__(self, message):
+ self.message = message # dictionary of column -> list of error
messages
class DataCellStream(object):
@@ -134,7 +134,7 @@ class DataCellStream(object):
if self._tuples_in_per_basket > 0:
self.flush_baskets(last)
except BaseException as ex:
- add_log(50, ex.message)
+ add_log(50, ex)
self._monitor.release()
def validate_and_insert(self, new_data, timestamp):
@@ -170,7 +170,7 @@ class DataCellStream(object):
try:
transposed_data[key] = data_type.process_values(values) #
convert into binary
except DataValidationException as ex:
- batch_errors[key] = ex.message
+ batch_errors[key] = ex
if batch_errors:
raise StreamException(message=batch_errors)
@@ -213,8 +213,9 @@ class DataCellStream(object):
self._tuples_in_per_basket += total_tuples
if is_flushing_tuple_based and self._tuples_in_per_basket >=
self._flush_method.limit:
self.flush_baskets(last=False)
-
+ except BaseException as ex:
+ self._monitor.release()
+ add_log(50, ex)
+ else:
+ self._monitor.release()
add_log(20, 'Inserted %d tuples to stream %s.%s' % (total_tuples,
self._schema_name, self._stream_name))
- except BaseException as ex:
- add_log(50, ex.message)
- self._monitor.release()
diff --git a/clients/iotclient/src/Streams/streamscreator.py b/clients/iotclient/src/Streams/streamscreator.py
--- a/clients/iotclient/src/Streams/streamscreator.py
+++ b/clients/iotclient/src/Streams/streamscreator.py
@@ -53,7 +53,7 @@ def validate_schema_and_create_stream(sc
new_column.set_nullable(column['nullable'])
validated_columns[next_name] = new_column
except Exception as ex:
- errors[next_name] = ex.message
+ errors[next_name] = ex
break
if errors:
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -1,8 +1,10 @@
import getopt
import signal
import sys
+import time
from multiprocessing import Process
+from threading import Thread
from uuid import getnode as get_mac
from Flask.app import start_flask_iot_app, start_flask_admin_app
from Flask.restresources import init_rest_resources
@@ -12,17 +14,25 @@ from Settings.mapiconnection import init
from Streams.streamscontext import init_streams_context
from Streams.streams import init_streams_hosts
-subprocess1 = None
-subprocess2 = None
+subprocess = None
def signal_handler(signal, frame):
- subprocess1.terminate()
- subprocess2.terminate()
+ subprocess.terminate()
+
+
+def start_process(admin_host, admin_port, app_host, app_port):
+ thread1 = Thread(target=start_flask_admin_app, args=(admin_host,
admin_port))
+ thread2 = Thread(target=start_flask_iot_app, args=(app_host, app_port))
+ thread1.start()
+ time.sleep(1) # problem while handling Flask's loggers, so it is used
this sleep
+ thread2.start()
+ thread1.join()
+ thread2.join()
def main(argv):
- global subprocess1, subprocess2
+ global subprocess
try:
opts, args = getopt.getopt(argv[1:], 'f:l:c:ui:in:ih:ip:ah:ap:h:p:d:u',
@@ -92,15 +102,11 @@ def main(argv):
init_streams_context() # init streams context
init_rest_resources() # init validators for RESTful requests
- subprocess1 = Process(target=start_flask_admin_app, args=(admin_host,
admin_port))
- subprocess2 = Process(target=start_flask_iot_app, args=(app_host,
app_port))
- subprocess1.start()
- subprocess2.start()
+ subprocess = Process(target=start_process, args=(admin_host, admin_port,
app_host, app_port))
+ subprocess.start()
add_log(20, 'Started IOT Stream Server')
signal.signal(signal.SIGINT, signal_handler)
-
- subprocess1.join()
- subprocess2.join()
+ subprocess.join()
add_log(20, 'Stopped IOT Stream Server')
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list