Changeset: dde8adbf4d2a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=dde8adbf4d2a
Modified Files:
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Utilities/customthreading.py
        clients/iotapi/src/main.py
        clients/iotclient/src/main.py
Branch: iot
Log Message:

Created subprocess for iot api


diffs (147 lines):

diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -7,11 +7,9 @@ from Settings.iotlogger import add_log
 Connection = None
 
 
-def init_monetdb_connection(hostname, port, user_name, database):
+def init_monetdb_connection(hostname, port, user_name, user_password, database):
     global Connection
 
-    user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':')
-
     try:  # the autocommit is set to true so each statement will be independent
         Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password,
                                        database=database, autocommit=True)
diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -39,9 +39,9 @@ def stream_polling():
                 for entry in SWITCHER:  # allocate the proper type wrapper
                     if elem[3] in entry['types']:
                         reflection_class = globals()[entry['class']]  # import everything from datatypes!!!
-                        new_column = reflection_class({'name': elem[2], 'type': elem[3], 'typewidth': elem[4]})
+                        new_column = reflection_class(**{'name': elem[2], 'type': elem[3], 'typewidth': elem[4]})
                         columns[elem[2]] = new_column
-                        new_streams[key] = IOTStream(key, columns)
+                        new_streams[key] = IOTStream(elem[0], elem[1], **columns)
                     break
         else:
             retained_streams.append(key)
diff --git a/clients/iotapi/src/Utilities/customthreading.py b/clients/iotapi/src/Utilities/customthreading.py
--- a/clients/iotapi/src/Utilities/customthreading.py
+++ b/clients/iotapi/src/Utilities/customthreading.py
@@ -20,13 +20,12 @@ class StoppableThread(Thread):
 class PeriodicalThread(StoppableThread):
     """Thread working with a timed interval basis"""
 
-    def __init__(self, interval, worker_func, func_args=None, *args, **kwargs):
+    def __init__(self, interval, worker_func, *args, **kwargs):
         super(PeriodicalThread, self).__init__(*args, **kwargs)
         self._interval = interval  # in seconds
         self._worker_func = worker_func  # function/method to execute periodically
-        self._worker_func_args = func_args
 
     def run(self):
         while not self.stop_event.is_set():
-            self._worker_func(self._worker_func_args)
+            self._worker_func()
             time.sleep(self._interval)
diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -1,19 +1,46 @@
 import getopt
+import getpass
 import signal
 import sys
 
+from multiprocessing import Process
+from threading import Thread
 from Settings.filesystem import init_file_system, set_filesystem_location
 from Settings.iotlogger import init_logging, add_log, set_logging_location
-from Settings.mapiconnection import init_monetdb_connection
+from Settings.mapiconnection import init_monetdb_connection, close_monetdb_connection
 from Streams.streampolling import init_stream_polling_thread
 from WebSockets.websockets import init_websockets, terminate_websockets
 
+subprocess = None
 
-def close_sig_handler(signal, frame):
+
+def signal_handler(signal, frame):
+    subprocess.terminate()
+
+
+def start_process(sockets_host, sockets_port, connection_hostname, connection_port, connection_user,
+                  connection_password, connection_database):
+    # WARNING The initiation order must be this!!!
+    init_logging()  # init logging context
+    init_file_system()  # init filesystem
+    # init mapi connection
+    init_monetdb_connection(connection_hostname, connection_port, connection_user, connection_password,
+                            connection_database)
+    init_stream_polling_thread(60)  # start polling
+
+    thread1 = Thread(target=init_websockets, args=(sockets_host, sockets_port))
+    thread1.start()
+    add_log(20, 'Started IOT API Server')
+    thread1.join()
+
     terminate_websockets()
+    close_monetdb_connection()
+    add_log(20, 'Stopped IOT API Server')
 
 
 def main(argv):
+    global subprocess
+
     try:
         opts, args = getopt.getopt(argv[1:], 'f:l:sh:sp:h:p:d:u',
                                    ['filesystem=', 'log=', 'shost=', 'sport=', 'host=', 'port=', 'database=', 'user='])
@@ -48,18 +75,12 @@ def main(argv):
         elif opt in ('-d', '--database'):
             connection_database = arg
 
-    # WARNING The initiation order must be this!!!
-    init_logging()  # init logging context
-    init_file_system()  # init filesystem
-    # init mapi connection
-    init_monetdb_connection(connection_hostname, connection_port, connection_user, connection_database)
-    init_stream_polling_thread(60)  # start polling
-
-    add_log(20, 'Started IOT API Server')
-    signal.signal(signal.SIGINT, close_sig_handler)
-    init_websockets(sockets_host, sockets_port)
-    add_log(20, 'Stopped IOT API Server')
-
+    connection_password = getpass.getpass(prompt='Insert password for user ' + connection_user + ':')
+    subprocess = Process(target=start_process, args=(sockets_host, sockets_port, connection_hostname, connection_port,
+                                                     connection_user, connection_password, connection_database))
+    subprocess.start()
+    signal.signal(signal.SIGINT, signal_handler)
+    subprocess.join()
 
 if __name__ == "__main__":
     main(sys.argv)
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -11,7 +11,7 @@ from Flask.app import start_flask_iot_ap
 from Flask.restresources import init_rest_resources
 from Settings.filesystem import init_file_system, set_filesystem_location
 from Settings.iotlogger import init_logging, add_log, set_logging_location
-from Settings.mapiconnection import init_monetdb_connection
+from Settings.mapiconnection import init_monetdb_connection, close_monetdb_connection
 from Streams.streamscontext import init_streams_context
 from Streams.streams import init_streams_hosts
 
@@ -42,6 +42,7 @@ def start_process(admin_host, admin_port
     add_log(20, 'Started IOT Stream Server')
     thread1.join()
     thread2.join()
+    close_monetdb_connection()
     add_log(20, 'Stopped IOT Stream Server')
 
 
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to