Changeset: a4516136d717 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a4516136d717
Added Files:
clients/iotclient/src/Streams/streampolling.py
Modified Files:
clients/iotapi/documentation/api_server_arguments.rst
clients/iotapi/src/main.py
clients/iotclient/documentation/iot_server_arguments.rst
clients/iotclient/src/Settings/filesystem.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/streamscontext.py
clients/iotclient/src/main.py
clients/iotclient/tests/datatypesinsertstests.py
clients/iotclient/tests/main.py
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/sql.c
Branch: iot
Log Message:
Starting MonetDB integration
diffs (truncated from 523 to 300 lines):
diff --git a/clients/iotapi/documentation/api_server_arguments.rst
b/clients/iotapi/documentation/api_server_arguments.rst
--- a/clients/iotapi/documentation/api_server_arguments.rst
+++ b/clients/iotapi/documentation/api_server_arguments.rst
@@ -75,6 +75,6 @@ Name of database to use. By default is :
Help
----
-**-he - -help**
+**-? - -help**
-Display arguments help.
+Display help.
diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -77,7 +77,7 @@ def main():
help='Database listening port (default: 50000)',
metavar='PORT')
parser.add_argument('-d', '--database', nargs='?', default='iotdb',
help='Database name (default: iotdb)')
parser.add_argument('-u', '--user', nargs='?', default='monetdb',
help='Database user (default: monetdb)')
- parser.add_argument('-he', '--help', action='store_true', help='Display
this help')
+ parser.add_argument('-?', '--help', action='store_true', help='Display
this help')
try:
args = vars(parser.parse_args())
diff --git a/clients/iotclient/documentation/iot_server_arguments.rst
b/clients/iotclient/documentation/iot_server_arguments.rst
--- a/clients/iotclient/documentation/iot_server_arguments.rst
+++ b/clients/iotclient/documentation/iot_server_arguments.rst
@@ -92,6 +92,6 @@ Name of database to use. By default is :
Help
----
-**-he - -help**
+**-? - -help**
-Display arguments help.
+Display help.
diff --git a/clients/iotclient/src/Settings/filesystem.py
b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -5,7 +5,6 @@ from iotlogger import add_log
from Utilities.filecreator import create_file_if_not_exists
Baskets_Location = None
-Configfile_Location = None
if sys.platform in ("linux", "linux2", "darwin"):
DEFAULT_FILESYSTEM = '/etc/iotserver'
@@ -14,14 +13,12 @@ elif sys.platform == "win32":
def init_file_system(filesystem_location):
- global Baskets_Location, Configfile_Location
+ global Baskets_Location
try:
Baskets_Location = os.path.join(filesystem_location, "baskets")
if not os.path.exists(Baskets_Location):
os.makedirs(Baskets_Location)
- Configfile_Location =
create_file_if_not_exists(os.path.join(filesystem_location, "config.json"),
- init_text='[]')
except (Exception, OSError) as ex:
print ex
add_log(50, ex)
@@ -30,7 +27,3 @@ def init_file_system(filesystem_location
def get_baskets_location():
return Baskets_Location
-
-
-def get_configfile_location():
- return Configfile_Location
diff --git a/clients/iotclient/src/Settings/mapiconnection.py
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -11,9 +11,9 @@ def init_monetdb_connection(hostname, po
try: # the autocommit is set to true so each statement will be independent
Connection = pymonetdb.connect(hostname=hostname, port=port,
username=user_name, password=user_password,
- database=database, autocommit=True)
+ database=database, autocommit=False)
log_message = 'User %s connected successfully to database %s' %
(user_name, database)
- print >> sys.stdout, log_message
+ print log_message
add_log(20, log_message)
except BaseException as ex:
print ex
@@ -24,52 +24,67 @@ def init_monetdb_connection(hostname, po
def close_monetdb_connection():
Connection.close()
-""""
-def mapi_fetch_all_streams():
+
+def mapi_get_webserver_streams():
try:
+ Connection.execute("BEGIN TRANSACTION")
cursor = Connection.cursor()
- sql = ''.join(['SELECT storage."schema", storage."table",
storage."column", storage."type", storage',
- '."typewidth" FROM (SELECT "schema", "table", "column",
"type", "typewidth" FROM sys.storage)',
- ' AS storage INNER JOIN (SELECT "name" FROM sys.tables
WHERE type=4) AS tables ON'
- ' (storage."table"=tables."name") INNER JOIN (SELECT
"name" FROM sys.schemas)',
- ' AS schemas ON (storage."schema"=schemas."name");'])
- cursor.execute(sql)
- return cursor.fetchall()
+ sql_string = """SELECT tables."id", tables."name", schemas."name" as
schema, tables."name" as table,
+ flushing."flushing", flushing."unit", flushing."interval" FROM
(SELECT "id", "name", "schema_id"
+ FROM sys.tables) AS tables INNER JOIN (SELECT "id", "name" FROM
sys.schemas) AS schemas
+ ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT
"table_id", "flushing", "unit", "interval"
+ FROM iot.webserverflushing) AS flushing ON
(tables."id"=flushing."table_id")""".replace('\n', ' ')
+ cursor.execute(sql_string)
+ tables = cursor.fetchall()
+
+ cursor = Connection.cursor()
+ sql_string = """SELECT columns."table_id", columns."name" as column,
columns."type", columns."type_digits",
+ columns."type_scale", columns."default", columns."null",
extras."special", extras."validation1",
+ extras."validation2" FROM (SELECT "id", "table_id", "name",
"type", "type_digits", "type_scale", "default",
+ "null" FROM sys.columns) AS columns INNER JOIN (SELECT
"column_id", "special", "validation1", "validation2"
+ FROM iot.webservervalidation) AS extras ON
(columns."id"=extras."column_id")""".replace('\n', ' ')
+ cursor.execute(sql_string)
+ columns = cursor.fetchall()
+
+ Connection.commit()
+ return tables, columns
except BaseException as ex:
add_log(50, ex)
- raise
+ return [], []
-def mapi_check_stream_data(schema, stream):
+def mapi_create_stream(schema, stream, columns):
try:
- cursor = Connection.cursor()
- sql = ''.join(['SELECT storage."schema", storage."table",
storage."column", storage."type", storage.',
- '"typewidth" FROM (SELECT "schema", "table", "column",
"type", "typewidth" FROM sys.storage)',
- ' AS storage INNER JOIN (SELECT "name" FROM sys.tables
WHERE type=4 AND "name"="', stream,
- '") AS tables ON (storage."table"=tables."name") INNER
JOIN (SELECT "name" FROM sys.schemas',
- ' WHERE "name"="',schema, '") AS schemas ON
(storage."schema"=schemas."name");'])
- cursor.execute(sql)
- return cursor.fetchall()
+ Connection.execute("BEGIN TRANSACTION")
+ try: # create schema if not exists, ignore the error if already exists
+ Connection.execute("CREATE SCHEMA " + schema + ";")
+ except:
+ pass
+ Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (",
columns, ")"])) # TODO concat!!
+ # TODO insert on the tables
+ return Connection.commit()
except BaseException as ex:
add_log(50, ex)
-"""""
+ return 0
-def mapi_create_stream(schema, stream, columns):
- try: # create schema if not exists, ignore the error if already exists
- Connection.execute("CREATE SCHEMA " + schema + ";")
- except:
- pass
- Connection.execute("SET SCHEMA " + schema + ";")
- Connection.execute(''.join(["CREATE STREAM TABLE ", stream, " (", columns,
");"]))
+def mapi_delete_stream(schema, stream, stream_id, columns_ids):
+ try:
+ Connection.execute("BEGIN TRANSACTION")
+ Connection.execute("DROP TABLE " + stream) # TODO concat!!
+ Connection.execute("DELETE FROM iot.webserverflushing WHERE table_id="
+ stream_id)
+ Connection.execute("DELETE FROM iot.webservervalidation WHERE
column_id IN (" + ','.join(columns_ids) + ")")
+ return Connection.commit()
+ except BaseException as ex:
+ add_log(50, ex)
+ return 0
def mapi_flush_baskets(schema, stream, baskets):
try:
- Connection.execute("SET SCHEMA iot;")
- except:
- pass
- try:
+ Connection.execute("BEGIN TRANSACTION")
Connection.execute(''.join(["CALL iot.basket('", schema, "','",
stream, "','", baskets, "');"]))
+ return Connection.commit()
except BaseException as ex:
add_log(40, ex)
+ return 0
diff --git a/clients/iotapi/src/Streams/streampolling.py
b/clients/iotclient/src/Streams/streampolling.py
copy from clients/iotapi/src/Streams/streampolling.py
copy to clients/iotclient/src/Streams/streampolling.py
diff --git a/clients/iotclient/src/Streams/streamscontext.py
b/clients/iotclient/src/Streams/streamscontext.py
--- a/clients/iotclient/src/Streams/streamscontext.py
+++ b/clients/iotclient/src/Streams/streamscontext.py
@@ -1,25 +1,7 @@
-import json
import collections
-from Settings.filesystem import get_configfile_location
from Utilities.readwritelock import RWLock
from streamscreator import validate_schema_and_create_stream
-from jsonschema import Draft4Validator, FormatChecker
-from jsonschemas import CONFIG_FILE_SCHEMA
-
-
-Config_File_Location = None
-Config_File_Validator = None
-
-
-def init_streams_context():
- global Config_File_Location, Config_File_Validator
- Config_File_Location = get_configfile_location()
- Config_File_Validator = Draft4Validator(CONFIG_FILE_SCHEMA,
format_checker=FormatChecker())
-
-
-class IOTStreamsException(Exception):
- pass
class IOTStreamsContext(object):
@@ -31,42 +13,16 @@ class IOTStreamsContext(object):
self._locker = RWLock()
self._context = collections.OrderedDict() # dictionary of schema_name
+ '.' + stream_name -> DataCellStream
- def reload_config_file(self): # the write lock must be set before running
this method!!! (except on the beginning)
- with open(get_configfile_location(), 'r') as infile: # read the
config searching for existing streams
- data = json.load(infile) # if the configuration file is invalid,
then the context is left untouched
- Config_File_Validator.validate(data)
-
- for value in self._context.values(): # stop the current streams
- value.stop_stream()
- self._context = collections.OrderedDict()
-
- stream_dic = collections.OrderedDict()
- for entry in data:
- next_stream = validate_schema_and_create_stream(entry,
created=False)
- next_name =
IOTStreamsContext.get_context_entry_name(next_stream.get_schema_name(),
-
next_stream.get_stream_name())
- stream_dic[next_name] = next_stream
- self._context = stream_dic # dictionary of schema_name + '.' +
stream_name -> DataCellStream
-
- for value in self._context.values(): # start the new streams
- value.start_stream()
-
- def update_config_file(self): # the write lock must be set before running
this method!!!
- data = [value.get_data_dictionary() for value in
self._context.values()]
- with open(get_configfile_location(), 'w') as outfile: # re-write the
whole config file
- json.dump(data, outfile)
-
def add_new_stream(self, validating_schema):
concat_name =
IOTStreamsContext.get_context_entry_name(validating_schema['schema'],
validating_schema['stream'])
self._locker.acquire_write()
if concat_name in self._context:
self._locker.release()
- raise IOTStreamsException('The stream ' +
validating_schema['stream'] + ' in schema ' +
- validating_schema['schema'] + ' already
exists!')
+ raise Exception('The stream ' + validating_schema['stream'] + ' in
schema ' + validating_schema['schema']
+ + ' already exists!')
try:
new_stream = validate_schema_and_create_stream(validating_schema,
created=True)
self._context[concat_name] = new_stream
- self.update_config_file() # update config file after adding a new
stream
new_stream.start_stream()
except:
self._locker.release()
@@ -78,12 +34,11 @@ class IOTStreamsContext(object):
self._locker.acquire_write()
if concat_name not in self._context:
self._locker.release()
- raise IOTStreamsException('The stream ' +
validating_schema['stream'] + ' in schema ' +
- validating_schema['schema'] + ' does not
exist!')
+ raise Exception('The stream ' + validating_schema['stream'] + ' in
schema ' + validating_schema['schema'] +
+ ' does not exist!')
try:
old_stream = self._context[concat_name]
del self._context[concat_name]
- self.update_config_file()
old_stream.stop_stream()
except:
self._locker.release()
@@ -95,7 +50,7 @@ class IOTStreamsContext(object):
self._locker.acquire_read()
if concat_name not in self._context:
self._locker.release()
- raise IOTStreamsException('The stream ' + stream_name + ' in
schema ' + schema_name + ' does not exist!')
+ raise Exception('The stream ' + stream_name + ' in schema ' +
schema_name + ' does not exist!')
res = self._context[concat_name]
self._locker.release()
return res
diff --git a/clients/iotclient/src/main.py b/clients/iotclient/src/main.py
--- a/clients/iotclient/src/main.py
+++ b/clients/iotclient/src/main.py
@@ -75,6 +75,8 @@ def main():
help='Baskets location directory (default: %s)' %
DEFAULT_FILESYSTEM, metavar='DIRECTORY')
parser.add_argument('-l', '--log', type=check_path, nargs='?',
default=DEFAULT_LOGGING,
help='Logging file location (default: %s)' %
DEFAULT_LOGGING, metavar='FILE_PATH')
+ parser.add_argument('-po', '--polling', type=check_positive_int,
nargs='?', default=60,
+ help='Polling interval in seconds to the database for
streams updates (default: 60)')
parser.add_argument('-i', '--identifier', action='store_true',
help='Add a host identifier to the created streams. By
default will not be added')
parser.add_argument('-n', '--name', nargs='?',
@@ -94,7 +96,7 @@ def main():
help='Database listening port (default: 50000)',
metavar='PORT')
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list