Changeset: 9b8b8136fbc1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9b8b8136fbc1
Modified Files:
        clients/iotapi/src/Settings/filesystem.py
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotapi/src/Utilities/customthreading.py
        clients/iotapi/src/WebSockets/websockets.py
        clients/iotapi/src/main.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/tests/datatypesinsertstests.py
        clients/iotclient/tests/main.py
Branch: iot
Log Message:

Added interval type support, updated iotapi polling


diffs (truncated from 1848 to 300 lines):

diff --git a/clients/iotapi/src/Settings/filesystem.py b/clients/iotapi/src/Settings/filesystem.py
--- a/clients/iotapi/src/Settings/filesystem.py
+++ b/clients/iotapi/src/Settings/filesystem.py
@@ -6,7 +6,7 @@ from .iotlogger import add_log
 Baskets_Location = None
 
 if sys.platform in ("linux", "linux2", "darwin"):
-    DEFAULT_FILESYSTEM = '/etc/iotapi'
+    DEFAULT_FILESYSTEM = '/var/iotapi'
 elif sys.platform == "win32":
     DEFAULT_FILESYSTEM = os.path.join(os.path.dirname(__file__), os.pardir)
 
diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -1,50 +1,44 @@
-import sys
-
 from monetdb.sql import connect
-from ..Settings.iotlogger import add_log
-
-Connection = None
+from .iotlogger import add_log
 
 
 def init_monetdb_connection(hostname, port, user_name, user_password, 
database):
-    global Connection
+    return connect(hostname=hostname, port=port, username=user_name, 
password=user_password, database=database)
 
-    try:  # the autocommit is set to true so each statement will be independent
-        Connection = connect(hostname=hostname, port=port, username=user_name, 
password=user_password,
-                             database=database, autocommit=True)
-        log_message = 'User %s connected successfully to database %s' % 
(user_name, database)
-        print log_message
-        add_log(20, log_message)
-    except BaseException as ex:
-        print ex
-        add_log(50, ex)
-        sys.exit(1)
 
+def close_monetdb_connection(connection):
+    connection.close()
 
-def close_monetdb_connection():
-    Connection.close()
 
-
-def check_hugeint_type():
-    Connection.execute("START TRANSACTION")
-    cursor = Connection.cursor()
+def check_hugeint_type(connection):
+    cursor = connection.cursor()
     cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'")
     result = cursor.fetchall()[0][0]
-    Connection.commit()
-    return result
+    connection.commit()
+    return result > 0
 
 
-def fetch_streams():
+def mapi_get_database_streams(connection):
     try:
-        cursor = Connection.cursor()
-        sql_string = """SELECT schemas."name" as schema, tables."name" as 
table, columns."name" as column,
-             columns."type", columns."type_digits", columns."type_scale", 
columns."default", columns."null" FROM
+        cursor = connection.cursor()
+        sql_string = """SELECT tables."id", schemas."name" AS schema, 
tables."name" AS table FROM
              (SELECT "id", "name", "schema_id" FROM sys.tables WHERE type=4) 
AS tables INNER JOIN (SELECT "id", "name"
-             FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") 
INNER JOIN (SELECT "table_id", "name",
-             "type", "type_digits", "type_scale", "default", "null" FROM 
sys.columns) AS columns ON
-             (columns."table_id"=tables."id")""".replace('\n', ' ')
+             FROM sys.schemas) AS schemas ON 
(tables."schema_id"=schemas."id")""".replace('\n', ' ')
         cursor.execute(sql_string)
-        return cursor.fetchall()
+        tables = cursor.fetchall()
+
+        cursor = connection.cursor()
+        sql_string = """SELECT columns."table_id", columns."name" AS column, 
columns."type", columns."type_digits",
+            columns."type_scale", columns."default", columns."null" FROM 
(SELECT "table_id", "name", "type",
+            "type_digits", "type_scale", "default", "null", "number" FROM 
sys.columns) AS columns INNER JOIN
+            (SELECT "id" FROM sys.tables WHERE type=4) AS tables ON 
(tables."id"=columns."table_id")
+            ORDER BY columns."table_id", columns."number" """.replace('\n', ' 
')
+        cursor.execute(sql_string)
+        columns = cursor.fetchall()
+
+        connection.commit()
+        return tables, columns
     except BaseException as ex:
         add_log(50, ex)
+        connection.rollback()
         raise
diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -21,27 +21,25 @@ FLOAT_NAN = struct.unpack('f', '\xff\xff
 DOUBLE_NAN = struct.unpack('d', '\xff\xff\xff\xff\xff\xff\xef\xff')[0]
 
 
-# elem[0] is column name, elem[1] is type, elem[2] is type_digits, elem[3] is 
type_scale elem[4] is default value
-# elem[5] is nullable
 class StreamDataType(object):
     """MonetDB's data types for reading base class"""
     __metaclass__ = ABCMeta
 
-    def __init__(self, *args):
-        self._column_name = args[0]  # name of the column
-        self._data_type = args[1]  # SQL name of the type
-        self._default_value = args[4]  # default value text
-        self._is_nullable = args[5]  # is nullable
+    def __init__(self, **kwargs):
+        self._column_name = kwargs['name']  # name of the column
+        self._data_type = kwargs['type']  # SQL name of the type
+        self._default_value = kwargs['default']  # default value text
+        self._is_nullable = kwargs['nullable']  # is nullable
 
     def is_file_mode_binary(self):
         return True
 
     @abstractmethod
-    def skip_tuples(self, file_pointer, offset):
+    def skip_tuples(self, fp, offset):
         pass
 
     @abstractmethod
-    def read_next_batch(self, file_pointer, limit):
+    def read_next_batch(self, fp, limit):
         pass
 
     def read_next_tuples(self, file_name, offset, read_size):
@@ -65,21 +63,21 @@ class StreamDataType(object):
 class TextType(StreamDataType):
     """Covers: CLOB and Url"""
 
-    def __init__(self, *args):
-        super(TextType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(TextType, self).__init__(**kwargs)
         self._nullable_constant = NIL_STRING
 
     def is_file_mode_binary(self):
         return False
 
-    def skip_tuples(self, file_pointer, offset):
+    def skip_tuples(self, fp, offset):
         for _ in xrange(offset):
-            next(file_pointer)
+            next(fp)
 
-    def read_next_batch(self, file_pointer, limit):
+    def read_next_batch(self, fp, limit):
         array = []
         for _ in xrange(limit):
-            next_line = next(file_pointer)
+            next_line = next(fp)
             if next_line == self._nullable_constant:
                 array.append(None)
             else:
@@ -90,9 +88,9 @@ class TextType(StreamDataType):
 class LimitedTextType(TextType):
     """Covers: CHAR and VARCHAR"""
 
-    def __init__(self, *args):
-        super(LimitedTextType, self).__init__(*args)
-        self._limit = args[2]
+    def __init__(self, **kwargs):
+        super(LimitedTextType, self).__init__(**kwargs)
+        self._limit = kwargs['digits']
 
     def to_json_representation(self):
         json_value = super(LimitedTextType, self).to_json_representation()
@@ -103,16 +101,16 @@ class LimitedTextType(TextType):
 class INetType(StreamDataType):
     """Covers: Inet"""
 
-    def __init__(self, *args):
-        super(INetType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(INetType, self).__init__(**kwargs)
 
-    def skip_tuples(self, file_pointer, offset):
-        file_pointer.seek(offset << 3)
+    def skip_tuples(self, fp, offset):
+        fp.seek(offset << 3)
 
-    def read_next_batch(self, file_pointer, limit):
+    def read_next_batch(self, fp, limit):
         results = []
         read_size = limit << 3
-        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', 
file_pointer.read(read_size))
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', 
fp.read(read_size))
         iterator = iter(array)
 
         for _ in xrange(limit):
@@ -128,16 +126,16 @@ class INetType(StreamDataType):
 class UUIDType(StreamDataType):
     """Covers: UUID"""
 
-    def __init__(self, *args):
-        super(UUIDType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(UUIDType, self).__init__(**kwargs)
 
-    def skip_tuples(self, file_pointer, offset):
-        file_pointer.seek(offset << 4)
+    def skip_tuples(self, fp, offset):
+        fp.seek(offset << 4)
 
-    def read_next_batch(self, file_pointer, limit):
+    def read_next_batch(self, fp, limit):
         results = []
         read_size = limit << 4
-        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', 
file_pointer.read(read_size))
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', 
fp.read(read_size))
         iterator = iter(array)
 
         for _ in xrange(limit):
@@ -158,50 +156,49 @@ class UUIDType(StreamDataType):
 class BooleanType(StreamDataType):
     """Covers: BOOLEAN"""
 
-    def __init__(self, *args):
-        super(BooleanType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(BooleanType, self).__init__(**kwargs)
         self._nullable_constant = INT8_MIN
 
-    def skip_tuples(self, file_pointer, offset):
-        file_pointer.seek(offset)
+    def skip_tuples(self, fp, offset):
+        fp.seek(offset)
 
-    def read_next_batch(self, file_pointer, limit):
-        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b', 
file_pointer.read(limit))
+    def read_next_batch(self, fp, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b', 
fp.read(limit))
         return map(lambda x: None if x == self._nullable_constant else 
bool(x), array)
 
 
 class SmallIntegerType(StreamDataType):
     """Covers: TINYINT, SMALLINT, INTEGER, BIGINT"""
 
-    def __init__(self, *args):
-        super(SmallIntegerType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(SmallIntegerType, self).__init__(**kwargs)
         self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i', 
'integer': 'i', 'bigint': 'q'} \
             .get(self._data_type)
         self._size = struct.calcsize(self._pack_sym)
         self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 
'int': INT32_MIN, 'integer': INT32_MIN,
                                    'bigint': INT64_MIN}.get(self._data_type)
 
-    def skip_tuples(self, file_pointer, offset):
-        file_pointer.seek(offset * self._size)
+    def skip_tuples(self, fp, offset):
+        fp.seek(offset * self._size)
 
-    def read_next_batch(self, file_pointer, limit):
-        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 
self._pack_sym,
-                              file_pointer.read(limit * self._size))
+    def read_next_batch(self, fp, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 
self._pack_sym, fp.read(limit * self._size))
         return map(lambda x: None if x == self._nullable_constant else int(x), 
array)
 
 
 class HugeIntegerType(StreamDataType):
     """Covers: HUGEINT"""
 
-    def __init__(self, *args):
-        super(HugeIntegerType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(HugeIntegerType, self).__init__(**kwargs)
         self._nullable_constant = INT128_MIN
 
-    def skip_tuples(self, file_pointer, offset):
-        file_pointer.seek(offset << 4)
+    def skip_tuples(self, fp, offset):
+        fp.seek(offset << 4)
 
-    def read_next_batch(self, file_pointer, limit):  # [entry & INT64_MAX, 
(entry >> 64) & INT64_MAX]
-        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q', 
file_pointer.read(limit << 4))
+    def read_next_batch(self, fp, limit):  # [entry & INT64_MAX, (entry >> 64) 
& INT64_MAX]
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q', 
fp.read(limit << 4))
         results = []
         iterator = iter(array)  # has to iterate two values at once, so use 
iterator
         for value in iterator:
@@ -216,28 +213,27 @@ class HugeIntegerType(StreamDataType):
 class FloatType(StreamDataType):
     """Covers: REAL, DOUBLE"""
 
-    def __init__(self, *args):
-        super(FloatType, self).__init__(*args)
+    def __init__(self, **kwargs):
+        super(FloatType, self).__init__(**kwargs)
         self._pack_sym = {'real': 'f', 'float': 'd', 'double': 
'd'}.get(self._data_type)
         self._size = struct.calcsize(self._pack_sym)
         self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 
'double': DOUBLE_NAN}.get(self._data_type)
 
-    def skip_tuples(self, file_pointer, offset):
-        file_pointer.seek(offset * self._size)
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to