Changeset: 0718a6157981 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0718a6157981 Added Files: clients/iotapi/requirements.txt clients/iotapi/src/Settings/__init__.py clients/iotapi/src/Settings/filesystem.py clients/iotapi/src/Settings/iotlogger.py clients/iotapi/src/Settings/mapiconnection.py clients/iotapi/src/Streams/__init__.py clients/iotapi/src/Streams/datatypes.py clients/iotapi/src/Streams/streampolling.py clients/iotapi/src/Streams/streamscontext.py clients/iotapi/src/Utilities/__init__.py clients/iotapi/src/Utilities/customthreading.py clients/iotapi/src/__init__.py clients/iotapi/src/main.py Modified Files: clients/iotclient/src/Settings/filesystem.py clients/iotclient/src/Settings/iotlogger.py clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/flushing.py clients/iotclient/src/Streams/jsonschemas.py clients/iotclient/src/Streams/streamscreator.py clients/iotclient/src/Utilities/customthreading.py Branch: iot Log Message:
Started to work on Web API for iot diffs (truncated from 770 to 300 lines): diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt new file mode 100644 --- /dev/null +++ b/clients/iotapi/requirements.txt @@ -0,0 +1,7 @@ +git+https://github.com/dpallot/simple-websocket-server.git +python-dateutil==2.5.3 +pytz==2016.4 +pymonetdb==0.1.1 +tzlocal==1.2.2 +Sphinx==1.4.1 +sphinx-rtd-theme==0.1.9 diff --git a/clients/iotapi/src/Settings/__init__.py b/clients/iotapi/src/Settings/__init__.py new file mode 100644 diff --git a/clients/iotapi/src/Settings/filesystem.py b/clients/iotapi/src/Settings/filesystem.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/src/Settings/filesystem.py @@ -0,0 +1,28 @@ +import sys + +import os + +from iotlogger import add_log + +BASKETS_BASE_DIRECTORY = "baskets" +CONFIG_FILE_DEFAULT_NAME = "config.json" + +if sys.platform in ("linux", "linux2", "darwin"): + filesystem_location = '/etc/iotcollector' +elif sys.platform == "win32": + filesystem_location = os.path.join(os.path.dirname(__file__), os.pardir) + + +def set_filesystem_location(new_location): + global filesystem_location + filesystem_location = new_location + + +def init_file_system(): + try: + if not os.path.exists(filesystem_location): + os.makedirs(filesystem_location) + except (Exception, OSError) as ex: + print >> sys.stdout, ex + add_log(50, ex) + sys.exit(1) diff --git a/clients/iotapi/src/Settings/iotlogger.py b/clients/iotapi/src/Settings/iotlogger.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/src/Settings/iotlogger.py @@ -0,0 +1,38 @@ +import logging +import sys + +import os + +if sys.platform in ("linux", "linux2", "darwin"): + logging_location = '/var/log/iot/iotapi.log' +elif sys.platform == "win32": + logging_location = os.path.join(os.path.dirname(__file__), os.pardir, 'iotapi.log') + +logger = logging.getLogger("IOTAPILog") + + +def set_logging_location(new_location): + global logging_location + logging_location = new_location + + +def init_logging(): + global logger + try: + logger = logging.getLogger("IOTLog") + logger.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + logging_path = os.path.dirname(logging_location) + if not os.path.exists(logging_path): + os.makedirs(logging_path) + log_handler = logging.FileHandler(logging_location, mode='a+') + log_handler.setFormatter(formatter) + logger.addHandler(log_handler) + except (Exception, OSError) as ex: + print >> sys.stdout, ex + sys.exit(1) + + +def add_log(lvl, message, *args, **kwargs): + logger.log(lvl, message, *args, **kwargs) diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/src/Settings/mapiconnection.py @@ -0,0 +1,50 @@ +import getpass +import sys + +import pymonetdb +from Settings.iotlogger import add_log + +Connection = None + + +def init_monetdb_connection(hostname, port, user_name, database): + global Connection + + user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':') + + if user_password == '': + user_password = 'monetdb' + + try: # the autocommit is set to true so each statement will be independent + Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password, + database=database, autocommit=True) + Connection.execute("SET SCHEMA iot;") + log_message = 'User %s connected successfully to database %s' % (user_name, database) + print >> sys.stdout, log_message + add_log(20, log_message) + except BaseException as ex: + print >> sys.stdout, ex + add_log(50, ex) + sys.exit(1) + + +def close_monetdb_connection(): + Connection.close() + + +def fetch_streams(): + try: # TODO paginate results? + cursor = Connection.cursor() + sql_string = """ + SELECT storage."schema", storage."table", storage."column", storage."type", storage."location", + storage."typewidth" + FROM (SELECT "schema", "table", "column", "type" FROM sys.storage) AS storage + INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON (storage."table"=tables."name") + INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON (storage."schema"=schemas."name"); + """.replace('\n', ' ') + cursor.execute(sql_string) + return cursor.fetchall() + except BaseException as ex: + print >> sys.stdout, ex + add_log(50, ex) + sys.exit(1) diff --git a/clients/iotapi/src/Streams/__init__.py b/clients/iotapi/src/Streams/__init__.py new file mode 100644 diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/src/Streams/datatypes.py @@ -0,0 +1,205 @@ +import itertools +import struct +from abc import ABCMeta, abstractmethod +from datetime import date, time, datetime + +from dateutil.relativedelta import relativedelta + +ALIGNMENT = '<' # for now is little-endian for Intel CPU's + +NIL_STRING = "\200\n" # added newline for performance + +INT8_MIN = 0x80 +INT16_MIN = 0x8000 +INT32_MIN = 0x80000000 +INT64_MIN = 0x8000000000000000 +INT64_MAX = 0xFFFFFFFFFFFFFFFF +INT128_MIN = 0x80000000000000000000000000000000 + +FLOAT_NAN = struct.unpack('f', '\xff\xff\x7f\xff')[0] +DOUBLE_NAN = struct.unpack('d', '\xff\xff\xff\xff\xff\xff\xef\xff')[0] + + +class StreamDataType(object): + """MonetDB's data types for validation base class""" + __metaclass__ = ABCMeta + + def __init__(self, **kwargs): + self._column_name = kwargs['name'] # name of the column + self._data_type = kwargs['type'] # SQL name of the type + self._location = kwargs['location'] + '.tail' # Location of the file + + @abstractmethod + def read_next_batch(self, file_pointer, count): + return [] + + def to_json_representation(self): + return {'name': self._column_name, 'type': self._data_type} + + +class TextType(StreamDataType): + """Covers: TEXT, STRING, CLOB and CHARACTER LARGE OBJECT""" + + def __init__(self, **kwargs): + super(TextType, self).__init__(**kwargs) + self._nullable_constant = NIL_STRING + + def read_next_batch(self, file_pointer, count): + array = list(itertools.islice(file_pointer, count)) + return map(lambda x: None if x == self._nullable_constant else x[:-1], array) + + +class BooleanType(StreamDataType): + """Covers: BOOLEAN""" + + def __init__(self, **kwargs): + super(BooleanType, self).__init__(**kwargs) + self._nullable_constant = INT8_MIN + + def read_next_batch(self, file_pointer, count): + array = struct.unpack(ALIGNMENT + str(count) + 'b', file_pointer.read(count)) + return map(lambda x: None if x == self._nullable_constant else bool(x), array) + + +class SmallIntegerType(StreamDataType): + """Covers: TINYINT, SMALLINT, INT[EGER], BIGINT""" + + def __init__(self, **kwargs): + super(SmallIntegerType, self).__init__(**kwargs) + self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'bigint': 'q'} \ + .get(self._data_type) + self._size = struct.calcsize(self._pack_sym) + self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 'int': INT32_MIN, 'integer': INT32_MIN, + 'bigint': INT64_MIN}.get(self._data_type) + + def read_next_batch(self, file_pointer, count): + array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size)) + return map(lambda x: None if x == self._nullable_constant else int(x), array) + + +class HugeIntegerType(StreamDataType): + """Covers: HUGEINT""" + + def __init__(self, **kwargs): + super(HugeIntegerType, self).__init__(**kwargs) + self._nullable_constant = INT128_MIN + + def read_next_batch(self, file_pointer, count): # [entry & INT64_MAX, (entry >> 64) & INT64_MAX] + array = struct.unpack(ALIGNMENT + str(count << 1) + 'Q', file_pointer.read(count << 3)) + results = [] + iterator = iter(array) # has to iterate two values at once, so use iterator + for value in iterator: + next_huge = value + (next(iterator) << 64) + if next_huge == self._nullable_constant: + results.append(None) + else: + results.append(int(next_huge)) + return results + + +class FloatType(StreamDataType): + """Covers: REAL, FLOAT and DOUBLE""" + + def __init__(self, **kwargs): + super(FloatType, self).__init__(**kwargs) + self._pack_sym = {'real': 'f', 'float': 'd', 'double': 'd'}.get(self._data_type) + self._size = struct.calcsize(self._pack_sym) + self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 'double': DOUBLE_NAN}.get(self._data_type) + + def read_next_batch(self, file_pointer, count): + array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size)) + return map(lambda x: None if x == self._nullable_constant else float(x), array) + + +class DecimalType(StreamDataType): + """Covers: DECIMAL and NUMERIC""" + + def __init__(self, **kwargs): + super(DecimalType, self).__init__(**kwargs) + + self._pack_sym = {'1': 'b', '2': 'h', '4': 'i', '8': 'q', '16': 'Q'}.get(kwargs['typewidth']) + self._nullable_constant = {'1': INT8_MIN, '2': INT16_MIN, '4': INT32_MIN, '8': INT64_MIN, '16': INT128_MIN} \ + .get(kwargs['typewidth']) + self._size = struct.calcsize(self._pack_sym) + if self._pack_sym == 'Q': + self._size <<= 1 # has to read two values at once + + def read_next_batch(self, file_pointer, count): + array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size)) + if self._pack_sym != 'Q': + return map(lambda x: None if x == self._nullable_constant else float(x), array) + + results = [] + iterator = iter(array) # has to iterate two values at once, so use iterator + for value in iterator: + next_huge_decimal = value + (next(iterator) << 64) + if next_huge_decimal == self._nullable_constant: + results.append(None) + else: + results.append(next_huge_decimal) + return results + + +class DateType(StreamDataType): # Stored as an uint with the number of days since day 1 of month 1 (Jan) from year 0 + """Covers: DATE""" + + def __init__(self, **kwargs): + super(DateType, self).__init__(**kwargs) + self._nullable_constant = INT32_MIN _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list