Changeset: ee944c8e9c97 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ee944c8e9c97
Removed Files:
clients/iotclient/src/Streams/flushing.py
Modified Files:
clients/iotapi/src/Settings/mapiconnection.py
clients/iotapi/src/Streams/streampolling.py
clients/iotapi/src/Streams/streams.py
clients/iotapi/src/Streams/streamscontext.py
clients/iotapi/src/WebSockets/jsonschemas.py
clients/iotclient/src/Settings/filesystem.py
clients/iotclient/src/Settings/mapiconnection.py
clients/iotclient/src/Streams/datatypes.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscreator.py
clients/iotclient/src/Utilities/filecreator.py
Branch: iot
Log Message:
Cleaned code
diffs (truncated from 578 to 300 lines):
diff --git a/clients/iotapi/src/Settings/mapiconnection.py b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -12,9 +12,6 @@ def init_monetdb_connection(hostname, po
user_password = getpass.getpass(prompt='Insert password for user ' +
user_name + ':')
- if user_password == '':
- user_password = 'monetdb'
-
try: # the autocommit is set to true so each statement will be independent
Connection = pymonetdb.connect(hostname=hostname, port=port,
username=user_name, password=user_password,
database=database, autocommit=True)
@@ -35,15 +32,12 @@ def close_monetdb_connection():
def fetch_streams():
try: # TODO paginate results?
cursor = Connection.cursor()
- sql_string = """
- SELECT storage."schema", storage."table", storage."column",
storage."type", storage."location",
- storage."typewidth"
+ sql_string = """SELECT storage."schema", storage."table",
storage."column", storage."type", storage."typewidth"
FROM (SELECT "schema", "table", "column", "type" FROM sys.storage)
AS storage
INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON
(storage."table"=tables."name")
- INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON
(storage."schema"=schemas."name");
- """.replace('\n', ' ')
+ INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON
(storage."schema"=schemas."name");"""\
+ .replace('\n', ' ')
cursor.execute(sql_string)
return cursor.fetchall()
except BaseException as ex:
- print >> sys.stdout, ex
add_log(50, ex)
diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -2,8 +2,9 @@ from itertools import groupby
from Settings.mapiconnection import fetch_streams
from Utilities.customthreading import PeriodicalThread
-
-from streamscontext import Streams_context, DataCellStream
+from datatypes import *
+from streams import IOTStream
+from streamscontext import Streams_context
SWITCHER = [{'types': ['clob', 'char', 'varchar', 'url'], 'class': 'TextType'},
{'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class':
'SmallIntegerType'},
@@ -23,7 +24,7 @@ def init_stream_polling_thread(interval)
thread.start()
-# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type,
elem[4] is location, elem[5] is typewidth
+# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type,
elem[4] is typewidth
def stream_polling():
current_streams = Streams_context.get_existing_streams()
retained_streams = []
@@ -35,11 +36,13 @@ def stream_polling():
columns = {}
for elem in group:
- reflection_class = globals()[elem[3]] # import everything
from datatypes!!!
- kwargs = {'name': elem[2], 'type': elem[3], 'location':
elem[4], 'typewidth': elem[5]}
- new_column = reflection_class(kwargs)
- columns[elem[2]] = new_column
- new_streams[key] = DataCellStream(key, columns)
+ for entry in SWITCHER: # allocate the proper type wrapper
+ if elem[3] in entry['types']:
+ reflection_class = globals()[entry['class']] # import
everything from datatypes!!!
+ new_column = reflection_class({'name': elem[2],
'type': elem[3], 'typewidth': elem[4]})
+ columns[elem[2]] = new_column
+ new_streams[key] = IOTStream(key, columns)
+ break
else:
retained_streams.append(key)
diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -1,6 +1,7 @@
import os
import struct
+from datatypes import LITTLE_ENDIAN_ALIGNMENT
from Settings.filesystem import get_baskets_base_location
from Utilities.readwritelock import RWLock
from WebSockets.websockets import notify_clients
@@ -63,7 +64,7 @@ class IOTStream(object):
def append_basket(self, path):
if represents_int(path):
with open(os.path.join(self._base_path, path)) as f:
- count = struct.unpack('i', f.read(4))[0]
+ count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + '1i',
f.read(4))[0]
self._lock.acquire_write()
self._baskets[int(path)] = count
self._lock.release()
diff --git a/clients/iotapi/src/Streams/streamscontext.py b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -1,7 +1,5 @@
-import collections
-
from Utilities.readwritelock import RWLock
-from WebSockets.websockets import desubscribe_removed_streams
+from WebSockets.websockets import unsubscribe_removed_streams
class IOTStreams(object):
@@ -12,7 +10,7 @@ class IOTStreams(object):
return schema_name + '.' + stream_name
def __init__(self):
- self._context = collections.OrderedDict() # dictionary of schema_name
+ '.' + stream_name -> DataCellStream
+ self._context = {} # dictionary of schema_name + '.' + stream_name ->
DataCellStream
self._locker = RWLock()
def get_existing_streams(self):
@@ -28,7 +26,7 @@ class IOTStreams(object):
del self._context[k]
self._context.update(new_streams)
self._locker.release()
- desubscribe_removed_streams(removed_streams)
+ unsubscribe_removed_streams(removed_streams)
def get_existing_stream(self, concatenated_name):
self._locker.acquire_read()
diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py b/clients/iotapi/src/WebSockets/jsonschemas.py
--- a/clients/iotapi/src/WebSockets/jsonschemas.py
+++ b/clients/iotapi/src/WebSockets/jsonschemas.py
@@ -19,17 +19,11 @@ CLIENTS_INPUTS_SCHEMA = {
"additionalProperties": False
}, {
"properties": {
- "action": {"type": "string", "enum": ["info"]}
- },
- "required": ["action"],
- "additionalProperties": False
- }, {
- "properties": {
"schema": {"type": "string"},
"stream": {"type": "string"},
"action": {"type": "string", "enum": READ_OPTS},
"basket": {"type": "integer", "minimum": 1, "default": 1},
- "limit": {"type": "integer", "minimum": 0, "default": 0},
+ "limit": {"type": "integer", "minimum": 0, "default": 100},
"offset": {"type": "integer", "minimum": 0, "default": 0}
},
"required": ["schema", "stream", "action"],
diff --git a/clients/iotclient/src/Settings/filesystem.py b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -32,10 +32,10 @@ def init_file_system(host_identifier=Non
os.makedirs(Baskets_Location)
if new_configfile_location is not None:
- Config_File_Location =
create_file_if_not_exists(new_configfile_location, hidden=False, init_text='[]')
+ Config_File_Location =
create_file_if_not_exists(new_configfile_location, init_text='[]')
else:
Config_File_Location = create_file_if_not_exists(
- os.path.join(Filesystem_Location, CONFIG_FILE_DEFAULT_NAME),
hidden=False, init_text='[]')
+ os.path.join(Filesystem_Location, CONFIG_FILE_DEFAULT_NAME),
init_text='[]')
Host_Identifier = host_identifier
except (Exception, OSError) as ex:
diff --git a/clients/iotclient/src/Settings/mapiconnection.py b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -12,9 +12,6 @@ def init_monetdb_connection(hostname, po
user_password = getpass.getpass(prompt='Insert password for user ' +
user_name + ':')
- if user_password == '':
- user_password = 'monetdb'
-
try: # the autocommit is set to true so each statement will be independent
Connection = pymonetdb.connect(hostname=hostname, port=port,
username=user_name, password=user_password,
database=database, autocommit=True)
diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -541,7 +541,7 @@ class DecimalType(NumberBaseType):
def check_value_precision(self, value, text):
number_digits = int(math.ceil(math.log10(abs(value))))
if number_digits > self._precision:
- raise Exception('Too many digits on %s value: %s > %s!' % (text,
number_digits, self._precision))
+ raise Exception('Too many digits on %s: %s > %s!' % (text,
number_digits, self._precision))
def add_json_schema_entry(self, schema):
super(DecimalType, self).add_json_schema_entry(schema)
diff --git a/clients/iotclient/src/Streams/flushing.py b/clients/iotclient/src/Streams/flushing.py
deleted file mode 100644
--- a/clients/iotclient/src/Streams/flushing.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from Utilities.customthreading import PeriodicalThread
-from abc import ABCMeta, abstractmethod
-
-
-class StreamFlushingMethod(object):
- """Base class for flushing"""
-
- __metaclass__ = ABCMeta
-
- def __init__(self):
- pass
-
- @abstractmethod
- def get_dictionary_info(self):
- pass
-
-
-class TimeBasedFlushing(StreamFlushingMethod):
- """Time based flushing"""
-
- def __init__(self, interval, time_unit):
- super(TimeBasedFlushing, self).__init__()
- self._interval = interval
- self._time_unit = time_unit
- self._local_thread = None
-
- def init_local_thread(self, stream):
- if self._time_unit == "s":
- interval = self._interval
- elif self._time_unit == "m":
- interval = self._interval * 60
- else:
- interval = self._interval * 3600
- self._local_thread = PeriodicalThread(interval,
stream.time_based_flush)
- self._local_thread.start()
-
- def stop_local_thread(self):
- self._local_thread.stop()
-
- def get_dictionary_info(self):
- return {'base': 'time', 'unit': self._time_unit, 'interval':
self._interval}
-
-
-class TupleBasedFlushing(StreamFlushingMethod):
- """Tuple based flushing"""
-
- def __init__(self, limit):
- super(TupleBasedFlushing, self).__init__()
- self.limit = limit
-
- def get_dictionary_info(self):
- return {'base': 'tuple', 'number': self.limit}
diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -1,14 +1,24 @@
import os
+import struct
+from abc import ABCMeta, abstractmethod
from collections import defaultdict, OrderedDict
from Settings.filesystem import get_baskets_base_location, get_host_identifier
from Settings.iotlogger import add_log
from Settings.mapiconnection import mapi_create_stream, mapi_flush_baskets
-from Utilities.filecreator import create_file_if_not_exists,
get_hidden_file_name
+from Utilities.filecreator import create_file_if_not_exists
from Utilities.readwritelock import RWLock
-from datatypes import TimestampType, TextType, DataValidationException
-from flushing import TimeBasedFlushing, TupleBasedFlushing
+from Utilities.customthreading import PeriodicalThread
+from datatypes import TimestampType, TextType, DataValidationException,
LITTLE_ENDIAN_ALIGNMENT
+
+IMPLICIT_TIMESTAMP_COLUMN_NAME = 'implicit_timestamp'
+Timestamps_Handler = TimestampType(name=IMPLICIT_TIMESTAMP_COLUMN_NAME,
type="timestamp") # timestamp
+Extra_columns_SQL = [Timestamps_Handler.create_stream_sql()] # array for SQL
creation
+
+HOST_IDENTIFIER_COLUMN_NAME = 'host_identifier'
+Hostname_Bin_Value = None
+BASKETS_COUNT_FILE = 'count'
def represents_int(s):
@@ -18,13 +28,6 @@ def represents_int(s):
except ValueError:
return False
-IMPLICIT_TIMESTAMP_COLUMN_NAME = 'implicit_timestamp'
-Timestamps_Handler = TimestampType(name=IMPLICIT_TIMESTAMP_COLUMN_NAME,
type="timestamp") # timestamp
-Extra_columns_SQL = [Timestamps_Handler.create_stream_sql()] # array for SQL
creation
-
-HOST_IDENTIFIER_COLUMN_NAME = 'host_identifier'
-Hostname_Bin_Value = None
-
def init_streams_hosts():
global Hostname_Bin_Value
@@ -43,14 +46,14 @@ class StreamException(Exception):
self.message = message # dictionary of column -> list of error
messages
-class IOTStream(object):
- """Representation of the stream for validation"""
+class BaseIOTStream(object):
+ """Representation of a stream for validation"""
+ __metaclass__ = ABCMeta
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list