Changeset: 9b8b8136fbc1 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9b8b8136fbc1 Modified Files: clients/iotapi/src/Settings/filesystem.py clients/iotapi/src/Settings/mapiconnection.py clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streampolling.py clients/iotapi/src/Streams/streams.py clients/iotapi/src/Streams/streamscontext.py clients/iotapi/src/Utilities/customthreading.py clients/iotapi/src/WebSockets/websockets.py clients/iotapi/src/main.py clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Settings/mapiconnection.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streampolling.py clients/iotclient/src/Streams/streamscreator.py clients/iotclient/tests/datatypesinsertstests.py clients/iotclient/tests/main.py Branch: iot Log Message:
Added interval type support, updated iotapi polling diffs (truncated from 1848 to 300 lines): diff --git a/clients/iotapi/src/Settings/filesystem.py b/clients/iotapi/src/Settings/filesystem.py --- a/clients/iotapi/src/Settings/filesystem.py +++ b/clients/iotapi/src/Settings/filesystem.py @@ -6,7 +6,7 @@ from .iotlogger import add_log Baskets_Location = None if sys.platform in ("linux", "linux2", "darwin"): - DEFAULT_FILESYSTEM = '/etc/iotapi' + DEFAULT_FILESYSTEM = '/var/iotapi' elif sys.platform == "win32": DEFAULT_FILESYSTEM = os.path.join(os.path.dirname(__file__), os.pardir) diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py --- a/clients/iotapi/src/Settings/mapiconnection.py +++ b/clients/iotapi/src/Settings/mapiconnection.py @@ -1,50 +1,44 @@ -import sys - from monetdb.sql import connect -from ..Settings.iotlogger import add_log - -Connection = None +from .iotlogger import add_log def init_monetdb_connection(hostname, port, user_name, user_password, database): - global Connection + return connect(hostname=hostname, port=port, username=user_name, password=user_password, database=database) - try: # the autocommit is set to true so each statement will be independent - Connection = connect(hostname=hostname, port=port, username=user_name, password=user_password, - database=database, autocommit=True) - log_message = 'User %s connected successfully to database %s' % (user_name, database) - print log_message - add_log(20, log_message) - except BaseException as ex: - print ex - add_log(50, ex) - sys.exit(1) +def close_monetdb_connection(connection): + connection.close() -def close_monetdb_connection(): - Connection.close() - -def check_hugeint_type(): - Connection.execute("START TRANSACTION") - cursor = Connection.cursor() +def check_hugeint_type(connection): + cursor = connection.cursor() cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'") result = cursor.fetchall()[0][0] - Connection.commit() - return result + connection.commit() + return result > 0 -def fetch_streams(): +def mapi_get_database_streams(connection): try: - cursor = Connection.cursor() - sql_string = """SELECT schemas."name" as schema, tables."name" as table, columns."name" as column, - columns."type", columns."type_digits", columns."type_scale", columns."default", columns."null" FROM + cursor = connection.cursor() + sql_string = """SELECT tables."id", schemas."name" AS schema, tables."name" AS table FROM (SELECT "id", "name", "schema_id" FROM sys.tables WHERE type=4) AS tables INNER JOIN (SELECT "id", "name" - FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") INNER JOIN (SELECT "table_id", "name", - "type", "type_digits", "type_scale", "default", "null" FROM sys.columns) AS columns ON - (columns."table_id"=tables."id")""".replace('\n', ' ') + FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id")""".replace('\n', ' ') cursor.execute(sql_string) - return cursor.fetchall() + tables = cursor.fetchall() + + cursor = connection.cursor() + sql_string = """SELECT columns."table_id", columns."name" AS column, columns."type", columns."type_digits", + columns."type_scale", columns."default", columns."null" FROM (SELECT "table_id", "name", "type", + "type_digits", "type_scale", "default", "null", "number" FROM sys.columns) AS columns INNER JOIN + (SELECT "id" FROM sys.tables WHERE type=4) AS tables ON (tables."id"=columns."table_id") + ORDER BY columns."table_id", columns."number" """.replace('\n', ' ') + cursor.execute(sql_string) + columns = cursor.fetchall() + + connection.commit() + return tables, columns except BaseException as ex: add_log(50, ex) + connection.rollback() raise diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py --- a/clients/iotapi/src/Streams/datatypes.py +++ b/clients/iotapi/src/Streams/datatypes.py @@ -21,27 +21,25 @@ FLOAT_NAN = struct.unpack('f', '\xff\xff DOUBLE_NAN = struct.unpack('d', '\xff\xff\xff\xff\xff\xff\xef\xff')[0] -# elem[0] is column name, elem[1] is type, elem[2] is type_digits, elem[3] is type_scale elem[4] is default value -# elem[5] is nullable class StreamDataType(object): """MonetDB's data types for reading base class""" __metaclass__ = ABCMeta - def __init__(self, *args): - self._column_name = args[0] # name of the column - self._data_type = args[1] # SQL name of the type - self._default_value = args[4] # default value text - self._is_nullable = args[5] # is nullable + def __init__(self, **kwargs): + self._column_name = kwargs['name'] # name of the column + self._data_type = kwargs['type'] # SQL name of the type + self._default_value = kwargs['default'] # default value text + self._is_nullable = kwargs['nullable'] # is nullable def is_file_mode_binary(self): return True @abstractmethod - def skip_tuples(self, file_pointer, offset): + def skip_tuples(self, fp, offset): pass @abstractmethod - def read_next_batch(self, file_pointer, limit): + def read_next_batch(self, fp, limit): pass def read_next_tuples(self, file_name, offset, read_size): @@ -65,21 +63,21 @@ class StreamDataType(object): class TextType(StreamDataType): """Covers: CLOB and Url""" - def __init__(self, *args): - super(TextType, self).__init__(*args) + def __init__(self, **kwargs): + super(TextType, self).__init__(**kwargs) self._nullable_constant = NIL_STRING def is_file_mode_binary(self): return False - def skip_tuples(self, file_pointer, offset): + def skip_tuples(self, fp, offset): for _ in xrange(offset): - next(file_pointer) + next(fp) - def read_next_batch(self, file_pointer, limit): + def read_next_batch(self, fp, limit): array = [] for _ in xrange(limit): - next_line = next(file_pointer) + next_line = next(fp) if next_line == self._nullable_constant: array.append(None) else: @@ -90,9 +88,9 @@ class TextType(StreamDataType): class LimitedTextType(TextType): """Covers: CHAR and VARCHAR""" - def __init__(self, *args): - super(LimitedTextType, self).__init__(*args) - self._limit = args[2] + def __init__(self, **kwargs): + super(LimitedTextType, self).__init__(**kwargs) + self._limit = kwargs['digits'] def to_json_representation(self): json_value = super(LimitedTextType, self).to_json_representation() @@ -103,16 +101,16 @@ class LimitedTextType(TextType): class INetType(StreamDataType): """Covers: Inet""" - def __init__(self, *args): - super(INetType, self).__init__(*args) + def __init__(self, **kwargs): + super(INetType, self).__init__(**kwargs) - def skip_tuples(self, file_pointer, offset): - file_pointer.seek(offset << 3) + def skip_tuples(self, fp, offset): + fp.seek(offset << 3) - def read_next_batch(self, file_pointer, limit): + def read_next_batch(self, fp, limit): results = [] read_size = limit << 3 - array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', file_pointer.read(read_size)) + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', fp.read(read_size)) iterator = iter(array) for _ in xrange(limit): @@ -128,16 +126,16 @@ class INetType(StreamDataType): class UUIDType(StreamDataType): """Covers: UUID""" - def __init__(self, *args): - super(UUIDType, self).__init__(*args) + def __init__(self, **kwargs): + super(UUIDType, self).__init__(**kwargs) - def skip_tuples(self, file_pointer, offset): - file_pointer.seek(offset << 4) + def skip_tuples(self, fp, offset): + fp.seek(offset << 4) - def read_next_batch(self, file_pointer, limit): + def read_next_batch(self, fp, limit): results = [] read_size = limit << 4 - array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', file_pointer.read(read_size)) + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', fp.read(read_size)) iterator = iter(array) for _ in xrange(limit): @@ -158,50 +156,49 @@ class UUIDType(StreamDataType): class BooleanType(StreamDataType): """Covers: BOOLEAN""" - def __init__(self, *args): - super(BooleanType, self).__init__(*args) + def __init__(self, **kwargs): + super(BooleanType, self).__init__(**kwargs) self._nullable_constant = INT8_MIN - def skip_tuples(self, file_pointer, offset): - file_pointer.seek(offset) + def skip_tuples(self, fp, offset): + fp.seek(offset) - def read_next_batch(self, file_pointer, limit): - array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b', file_pointer.read(limit)) + def read_next_batch(self, fp, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b', fp.read(limit)) return map(lambda x: None if x == self._nullable_constant else bool(x), array) class SmallIntegerType(StreamDataType): """Covers: TINYINT, SMALLINT, INTEGER, BIGINT""" - def __init__(self, *args): - super(SmallIntegerType, self).__init__(*args) + def __init__(self, **kwargs): + super(SmallIntegerType, self).__init__(**kwargs) self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'bigint': 'q'} \ .get(self._data_type) self._size = struct.calcsize(self._pack_sym) self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 'int': INT32_MIN, 'integer': INT32_MIN, 'bigint': INT64_MIN}.get(self._data_type) - def skip_tuples(self, file_pointer, offset): - file_pointer.seek(offset * self._size) + def skip_tuples(self, fp, offset): + fp.seek(offset * self._size) - def read_next_batch(self, file_pointer, limit): - array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + self._pack_sym, - file_pointer.read(limit * self._size)) + def read_next_batch(self, fp, limit): + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + self._pack_sym, fp.read(limit * self._size)) return map(lambda x: None if x == self._nullable_constant else int(x), array) class HugeIntegerType(StreamDataType): """Covers: HUGEINT""" - def __init__(self, *args): - super(HugeIntegerType, self).__init__(*args) + def __init__(self, **kwargs): + super(HugeIntegerType, self).__init__(**kwargs) self._nullable_constant = INT128_MIN - def skip_tuples(self, file_pointer, offset): - file_pointer.seek(offset << 4) + def skip_tuples(self, fp, offset): + fp.seek(offset << 4) - def read_next_batch(self, file_pointer, limit): # [entry & INT64_MAX, (entry >> 64) & INT64_MAX] - array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q', file_pointer.read(limit << 4)) + def read_next_batch(self, fp, limit): # [entry & INT64_MAX, (entry >> 64) & INT64_MAX] + array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q', fp.read(limit << 4)) results = [] iterator = iter(array) # has to iterate two values at once, so use iterator for value in iterator: @@ -216,28 +213,27 @@ class HugeIntegerType(StreamDataType): class FloatType(StreamDataType): """Covers: REAL, DOUBLE""" - def __init__(self, *args): - super(FloatType, self).__init__(*args) + def __init__(self, **kwargs): + super(FloatType, self).__init__(**kwargs) self._pack_sym = {'real': 'f', 'float': 'd', 'double': 'd'}.get(self._data_type) self._size = struct.calcsize(self._pack_sym) self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 'double': DOUBLE_NAN}.get(self._data_type) - def skip_tuples(self, file_pointer, offset): - file_pointer.seek(offset * self._size) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list