Changeset: a4516136d717 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a4516136d717
Added Files:
        clients/iotclient/src/Streams/streampolling.py
Modified Files:
        clients/iotapi/documentation/api_server_arguments.rst
        clients/iotapi/src/main.py
        clients/iotclient/documentation/iot_server_arguments.rst
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/main.py
        clients/iotclient/tests/datatypesinsertstests.py
        clients/iotclient/tests/main.py
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/sql.c
Branch: iot
Log Message:

Starting MonetDB integration


diffs (truncated from 523 to 300 lines):

diff --git a/clients/iotapi/documentation/api_server_arguments.rst 
b/clients/iotapi/documentation/api_server_arguments.rst
--- a/clients/iotapi/documentation/api_server_arguments.rst
+++ b/clients/iotapi/documentation/api_server_arguments.rst
@@ -75,6 +75,6 @@ Name of database to use. By default is :
 Help
 ----
 
-**-he  - -help**
+**-?  - -help**
 
-Display arguments help.
+Display help.
diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -77,7 +77,7 @@ def main():
                         help='Database listening port (default: 50000)', 
metavar='PORT')
     parser.add_argument('-d', '--database', nargs='?', default='iotdb', 
help='Database name (default: iotdb)')
     parser.add_argument('-u', '--user', nargs='?', default='monetdb', 
help='Database user (default: monetdb)')
-    parser.add_argument('-he', '--help', action='store_true', help='Display 
this help')
+    parser.add_argument('-?', '--help', action='store_true', help='Display 
this help')
 
     try:
         args = vars(parser.parse_args())
diff --git a/clients/iotclient/documentation/iot_server_arguments.rst 
b/clients/iotclient/documentation/iot_server_arguments.rst
--- a/clients/iotclient/documentation/iot_server_arguments.rst
+++ b/clients/iotclient/documentation/iot_server_arguments.rst
@@ -92,6 +92,6 @@ Name of database to use. By default is :
 Help
 ----
 
-**-he  - -help**
+**-?  - -help**
 
-Display arguments help.
+Display help.
diff --git a/clients/iotclient/src/Settings/filesystem.py 
b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -5,7 +5,6 @@ from iotlogger import add_log
 from Utilities.filecreator import create_file_if_not_exists
 
 Baskets_Location = None
-Configfile_Location = None
 
 if sys.platform in ("linux", "linux2", "darwin"):
     DEFAULT_FILESYSTEM = '/etc/iotserver'
@@ -14,14 +13,12 @@ elif sys.platform == "win32":
 
 
 def init_file_system(filesystem_location):
-    global Baskets_Location, Configfile_Location
+    global Baskets_Location
 
     try:
         Baskets_Location = os.path.join(filesystem_location, "baskets")
         if not os.path.exists(Baskets_Location):
             os.makedirs(Baskets_Location)
-        Configfile_Location = 
create_file_if_not_exists(os.path.join(filesystem_location, "config.json"),
-                                                        init_text='[]')
     except (Exception, OSError) as ex:
         print ex
         add_log(50, ex)
@@ -30,7 +27,3 @@ def init_file_system(filesystem_location
 
 def get_baskets_location():
     return Baskets_Location
-
-
-def get_configfile_location():
-    return Configfile_Location
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -11,9 +11,9 @@ def init_monetdb_connection(hostname, po
 
     try:  # the autocommit is set to true so each statement will be independent
         Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=user_password,
-                                       database=database, autocommit=True)
+                                       database=database, autocommit=False)
         log_message = 'User %s connected successfully to database %s' % 
(user_name, database)
-        print >> sys.stdout, log_message
+        print log_message
         add_log(20, log_message)
     except BaseException as ex:
         print ex
@@ -24,52 +24,67 @@ def init_monetdb_connection(hostname, po
 def close_monetdb_connection():
     Connection.close()
 
-""""
-def mapi_fetch_all_streams():
+
+def mapi_get_webserver_streams():
     try:
+        Connection.execute("BEGIN TRANSACTION")
         cursor = Connection.cursor()
-        sql = ''.join(['SELECT storage."schema", storage."table", 
storage."column", storage."type", storage',
-                       '."typewidth" FROM (SELECT "schema", "table", "column", 
"type", "typewidth" FROM sys.storage)',
-                       ' AS storage INNER JOIN (SELECT "name" FROM sys.tables 
WHERE type=4) AS tables ON'
-                       ' (storage."table"=tables."name") INNER JOIN (SELECT 
"name" FROM sys.schemas)',
-                       ' AS schemas ON (storage."schema"=schemas."name");'])
-        cursor.execute(sql)
-        return cursor.fetchall()
+        sql_string = """SELECT tables."id", tables."name", schemas."name" as 
schema, tables."name" as table,
+            flushing."flushing", flushing."unit", flushing."interval" FROM 
(SELECT "id", "name", "schema_id"
+            FROM sys.tables) AS tables INNER JOIN (SELECT "id", "name" FROM 
sys.schemas) AS schemas
+            ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT 
"table_id", "flushing", "unit", "interval"
+            FROM iot.webserverflushing) AS flushing ON 
(tables."id"=flushing."table_id")""".replace('\n', ' ')
+        cursor.execute(sql_string)
+        tables = cursor.fetchall()
+
+        cursor = Connection.cursor()
+        sql_string = """SELECT columns."table_id", columns."name" as column, 
columns."type", columns."type_digits",
+            columns."type_scale", columns."default", columns."null", 
extras."special", extras."validation1",
+            extras."validation2" FROM (SELECT "id", "table_id", "name", 
"type", "type_digits", "type_scale", "default",
+            "null" FROM sys.columns) AS columns INNER JOIN (SELECT 
"column_id", "special", "validation1", "validation2"
+            FROM iot.webservervalidation) AS extras ON 
(columns."id"=extras."column_id")""".replace('\n', ' ')
+        cursor.execute(sql_string)
+        columns = cursor.fetchall()
+
+        Connection.commit()
+        return tables, columns
     except BaseException as ex:
         add_log(50, ex)
-        raise
+        return [], []
 
 
-def mapi_check_stream_data(schema, stream):
+def mapi_create_stream(schema, stream, columns):
     try:
-        cursor = Connection.cursor()
-        sql = ''.join(['SELECT storage."schema", storage."table", 
storage."column", storage."type", storage.',
-                       '"typewidth" FROM (SELECT "schema", "table", "column", 
"type", "typewidth" FROM sys.storage)',
-                       ' AS storage INNER JOIN (SELECT "name" FROM sys.tables 
WHERE type=4 AND "name"="', stream,
-                       '") AS tables ON (storage."table"=tables."name") INNER 
JOIN (SELECT "name" FROM sys.schemas',
-                       ' WHERE "name"="',schema, '") AS schemas ON 
(storage."schema"=schemas."name");'])
-        cursor.execute(sql)
-        return cursor.fetchall()
+        Connection.execute("BEGIN TRANSACTION")
+        try:  # create schema if not exists, ignore the error if already exists
+            Connection.execute("CREATE SCHEMA " + schema + ";")
+        except:
+            pass
+        Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", 
columns, ")"]))  # TODO concat!!
+        # TODO insert on the tables
+        return Connection.commit()
     except BaseException as ex:
         add_log(50, ex)
-"""""
+        return 0
 
 
-def mapi_create_stream(schema, stream, columns):
-    try:  # create schema if not exists, ignore the error if already exists
-        Connection.execute("CREATE SCHEMA " + schema + ";")
-    except:
-        pass
-    Connection.execute("SET SCHEMA " + schema + ";")
-    Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", columns, 
");"]))
+def mapi_delete_stream(schema, stream, stream_id, columns_ids):
+    try:
+        Connection.execute("BEGIN TRANSACTION")
+        Connection.execute("DROP TABLE " + stream)  # TODO concat!!
+        Connection.execute("DELETE FROM iot.webserverflushing WHERE table_id=" 
+ stream_id)
+        Connection.execute("DELETE FROM iot.webservervalidation WHERE 
column_id IN (" + ','.join(columns_ids) + ")")
+        return Connection.commit()
+    except BaseException as ex:
+        add_log(50, ex)
+        return 0
 
 
 def mapi_flush_baskets(schema, stream, baskets):
     try:
-        Connection.execute("SET SCHEMA iot;")
-    except:
-        pass
-    try:
+        Connection.execute("BEGIN TRANSACTION")
         Connection.execute(''.join(["CALL iot.basket('", schema, "','", 
stream, "','", baskets, "');"]))
+        return Connection.commit()
     except BaseException as ex:
         add_log(40, ex)
+        return 0
diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotclient/src/Streams/streampolling.py
copy from clients/iotapi/src/Streams/streampolling.py
copy to clients/iotclient/src/Streams/streampolling.py
diff --git a/clients/iotclient/src/Streams/streamscontext.py 
b/clients/iotclient/src/Streams/streamscontext.py
--- a/clients/iotclient/src/Streams/streamscontext.py
+++ b/clients/iotclient/src/Streams/streamscontext.py
@@ -1,25 +1,7 @@
-import json
 import collections
 
-from Settings.filesystem import get_configfile_location
 from Utilities.readwritelock import RWLock
 from streamscreator import validate_schema_and_create_stream
-from jsonschema import Draft4Validator, FormatChecker
-from jsonschemas import CONFIG_FILE_SCHEMA
-
-
-Config_File_Location = None
-Config_File_Validator = None
-
-
-def init_streams_context():
-    global Config_File_Location, Config_File_Validator
-    Config_File_Location = get_configfile_location()
-    Config_File_Validator = Draft4Validator(CONFIG_FILE_SCHEMA, 
format_checker=FormatChecker())
-
-
-class IOTStreamsException(Exception):
-    pass
 
 
 class IOTStreamsContext(object):
@@ -31,42 +13,16 @@ class IOTStreamsContext(object):
         self._locker = RWLock()
         self._context = collections.OrderedDict()  # dictionary of schema_name 
+ '.' + stream_name -> DataCellStream
 
-    def reload_config_file(self):  # the write lock must be set before running 
this method!!! (except on the beginning)
-        with open(get_configfile_location(), 'r') as infile:  # read the 
config searching for existing streams
-            data = json.load(infile)  # if the configuration file is invalid, 
then the context is left untouched
-            Config_File_Validator.validate(data)
-
-        for value in self._context.values():  # stop the current streams
-            value.stop_stream()
-        self._context = collections.OrderedDict()
-
-        stream_dic = collections.OrderedDict()
-        for entry in data:
-            next_stream = validate_schema_and_create_stream(entry, 
created=False)
-            next_name = 
IOTStreamsContext.get_context_entry_name(next_stream.get_schema_name(),
-                                                                 
next_stream.get_stream_name())
-            stream_dic[next_name] = next_stream
-        self._context = stream_dic  # dictionary of schema_name + '.' + 
stream_name -> DataCellStream
-
-        for value in self._context.values():  # start the new streams
-            value.start_stream()
-
-    def update_config_file(self):  # the write lock must be set before running 
this method!!!
-        data = [value.get_data_dictionary() for value in 
self._context.values()]
-        with open(get_configfile_location(), 'w') as outfile:  # re-write the 
whole config file
-            json.dump(data, outfile)
-
     def add_new_stream(self, validating_schema):
         concat_name = 
IOTStreamsContext.get_context_entry_name(validating_schema['schema'], 
validating_schema['stream'])
         self._locker.acquire_write()
         if concat_name in self._context:
             self._locker.release()
-            raise IOTStreamsException('The stream ' + 
validating_schema['stream'] + ' in schema ' +
-                                      validating_schema['schema'] + ' already 
exists!')
+            raise Exception('The stream ' + validating_schema['stream'] + ' in 
schema ' + validating_schema['schema']
+                            + ' already exists!')
         try:
             new_stream = validate_schema_and_create_stream(validating_schema, 
created=True)
             self._context[concat_name] = new_stream
-            self.update_config_file()  # update config file after adding a new 
stream
             new_stream.start_stream()
         except:
             self._locker.release()
@@ -78,12 +34,11 @@ class IOTStreamsContext(object):
         self._locker.acquire_write()
         if concat_name not in self._context:
             self._locker.release()
-            raise IOTStreamsException('The stream ' + 
validating_schema['stream'] + ' in schema ' +
-                                      validating_schema['schema'] + ' does not 
exist!')
+            raise Exception('The stream ' + validating_schema['stream'] + ' in 
schema ' + validating_schema['schema'] +
+                            ' does not exist!')
         try:
             old_stream = self._context[concat_name]
             del self._context[concat_name]
-            self.update_config_file()
             old_stream.stop_stream()
         except:
             self._locker.release()
@@ -95,7 +50,7 @@ class IOTStreamsContext(object):
         self._locker.acquire_read()
         if concat_name not in self._context:
             self._locker.release()
-            raise IOTStreamsException('The stream ' + stream_name + ' in 
schema ' + schema_name + ' does not exist!')
+            raise Exception('The stream ' + stream_name + ' in schema ' + 
schema_name + ' does not exist!')
         res = self._context[concat_name]
         self._locker.release()
         return res
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -75,6 +75,8 @@ def main():
                         help='Baskets location directory (default: %s)' % 
DEFAULT_FILESYSTEM, metavar='DIRECTORY')
     parser.add_argument('-l', '--log', type=check_path, nargs='?', 
default=DEFAULT_LOGGING,
                         help='Logging file location (default: %s)' % 
DEFAULT_LOGGING, metavar='FILE_PATH')
+    parser.add_argument('-po', '--polling', type=check_positive_int, 
nargs='?', default=60,
+                        help='Polling interval in seconds to the database for 
streams updates (default: 60)')
     parser.add_argument('-i', '--identifier', action='store_true',
                         help='Add a host identifier to the created streams. By 
default will not be added')
     parser.add_argument('-n', '--name', nargs='?',
@@ -94,7 +96,7 @@ def main():
                         help='Database listening port (default: 50000)', 
metavar='PORT')
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to