Changeset: dde8adbf4d2a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=dde8adbf4d2a
Modified Files:
clients/iotapi/src/Settings/mapiconnection.py
clients/iotapi/src/Streams/streampolling.py
clients/iotapi/src/Utilities/customthreading.py
clients/iotapi/src/main.py
clients/iotclient/src/main.py
Branch: iot
Log Message:
Created subprocess for iot api
diffs (147 lines):
diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -7,11 +7,9 @@ from Settings.iotlogger import add_log
Connection = None
-def init_monetdb_connection(hostname, port, user_name, database):
+def init_monetdb_connection(hostname, port, user_name, user_password, database):
global Connection
- user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':')
-
try: # the autocommit is set to true so each statement will be independent
Connection = pymonetdb.connect(hostname=hostname, port=port,
username=user_name, password=user_password,
database=database, autocommit=True)
diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -39,9 +39,9 @@ def stream_polling():
for entry in SWITCHER: # allocate the proper type wrapper
if elem[3] in entry['types']:
reflection_class = globals()[entry['class']] # import everything from datatypes!!!
- new_column = reflection_class({'name': elem[2], 'type': elem[3], 'typewidth': elem[4]})
+ new_column = reflection_class(**{'name': elem[2], 'type': elem[3], 'typewidth': elem[4]})
columns[elem[2]] = new_column
- new_streams[key] = IOTStream(key, columns)
+ new_streams[key] = IOTStream(elem[0], elem[1], **columns)
break
else:
retained_streams.append(key)
diff --git a/clients/iotapi/src/Utilities/customthreading.py b/clients/iotapi/src/Utilities/customthreading.py
--- a/clients/iotapi/src/Utilities/customthreading.py
+++ b/clients/iotapi/src/Utilities/customthreading.py
@@ -20,13 +20,12 @@ class StoppableThread(Thread):
class PeriodicalThread(StoppableThread):
"""Thread working with a timed interval basis"""
- def __init__(self, interval, worker_func, func_args=None, *args, **kwargs):
+ def __init__(self, interval, worker_func, *args, **kwargs):
super(PeriodicalThread, self).__init__(*args, **kwargs)
self._interval = interval # in seconds
self._worker_func = worker_func # function/method to execute periodically
- self._worker_func_args = func_args
def run(self):
while not self.stop_event.is_set():
- self._worker_func(self._worker_func_args)
+ self._worker_func()
time.sleep(self._interval)
diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -1,19 +1,46 @@
import getopt
+import getpass
import signal
import sys
+from multiprocessing import Process
+from threading import Thread
from Settings.filesystem import init_file_system, set_filesystem_location
from Settings.iotlogger import init_logging, add_log, set_logging_location
-from Settings.mapiconnection import init_monetdb_connection
+from Settings.mapiconnection import init_monetdb_connection, close_monetdb_connection
from Streams.streampolling import init_stream_polling_thread
from WebSockets.websockets import init_websockets, terminate_websockets
+subprocess = None
-def close_sig_handler(signal, frame):
+
+def signal_handler(signal, frame):
+ subprocess.terminate()
+
+
+def start_process(sockets_host, sockets_port, connection_hostname, connection_port, connection_user,
+ connection_password, connection_database):
+ # WARNING The initiation order must be this!!!
+ init_logging() # init logging context
+ init_file_system() # init filesystem
+ # init mapi connection
+ init_monetdb_connection(connection_hostname, connection_port, connection_user, connection_password,
+ connection_database)
+ init_stream_polling_thread(60) # start polling
+
+ thread1 = Thread(target=init_websockets, args=(sockets_host, sockets_port))
+ thread1.start()
+ add_log(20, 'Started IOT API Server')
+ thread1.join()
+
terminate_websockets()
+ close_monetdb_connection()
+ add_log(20, 'Stopped IOT API Server')
def main(argv):
+ global subprocess
+
try:
opts, args = getopt.getopt(argv[1:], 'f:l:sh:sp:h:p:d:u',
['filesystem=', 'log=', 'shost=', 'sport=',
'host=', 'port=', 'database=', 'user='])
@@ -48,18 +75,12 @@ def main(argv):
elif opt in ('-d', '--database'):
connection_database = arg
- # WARNING The initiation order must be this!!!
- init_logging() # init logging context
- init_file_system() # init filesystem
- # init mapi connection
- init_monetdb_connection(connection_hostname, connection_port, connection_user, connection_database)
- init_stream_polling_thread(60) # start polling
-
- add_log(20, 'Started IOT API Server')
- signal.signal(signal.SIGINT, close_sig_handler)
- init_websockets(sockets_host, sockets_port)
- add_log(20, 'Stopped IOT API Server')
-
+ connection_password = getpass.getpass(prompt='Insert password for user ' + connection_user + ':')
+ subprocess = Process(target=start_process, args=(sockets_host, sockets_port, connection_hostname, connection_port,
+ connection_user, connection_password, connection_database))
+ subprocess.start()
+ signal.signal(signal.SIGINT, signal_handler)
+ subprocess.join()
if __name__ == "__main__":
main(sys.argv)
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -11,7 +11,7 @@ from Flask.app import start_flask_iot_ap
from Flask.restresources import init_rest_resources
from Settings.filesystem import init_file_system, set_filesystem_location
from Settings.iotlogger import init_logging, add_log, set_logging_location
-from Settings.mapiconnection import init_monetdb_connection
+from Settings.mapiconnection import init_monetdb_connection, close_monetdb_connection
from Streams.streamscontext import init_streams_context
from Streams.streams import init_streams_hosts
@@ -42,6 +42,7 @@ def start_process(admin_host, admin_port
add_log(20, 'Started IOT Stream Server')
thread1.join()
thread2.join()
+ close_monetdb_connection()
add_log(20, 'Stopped IOT Stream Server')
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list