Changeset: 0718a6157981 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0718a6157981
Added Files:
        clients/iotapi/requirements.txt
        clients/iotapi/src/Settings/__init__.py
        clients/iotapi/src/Settings/filesystem.py
        clients/iotapi/src/Settings/iotlogger.py
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/__init__.py
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotapi/src/Utilities/__init__.py
        clients/iotapi/src/Utilities/customthreading.py
        clients/iotapi/src/__init__.py
        clients/iotapi/src/main.py
Modified Files:
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/iotlogger.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/flushing.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/Utilities/customthreading.py
Branch: iot
Log Message:

Started to work on Web API for iot


diffs (truncated from 770 to 300 lines):

diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/requirements.txt
@@ -0,0 +1,7 @@
+git+https://github.com/dpallot/simple-websocket-server.git
+python-dateutil==2.5.3
+pytz==2016.4
+pymonetdb==0.1.1
+tzlocal==1.2.2
+Sphinx==1.4.1
+sphinx-rtd-theme==0.1.9
diff --git a/clients/iotapi/src/Settings/__init__.py b/clients/iotapi/src/Settings/__init__.py
new file mode 100644
diff --git a/clients/iotapi/src/Settings/filesystem.py b/clients/iotapi/src/Settings/filesystem.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/src/Settings/filesystem.py
@@ -0,0 +1,28 @@
+import sys
+
+import os
+
+from iotlogger import add_log
+
+BASKETS_BASE_DIRECTORY = "baskets"
+CONFIG_FILE_DEFAULT_NAME = "config.json"
+
+if sys.platform in ("linux", "linux2", "darwin"):
+    filesystem_location = '/etc/iotcollector'
+elif sys.platform == "win32":
+    filesystem_location = os.path.join(os.path.dirname(__file__), os.pardir)
+
+
+def set_filesystem_location(new_location):
+    global filesystem_location
+    filesystem_location = new_location
+
+
+def init_file_system():
+    try:
+        if not os.path.exists(filesystem_location):
+            os.makedirs(filesystem_location)
+    except (Exception, OSError) as ex:
+        print >> sys.stdout, ex
+        add_log(50, ex)
+        sys.exit(1)
diff --git a/clients/iotapi/src/Settings/iotlogger.py b/clients/iotapi/src/Settings/iotlogger.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/src/Settings/iotlogger.py
@@ -0,0 +1,38 @@
+import logging
+import sys
+
+import os
+
+if sys.platform in ("linux", "linux2", "darwin"):
+    logging_location = '/var/log/iot/iotapi.log'
+elif sys.platform == "win32":
+    logging_location = os.path.join(os.path.dirname(__file__), os.pardir, 'iotapi.log')
+
+logger = logging.getLogger("IOTAPILog")
+
+
+def set_logging_location(new_location):
+    global logging_location
+    logging_location = new_location
+
+
+def init_logging():
+    global logger
+    try:
+        logger = logging.getLogger("IOTLog")
+        logger.setLevel(logging.DEBUG)
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+        logging_path = os.path.dirname(logging_location)
+        if not os.path.exists(logging_path):
+            os.makedirs(logging_path)
+        log_handler = logging.FileHandler(logging_location, mode='a+')
+        log_handler.setFormatter(formatter)
+        logger.addHandler(log_handler)
+    except (Exception, OSError) as ex:
+        print >> sys.stdout, ex
+        sys.exit(1)
+
+
+def add_log(lvl, message, *args, **kwargs):
+    logger.log(lvl, message, *args, **kwargs)
diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -0,0 +1,50 @@
+import getpass
+import sys
+
+import pymonetdb
+from Settings.iotlogger import add_log
+
+Connection = None
+
+
+def init_monetdb_connection(hostname, port, user_name, database):
+    global Connection
+
+    user_password = getpass.getpass(prompt='Insert password for user ' + user_name + ':')
+
+    if user_password == '':
+        user_password = 'monetdb'
+
+    try:  # the autocommit is set to true so each statement will be independent
+        Connection = pymonetdb.connect(hostname=hostname, port=port, username=user_name, password=user_password,
+                                       database=database, autocommit=True)
+        Connection.execute("SET SCHEMA iot;")
+        log_message = 'User %s connected successfully to database %s' % (user_name, database)
+        print >> sys.stdout, log_message
+        add_log(20, log_message)
+    except BaseException as ex:
+        print >> sys.stdout, ex
+        add_log(50, ex)
+        sys.exit(1)
+
+
+def close_monetdb_connection():
+    Connection.close()
+
+
+def fetch_streams():
+    try:  # TODO paginate results?
+        cursor = Connection.cursor()
+        sql_string = """
+          SELECT storage."schema", storage."table", storage."column", storage."type", storage."location",
+          storage."typewidth"
+          FROM (SELECT "schema", "table", "column", "type" FROM sys.storage) AS storage
+          INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON (storage."table"=tables."name")
+          INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON (storage."schema"=schemas."name");
+        """.replace('\n', ' ')
+        cursor.execute(sql_string)
+        return cursor.fetchall()
+    except BaseException as ex:
+        print >> sys.stdout, ex
+        add_log(50, ex)
+        sys.exit(1)
diff --git a/clients/iotapi/src/Streams/__init__.py b/clients/iotapi/src/Streams/__init__.py
new file mode 100644
diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -0,0 +1,205 @@
+import itertools
+import struct
+from abc import ABCMeta, abstractmethod
+from datetime import date, time, datetime
+
+from dateutil.relativedelta import relativedelta
+
+ALIGNMENT = '<'  # for now is little-endian for Intel CPU's
+
+NIL_STRING = "\200\n"  # added newline for performance
+
+INT8_MIN = 0x80
+INT16_MIN = 0x8000
+INT32_MIN = 0x80000000
+INT64_MIN = 0x8000000000000000
+INT64_MAX = 0xFFFFFFFFFFFFFFFF
+INT128_MIN = 0x80000000000000000000000000000000
+
+FLOAT_NAN = struct.unpack('f', '\xff\xff\x7f\xff')[0]
+DOUBLE_NAN = struct.unpack('d', '\xff\xff\xff\xff\xff\xff\xef\xff')[0]
+
+
+class StreamDataType(object):
+    """MonetDB's data types for validation base class"""
+    __metaclass__ = ABCMeta
+
+    def __init__(self, **kwargs):
+        self._column_name = kwargs['name']  # name of the column
+        self._data_type = kwargs['type']  # SQL name of the type
+        self._location = kwargs['location'] + '.tail'  # Location of the file
+
+    @abstractmethod
+    def read_next_batch(self, file_pointer, count):
+        return []
+
+    def to_json_representation(self):
+        return {'name': self._column_name, 'type': self._data_type}
+
+
+class TextType(StreamDataType):
+    """Covers: TEXT, STRING, CLOB and CHARACTER LARGE OBJECT"""
+
+    def __init__(self, **kwargs):
+        super(TextType, self).__init__(**kwargs)
+        self._nullable_constant = NIL_STRING
+
+    def read_next_batch(self, file_pointer, count):
+        array = list(itertools.islice(file_pointer, count))
+        return map(lambda x: None if x == self._nullable_constant else x[:-1], array)
+
+
+class BooleanType(StreamDataType):
+    """Covers: BOOLEAN"""
+
+    def __init__(self, **kwargs):
+        super(BooleanType, self).__init__(**kwargs)
+        self._nullable_constant = INT8_MIN
+
+    def read_next_batch(self, file_pointer, count):
+        array = struct.unpack(ALIGNMENT + str(count) + 'b', file_pointer.read(count))
+        return map(lambda x: None if x == self._nullable_constant else bool(x), array)
+
+
+class SmallIntegerType(StreamDataType):
+    """Covers: TINYINT, SMALLINT, INT[EGER], BIGINT"""
+
+    def __init__(self, **kwargs):
+        super(SmallIntegerType, self).__init__(**kwargs)
+        self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 'integer': 'i', 'bigint': 'q'} \
+            .get(self._data_type)
+        self._size = struct.calcsize(self._pack_sym)
+        self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 'int': INT32_MIN, 'integer': INT32_MIN,
+                                   'bigint': INT64_MIN}.get(self._data_type)
+
+    def read_next_batch(self, file_pointer, count):
+        array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size))
+        return map(lambda x: None if x == self._nullable_constant else int(x), array)
+
+
+class HugeIntegerType(StreamDataType):
+    """Covers: HUGEINT"""
+
+    def __init__(self, **kwargs):
+        super(HugeIntegerType, self).__init__(**kwargs)
+        self._nullable_constant = INT128_MIN
+
+    def read_next_batch(self, file_pointer, count):  # [entry & INT64_MAX, (entry >> 64) & INT64_MAX]
+        array = struct.unpack(ALIGNMENT + str(count << 1) + 'Q', file_pointer.read(count << 3))
+        results = []
+        iterator = iter(array)  # has to iterate two values at once, so use iterator
+        for value in iterator:
+            next_huge = value + (next(iterator) << 64)
+            if next_huge == self._nullable_constant:
+                results.append(None)
+            else:
+                results.append(int(next_huge))
+        return results
+
+
+class FloatType(StreamDataType):
+    """Covers: REAL, FLOAT and DOUBLE"""
+
+    def __init__(self, **kwargs):
+        super(FloatType, self).__init__(**kwargs)
+        self._pack_sym = {'real': 'f', 'float': 'd', 'double': 'd'}.get(self._data_type)
+        self._size = struct.calcsize(self._pack_sym)
+        self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 'double': DOUBLE_NAN}.get(self._data_type)
+
+    def read_next_batch(self, file_pointer, count):
+        array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size))
+        return map(lambda x: None if x == self._nullable_constant else float(x), array)
+
+
+class DecimalType(StreamDataType):
+    """Covers: DECIMAL and NUMERIC"""
+
+    def __init__(self, **kwargs):
+        super(DecimalType, self).__init__(**kwargs)
+
+        self._pack_sym = {'1': 'b', '2': 'h', '4': 'i', '8': 'q', '16': 'Q'}.get(kwargs['typewidth'])
+        self._nullable_constant = {'1': INT8_MIN, '2': INT16_MIN, '4': INT32_MIN, '8': INT64_MIN, '16': INT128_MIN} \
+            .get(kwargs['typewidth'])
+        self._size = struct.calcsize(self._pack_sym)
+        if self._pack_sym == 'Q':
+            self._size <<= 1  # has to read two values at once
+
+    def read_next_batch(self, file_pointer, count):
+        array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, file_pointer.read(count * self._size))
+        if self._pack_sym != 'Q':
+            return map(lambda x: None if x == self._nullable_constant else float(x), array)
+
+        results = []
+        iterator = iter(array)  # has to iterate two values at once, so use iterator
+        for value in iterator:
+            next_huge_decimal = value + (next(iterator) << 64)
+            if next_huge_decimal == self._nullable_constant:
+                results.append(None)
+            else:
+                results.append(next_huge_decimal)
+        return results
+
+
+class DateType(StreamDataType):  # Stored as an uint with the number of days since day 1 of month 1 (Jan) from year 0
+    """Covers: DATE"""
+
+    def __init__(self, **kwargs):
+        super(DateType, self).__init__(**kwargs)
+        self._nullable_constant = INT32_MIN
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to