Changeset: 2dd269ec052a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2dd269ec052a
Modified Files:
clients/iotclient/src/Streams/datatypes.py
clients/iotclient/src/Streams/streams.py
clients/iotclient/src/Streams/streamscontext.py
clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
merger
diffs (truncated from 551 to 300 lines):
diff --git a/clients/iotclient/src/Streams/datatypes.py
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -104,7 +104,6 @@ class StreamDataType(object):
def create_stream_sql(self): # get column creation statement on SQL
array = [self._column_name, " ", self._data_type]
self.process_sql_parameters(array) # add extra parameters to the SQL
statement
-
if self._default_value is not None:
array.extend([" DEFAULT '", str(self._default_value), "'"])
if not self._is_nullable:
@@ -237,11 +236,11 @@ class RegexType(TextType):
def add_json_schema_entry(self, schema):
super(RegexType, self).add_json_schema_entry(schema)
- schema[self._column_name]['pattern'] = self._regex
+ schema[self._column_name]['pattern'] = self._regex.pattern
def to_json_representation(self):
json_value = super(RegexType, self).to_json_representation()
- json_value['regex'] = self._regex
+ json_value['regex'] = self._regex.pattern
return json_value
def process_sql_parameters(self, array):
@@ -253,7 +252,7 @@ class LimitedTextType(TextType):
def __init__(self, **kwargs):
super(LimitedTextType, self).__init__(**kwargs)
- self._limit = int(kwargs['limit'])
+ self._limit = kwargs['limit']
def add_json_schema_entry(self, schema):
super(LimitedTextType, self).add_json_schema_entry(schema)
@@ -263,7 +262,8 @@ class LimitedTextType(TextType):
str_value = str(default_value)
parsed_len = len(str_value)
if parsed_len > self._limit:
- raise Exception('The default string is higher than the limit: %d >
%d' % (parsed_len, self._limit))
+ raise Exception('The default string\'s length is longer than the
limit: %d > %d!'
+ % (parsed_len, self._limit))
self._default_value = str_value
def to_json_representation(self):
@@ -340,9 +340,9 @@ class NumberBaseType(StreamDataType):
def __init__(self, **kwargs):
super(NumberBaseType, self).__init__(**kwargs)
if 'minimum' in kwargs:
- self._minimum = self.process_next_value(kwargs['minimum'], 0, {},
{})
+ self._minimum = kwargs['minimum']
if 'maximum' in kwargs:
- self._maximum = self.process_next_value(kwargs['maximum'], 0, {},
{})
+ self._maximum = kwargs['maximum']
if hasattr(self, '_minimum') and hasattr(self, '_maximum') and
self._minimum > self._maximum:
raise Exception('The minimum value is higher than the maximum!')
@@ -353,21 +353,16 @@ class NumberBaseType(StreamDataType):
if hasattr(self, '_maximum'):
schema[self._column_name]['maximum'] = self._maximum
- @abstractmethod
- def process_default_value(self, value):
- return value
-
def set_default_value(self, default_value):
- parsed_val = self.process_default_value(default_value)
- if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and
parsed_val < self._minimum:
- raise Exception('The default value is less than the minimum: %d <
%d' % (parsed_val, self._minimum))
- elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and
parsed_val > self._maximum:
- raise Exception('The default value is higher than the maximum: %d
> %d' % (parsed_val, self._maximum))
- elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
parsed_val < self._minimum:
- raise Exception('The default value is out of range: %d < %d' %
(parsed_val, self._minimum))
- elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
parsed_val > self._maximum:
- raise Exception('The default value is out of range: %d > %d' %
(parsed_val, self._maximum))
- self._default_value = parsed_val
+ if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and
default_value < self._minimum:
+ raise Exception('The default value is less than the minimum: %s <
%s!' % (default_value, self._minimum))
+ elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and
default_value > self._maximum:
+ raise Exception('The default value is higher than the maximum: %s
> %s!' % (default_value, self._maximum))
+ elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
default_value < self._minimum:
+ raise Exception('The default value is out of range: %s < %s!' %
(default_value, self._minimum))
+ elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
default_value > self._maximum:
+ raise Exception('The default value is out of range: %s > %s!' %
(default_value, self._maximum))
+ self._default_value = default_value
def to_json_representation(self):
json_value = super(NumberBaseType, self).to_json_representation()
@@ -379,15 +374,15 @@ class NumberBaseType(StreamDataType):
def create_stream_sql(self):
string = super(NumberBaseType, self).create_stream_sql()
- array = []
+
if hasattr(self, '_minimum') and not hasattr(self, '_maximum'):
- array.extend([" CHECK (", self._column_name, " > ",
str(self._minimum), ")"])
+ return string + ''.join([" CHECK (", self._column_name, " > ",
str(self._minimum), ")"])
elif hasattr(self, '_maximum') and not hasattr(self, '_minimum'):
- array.extend([" CHECK (", self._column_name, " < ",
str(self._maximum), ")"])
+ return string + ''.join([" CHECK (", self._column_name, " < ",
str(self._maximum), ")"])
elif hasattr(self, '_maximum') and hasattr(self, '_minimum'):
- array.extend([" CHECK (", self._column_name, " BETWEEN ",
str(self._minimum),
- " AND ", str(self._maximum), ")"])
- return string.join(array)
+ return string + ''.join([" CHECK (", self._column_name, " BETWEEN
", str(self._minimum),
+ " AND ", str(self._maximum), ")"])
+ return string
class SmallIntegerType(NumberBaseType):
@@ -395,10 +390,10 @@ class SmallIntegerType(NumberBaseType):
def __init__(self, **kwargs):
super(SmallIntegerType, self).__init__(**kwargs)
- this_type = kwargs['type']
- self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i',
'integer': 'i', 'bigint': 'q'}.get(this_type)
+ self._pack_sym = {'tinyint': 'b', 'smallint': 'h', 'int': 'i',
'integer': 'i', 'bigint': 'q'} \
+ .get(kwargs['type'])
self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN,
'int': INT32_MIN, 'integer': INT32_MIN,
- 'bigint': INT64_MIN}.get(this_type)
+ 'bigint': INT64_MIN}.get(kwargs['type'])
def add_json_schema_entry(self, schema):
super(SmallIntegerType, self).add_json_schema_entry(schema)
@@ -407,14 +402,11 @@ class SmallIntegerType(NumberBaseType):
def get_nullable_constant(self):
return self._nullable_constant
- def process_default_value(self, value):
- return int(value)
-
def process_next_value(self, entry, counter, parameters, errors):
return int(entry)
def pack_parsed_values(self, extracted_values, counter, parameters):
- return struct.pack(ALIGNMENT + str(counter) + self._pack_sym,
extracted_values)
+ return struct.pack(ALIGNMENT + str(counter) + self._pack_sym,
*extracted_values)
class HugeIntegerType(NumberBaseType):
@@ -430,9 +422,6 @@ class HugeIntegerType(NumberBaseType):
def get_nullable_constant(self):
return INT128_MIN
- def process_default_value(self, value):
- return int(value)
-
def process_next_value(self, entry, counter, parameters, errors):
return [entry & INT64_MAX, (entry >> 64) & INT64_MAX]
@@ -457,9 +446,6 @@ class FloatType(NumberBaseType):
def get_nullable_constant(self):
return self._nullable_constant
- def process_default_value(self, value):
- return float(value)
-
def process_next_value(self, entry, counter, parameters, errors):
return float(entry)
@@ -473,17 +459,24 @@ class DecimalType(NumberBaseType):
def __init__(self, **kwargs):
super(DecimalType, self).__init__(**kwargs)
if 'precision' in kwargs:
- self._precision = int(kwargs['precision'])
+ self._precision = kwargs['precision']
else:
self._precision = 18
if 'scale' in kwargs:
- self._scale = int(kwargs['scale'])
+ self._scale = kwargs['scale']
else:
self._scale = 0
if self._scale > self._precision:
raise Exception('The scale must be between 0 and the precision!')
+ if self._default_value is not None:
+ self.check_value_precision(self._default_value, 'default')
+ if hasattr(self, '_minimum'):
+ self.check_value_precision(self._minimum, 'minimum')
+ if hasattr(self, '_maximum'):
+ self.check_value_precision(self._maximum, 'maximum')
+
if self._precision <= 2: # calculate the number of bytes to use
according to the precision
self._pack_sym = 'b'
elif 2 < self._precision <= 4:
@@ -498,6 +491,11 @@ class DecimalType(NumberBaseType):
self._nullable_constant = {'b': INT8_MIN, 'h': INT16_MIN, 'i':
INT32_MIN, 'q': INT64_MIN, 'Q': INT128_MIN} \
.get(self._pack_sym)
+ def check_value_precision(self, value, text):
+ number_digits = int(math.ceil(math.log10(abs(value))))
+ if number_digits > self._precision:
+ raise Exception('Too many digits on %s value: %s > %s!' % (text,
number_digits, self._precision))
+
def add_json_schema_entry(self, schema):
super(DecimalType, self).add_json_schema_entry(schema)
schema[self._column_name]['type'] = 'number'
@@ -505,16 +503,8 @@ class DecimalType(NumberBaseType):
def get_nullable_constant(self):
return self._nullable_constant
- def process_default_value(self, value):
- number_digits = int(math.ceil(math.log10(abs(value))))
- if number_digits > self._precision:
- raise Exception('Too many digits on default value: %s > %s' %
(number_digits, self._precision))
- return int(value)
-
def process_next_value(self, entry, counter, parameters, errors):
- number_digits = int(math.ceil(math.log10(abs(entry))))
- if number_digits > self._precision:
- errors[counter] = 'Too many digits: %s > %s' % (number_digits,
self._precision)
+ self.check_value_precision(entry, 'entry')
parsed_value = int(entry)
if self._pack_sym != 'Q':
return parsed_value
@@ -524,7 +514,7 @@ class DecimalType(NumberBaseType):
def pack_parsed_values(self, extracted_values, counter, parameters):
if self._pack_sym == 'Q':
extracted_values = list(itertools.chain(*extracted_values))
- counter <<= 1
+ counter <<= 1 # duplicate the counter for packing
return struct.pack(ALIGNMENT + str(counter) + self._pack_sym,
*extracted_values)
def to_json_representation(self):
@@ -550,60 +540,56 @@ class BaseDateTimeType(StreamDataType):
self._maximum = self.parse_entry(kwargs['maximum'])
if hasattr(self, '_minimum') and hasattr(self, '_maximum') and
self._minimum > self._maximum:
raise Exception('The minimum value is higher than the maximum!')
- self._default_value_text = None # needed later for the SQL creation
statement
+
+ def get_nullable_constant(self):
+ return "0"
+
+ @abstractmethod
+ def parse_entry(self, entry):
+ pass
def set_default_value(self, default_value):
- parsed_val = self.parse_entry(default_value) # Process the default
value as others
+ parsed_val = self.parse_entry(default_value) # Process the default
value as the others
if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and
parsed_val < self._minimum:
- raise Exception('The default value is less than the minimum: %s <
%s' % (default_value, self._minimum_text))
+ raise Exception('The default value is less than the minimum: %s <
%s!'
+ % (default_value, self._minimum_text))
elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and
parsed_val > self._maximum:
- raise Exception('The default value is higher than the maximum: %s
> %s'
+ raise Exception('The default value is higher than the maximum: %s
> %s!'
% (default_value, self._maximum_text))
elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
parsed_val < self._minimum:
- raise Exception('The default value is out of range: %s < %s' %
(default_value, self._minimum_text))
+ raise Exception('The default value is out of range: %s < %s!' %
(default_value, self._minimum_text))
elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
parsed_val > self._maximum:
- raise Exception('The default value is out of range: %s > %s' %
(default_value, self._maximum_text))
- self._default_value = parsed_val
- self._default_value_text = default_value
+ raise Exception('The default value is out of range: %s > %s!' %
(default_value, self._maximum_text))
+ self._default_value = default_value
+
+ @abstractmethod
+ def pack_next_value(self, parsed, counter, parameters, errors):
+ pass
+
+ def process_next_value(self, entry, counter, parameters, errors):
+ if entry == self.get_nullable_constant(): # have to do this trick due
to Python's datetime limitations
+ return self.pack_next_value(None, counter, parameters, errors)
+ parsed = self.parse_entry(entry)
+ if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and
parsed < self._minimum:
+ errors[counter] = 'The value is higher than the minimum: %s < %s!'
% (self._minimum_text, parsed)
+ elif hasattr(self, '_maximum') and not hasattr(self, '_minimum') and
parsed > self._maximum:
+ errors[counter] = 'The value is higher than the maximum: %s > %s!'
% (parsed, self._maximum_text)
+ elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
parsed < self._minimum:
+ errors[counter] = 'The value is out of range: %s < %s!' %
(self._minimum_text, parsed)
+ elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and
parsed > self._maximum:
+ errors[counter] = 'The value is out of range: %s > %s!' % (parsed,
self._maximum_text)
+ return self.pack_next_value(parsed, counter, parameters, errors)
def to_json_representation(self):
json_value = super(BaseDateTimeType, self).to_json_representation()
- json_value['type'] = 'string'
if hasattr(self, '_minimum'):
json_value['minimum'] = self._minimum_text
if hasattr(self, '_maximum'):
json_value['maximum'] = self._maximum_text
return json_value
- def create_stream_sql(self):
- array = [self._column_name, " ", self._data_type]
- if self._default_value is not None:
- array.extend([" DEFAULT '", str(self._default_value_text), "'"])
- if not self._is_nullable:
- array.append(" NOT NULL")
- return ''.join(array)
- @abstractmethod
- def parse_entry(self, entry):
- return 0
-
- @abstractmethod
- def pack_next_value(self, parsed, counter, parameters, errors):
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list