Changeset: a91cdecbf145 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a91cdecbf145
Modified Files:
clients/iotapi/src/Streams/datatypes.py
clients/iotapi/src/Streams/streams.py
clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
Finished data reconstruction
diffs (truncated from 334 to 300 lines):
diff --git a/clients/iotapi/src/Streams/datatypes.py b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -1,12 +1,13 @@
import struct
+from abc import ABCMeta, abstractmethod
from datetime import date, time, datetime
-from abc import ABCMeta, abstractmethod
from dateutil.relativedelta import relativedelta
-ALIGNMENT = '<' # for now is little-endian for Intel CPU's
+LITTLE_ENDIAN_ALIGNMENT = '<' # for now is little-endian for Intel CPU's
NIL_STRING = "\200\n" # added newline for performance
+NIL_UUID = "00000000-0000-0000-0000-000000000000"
INT8_MIN = 0x80
INT16_MIN = 0x8000
@@ -20,24 +21,40 @@ DOUBLE_NAN = struct.unpack('d', '\xff\xf
class StreamDataType(object):
- """MonetDB's data types for validation base class"""
+ """MonetDB's data types for reading base class"""
__metaclass__ = ABCMeta
def __init__(self, **kwargs):
self._column_name = kwargs['name'] # name of the column
self._data_type = kwargs['type'] # SQL name of the type
- # self._location = kwargs['location'] + '.tail' # Location of the file
def is_file_mode_binary(self):
return True
@abstractmethod
- def read_next_batch(self, file_pointer, count):
- return []
+ def skip_tuples(self, file_pointer, offset):
+ pass
+
+ @abstractmethod
+ def read_next_batch(self, file_pointer, limit):
+ pass
+
+ def read_next_tuples(self, file_name, offset, read_size):
+ open_string = 'r'
+ if not self.is_file_mode_binary():
+ open_string += 'u'
+ file_pointer = open(file_name, open_string)
+
+ if offset > 0:
+ self.skip_tuples(file_pointer, offset)
+
+ results = self.read_next_batch(file_pointer, read_size)
+ file_pointer.close()
+ return results
class TextType(StreamDataType):
- """Covers: CHAR, VARCHAR, CLOB"""
+ """Covers: CHAR, VARCHAR, CLOB and URL"""
def __init__(self, **kwargs):
super(TextType, self).__init__(**kwargs)
@@ -46,9 +63,69 @@ class TextType(StreamDataType):
def is_file_mode_binary(self):
return False
- def read_next_batch(self, file_pointer, count):
- array = file_pointer.readlines()
- return map(lambda x: None if x == self._nullable_constant else x[:-1],
array)
+ def skip_tuples(self, file_pointer, offset):
+ for _ in xrange(offset):
+ next(file_pointer)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = []
+ for _ in xrange(limit):
+ next_line = next(file_pointer)
+ if next_line == self._nullable_constant:
+ array.append(None)
+ else:
+ array.append(next_line[:-1]) # remove newline
+ return array
+
+
+class INetType(StreamDataType):
+ """Covers: Inet"""
+
+ def __init__(self, **kwargs):
+ super(INetType, self).__init__(**kwargs)
+
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset << 3)
+
+ def read_next_batch(self, file_pointer, limit):
+ results = []
+ read_size = limit << 3
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B',
file_pointer.read(read_size))
+ iterator = iter(array)
+
+ for _ in xrange(limit):
+ next_ipv4 = [next(iterator) for _ in xrange(8)]
+ if next_ipv4[7] == 1: # check nil value
+ results.append(None)
+ else:
+ parsed_ip = '.'.join([str(next_ipv4[0]), str(next_ipv4[1]),
str(next_ipv4[2]), str(next_ipv4[3])])
+ results.append(parsed_ip + '/' + str(next_ipv4[4]))
+ return results
+
+
+class UUIDType(StreamDataType):
+ """Covers: UUID"""
+
+ def __init__(self, **kwargs):
+ super(UUIDType, self).__init__(**kwargs)
+
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset << 4)
+
+ def read_next_batch(self, file_pointer, limit):
+ results = []
+ read_size = limit << 4
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B',
file_pointer.read(read_size))
+ iterator = iter(array)
+
+ for _ in xrange(limit):
+ next_uuid = ''.join(map(lambda x: "%02x" % x, [next(iterator) for
_ in xrange(16)]))
+ next_uuid = ''.join([next_uuid[:8], '-', next_uuid[8:12], '-',
next_uuid[12:16], '-', next_uuid[16:20],
+ '-', next_uuid[20:]])
+ if next_uuid == NIL_UUID:
+ next_uuid = None
+ results.append(next_uuid)
+ return results
class BooleanType(StreamDataType):
@@ -58,8 +135,11 @@ class BooleanType(StreamDataType):
super(BooleanType, self).__init__(**kwargs)
self._nullable_constant = INT8_MIN
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count) + 'b',
file_pointer.read(count))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b',
file_pointer.read(limit))
return map(lambda x: None if x == self._nullable_constant else
bool(x), array)
@@ -74,8 +154,12 @@ class SmallIntegerType(StreamDataType):
self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN,
'int': INT32_MIN, 'integer': INT32_MIN,
'bigint': INT64_MIN}.get(self._data_type)
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym,
file_pointer.read(count * self._size))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset * self._size)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) +
self._pack_sym,
+ file_pointer.read(limit * self._size))
return map(lambda x: None if x == self._nullable_constant else int(x),
array)
@@ -86,8 +170,11 @@ class HugeIntegerType(StreamDataType):
super(HugeIntegerType, self).__init__(**kwargs)
self._nullable_constant = INT128_MIN
- def read_next_batch(self, file_pointer, count): # [entry & INT64_MAX,
(entry >> 64) & INT64_MAX]
- array = struct.unpack(ALIGNMENT + str(count << 1) + 'Q',
file_pointer.read(count << 3))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset << 4)
+
+ def read_next_batch(self, file_pointer, limit): # [entry & INT64_MAX,
(entry >> 64) & INT64_MAX]
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q',
file_pointer.read(limit << 4))
results = []
iterator = iter(array) # has to iterate two values at once, so use
iterator
for value in iterator:
@@ -108,8 +195,12 @@ class FloatType(StreamDataType):
self._size = struct.calcsize(self._pack_sym)
self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN,
'double': DOUBLE_NAN}.get(self._data_type)
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym,
file_pointer.read(count * self._size))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset * self._size)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) +
self._pack_sym,
+ file_pointer.read(limit * self._size))
return map(lambda x: None if x == self._nullable_constant else
float(x), array)
@@ -126,20 +217,24 @@ class DecimalType(StreamDataType):
if self._pack_sym == 'Q':
self._size <<= 1 # has to read two values at once
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym,
file_pointer.read(count * self._size))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset * self._size)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) +
self._pack_sym,
+ file_pointer.read(limit * self._size))
if self._pack_sym != 'Q':
return map(lambda x: None if x == self._nullable_constant else
float(x), array)
-
- results = []
- iterator = iter(array) # has to iterate two values at once, so use
iterator
- for value in iterator:
- next_huge_decimal = value + (next(iterator) << 64)
- if next_huge_decimal == self._nullable_constant:
- results.append(None)
- else:
- results.append(next_huge_decimal)
- return results
+ else:
+ results = []
+ iterator = iter(array) # has to iterate two values at once, so
use iterator
+ for value in iterator:
+ next_huge_decimal = value + (next(iterator) << 64)
+ if next_huge_decimal == self._nullable_constant:
+ results.append(None)
+ else:
+ results.append(next_huge_decimal)
+ return results
class DateType(StreamDataType): # Stored as an uint with the number of days
since day 1 of month 1 (Jan) from year 0
@@ -149,8 +244,11 @@ class DateType(StreamDataType): # Store
super(DateType, self).__init__(**kwargs)
self._nullable_constant = INT32_MIN
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count) + 'I',
file_pointer.read(count << 2))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset << 2)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'I',
file_pointer.read(limit << 2))
results = []
for value in array:
if value == self._nullable_constant:
@@ -167,8 +265,11 @@ class TimeType(StreamDataType): # Store
super(TimeType, self).__init__(**kwargs)
self._nullable_constant = INT32_MIN
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count) + 'I',
file_pointer.read(count << 2))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset << 2)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'I',
file_pointer.read(limit << 2))
results = []
for value in array:
if value == self._nullable_constant:
@@ -188,8 +289,11 @@ class TimestampType(StreamDataType): #
def __init__(self, **kwargs):
super(TimestampType, self).__init__(**kwargs)
- def read_next_batch(self, file_pointer, count):
- array = struct.unpack(ALIGNMENT + str(count << 1) + 'I',
file_pointer.read(count << 3))
+ def skip_tuples(self, file_pointer, offset):
+ file_pointer.seek(offset << 3)
+
+ def read_next_batch(self, file_pointer, limit):
+ array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'I',
file_pointer.read(limit << 3))
results = []
iterator = iter(array) # has to iterate two values at once, so use
iterator
diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -1,6 +1,6 @@
+import os
import struct
-import os
from Settings.filesystem import get_baskets_base_location
from Utilities.readwritelock import RWLock
from WebSockets.websockets import notify_clients
@@ -120,11 +120,7 @@ class IOTStream(object):
for key, column in self._columns.iteritems():
next_file_name = os.path.join(next_path, key)
- open_string = 'r'
- if not column.is_file_mode_binary():
- open_string += 'u'
- file_pointer = open(next_file_name, open_string)
- results[key].append(column.read_next_batch(file_pointer,
offset, next_read_size))
+
results[key].append(column.read_next_tuples(next_file_name, offset,
next_read_size))
read_tuples += next_read_size
offset = 0
diff --git a/clients/iotclient/src/Streams/streamscreator.py b/clients/iotclient/src/Streams/streamscreator.py
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list