http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/bc678328/predictionio/__init__.py ---------------------------------------------------------------------- diff --git a/predictionio/__init__.py b/predictionio/__init__.py index 45a379f..f4eb87a 100644 --- a/predictionio/__init__.py +++ b/predictionio/__init__.py @@ -4,21 +4,17 @@ The PredictoinIO Python SDK provides easy-to-use functions for integrating Python applications with PredictionIO REST API services. """ - -__version__ = "0.8.3" - -# import deprecated libraries. -from predictionio.obsolete import Client +__version__ = "0.8.5" # import packages import re + try: - import httplib + import httplib except ImportError: - # pylint: disable=F0401 - # http is a Python3 module, replacing httplib - from http import client as httplib -import json + # pylint: disable=F0401 + # http is a Python3 module, replacing httplib + from http import client as httplib import urllib from datetime import datetime @@ -26,433 +22,434 @@ import pytz from predictionio.connection import Connection from predictionio.connection import AsyncRequest -from predictionio.connection import AsyncResponse +from predictionio.connection import AsyncResponse # noqa from predictionio.connection import PredictionIOAPIError class NotCreatedError(PredictionIOAPIError): - pass + pass class NotFoundError(PredictionIOAPIError): - pass + pass def event_time_validation(t): - """ Validate event_time according to EventAPI Specification. - """ + """ Validate event_time according to EventAPI Specification. + """ - if t is None: - return datetime.now(pytz.utc) + if t is None: + return datetime.now(pytz.utc) - if type(t) != datetime: - raise AttributeError("event_time must be datetime.datetime") + if type(t) != datetime: + raise AttributeError("event_time must be datetime.datetime") - if t.tzinfo is None: - raise AttributeError("event_time must have tzinfo") + if t.tzinfo is None: + raise AttributeError("event_time must have tzinfo") - return t + return t class BaseClient(object): - def __init__(self, url, threads=1, qsize=0, timeout=5): - """Constructor of Client object. + def __init__(self, url, threads=1, qsize=0, timeout=5): + """Constructor of Client object. + + """ + self.threads = threads + self.url = url + self.qsize = qsize + self.timeout = timeout + + # check connection type + https_pattern = r'^https://(.*)' + http_pattern = r'^http://(.*)' + m = re.match(https_pattern, url) + self.https = True + if m is None: # not matching https + m = re.match(http_pattern, url) + self.https = False + if m is None: # not matching http either + raise InvalidArgumentError("url is not valid: %s" % url) # noqa + self.host = m.group(1) + + self._uid = None # identified uid + self._connection = Connection(host=self.host, threads=self.threads, + qsize=self.qsize, https=self.https, + timeout=self.timeout) + + def close(self): + """Close this client and the connection. + + Call this method when you want to completely terminate the connection + with PredictionIO. + It will wait for all pending requests to finish. + """ + self._connection.close() + + def pending_requests(self): + """Return the number of pending requests. + + :returns: + The number of pending requests of this client. + """ + return self._connection.pending_requests() + + def get_status(self): + """Get the status of the PredictionIO API Server + + :returns: + status message. + + :raises: + ServerStatusError. 
+ """ + path = "/" + request = AsyncRequest("GET", path) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + result = request.get_response() + return result + + def _acreate_resp(self, response): + if response.error is not None: + raise NotCreatedError("Exception happened: %s for request %s" % + (response.error, response.request)) + elif response.status != httplib.CREATED: + raise NotCreatedError("request: %s status: %s body: %s" % + (response.request, response.status, + response.body)) + + return response + + def _aget_resp(self, response): + if response.error is not None: + raise NotFoundError("Exception happened: %s for request %s" % + (response.error, response.request)) + elif response.status != httplib.OK: + raise NotFoundError("request: %s status: %s body: %s" % + (response.request, response.status, + response.body)) + + return response.json_body + + def _adelete_resp(self, response): + if response.error is not None: + raise NotFoundError("Exception happened: %s for request %s" % + (response.error, response.request)) + elif response.status != httplib.OK: + raise NotFoundError("request: %s status: %s body: %s" % + (response.request, response.status, + response.body)) + + return response.body - """ - self.threads = threads - self.url = url - self.qsize = qsize - self.timeout = timeout - - # check connection type - https_pattern = r'^https://(.*)' - http_pattern = r'^http://(.*)' - m = re.match(https_pattern, url) - self.https = True - if m is None: # not matching https - m = re.match(http_pattern, url) - self.https = False - if m is None: # not matching http either - raise InvalidArgumentError("url is not valid: %s" % url) - self.host = m.group(1) - - self._uid = None # identified uid - self._connection = Connection(host=self.host, threads=self.threads, - qsize=self.qsize, https=self.https, - timeout=self.timeout) - - def close(self): - """Close this client and the connection. - - Call this method when you want to completely terminate the connection - with PredictionIO. - It will wait for all pending requests to finish. - """ - self._connection.close() - def pending_requests(self): - """Return the number of pending requests. - - :returns: - The number of pending requests of this client. +class EventClient(BaseClient): + """Client for importing data into PredictionIO Event Server. + + Notice that app_id has been deprecated as of 0.8.2. Please use access_token + instead. + + :param access_key: the access key for your application. + :param url: the url of PredictionIO Event Server. + :param threads: number of threads to handle PredictionIO API requests. + Must be >= 1. + :param qsize: the max size of the request queue (optional). + The asynchronous request becomes blocking once this size has been + reached, until the queued requests are handled. + Default value is 0, which means infinite queue size. + :param timeout: timeout for HTTP connection attempts and requests in + seconds (optional). + Default value is 5. """ - return self._connection.pending_requests() - def get_status(self): - """Get the status of the PredictionIO API Server + def __init__(self, access_key, + url="http://localhost:7070", + threads=1, qsize=0, timeout=5): + assert type(access_key) is str, ("access_key must be string. " + "Notice that app_id has been deprecated in Prediction.IO 0.8.2. 
" + "Please use access_key instead.") + + super(EventClient, self).__init__(url, threads, qsize, timeout) + + if len(access_key) <= 8: + raise DeprecationWarning( + "It seems like you are specifying an app_id. It is deprecated in " + "Prediction.IO 0.8.2. Please use access_key instead. Or, " + "you may use an earlier version of this sdk.") + + self.access_key = access_key + + def acreate_event(self, event, entity_type, entity_id, + target_entity_type=None, target_entity_id=None, properties=None, + event_time=None): + """Asynchronously create an event. + + :param event: event name. type str. + :param entity_type: entity type. It is the namespace of the entityId and + analogous to the table name of a relational database. The entityId must be + unique within same entityType. type str. + :param entity_id: entity id. *entity_type-entity_id* becomes the unique + identifier of the entity. For example, you may have entityType named user, + and different entity IDs, say 1 and 2. In this case, user-1 and user-2 + uniquely identifies entities. type str + :param target_entity_type: target entity type. type str. + :param target_entity_id: target entity id. type str. + :param properties: a custom dict associated with an event. type dict. + :param event_time: the time of the event. type datetime, must contain + timezone info. + + :returns: + AsyncRequest object. You can call the get_response() method using this + object to get the final resuls or status of this asynchronous request. + """ + data = { + "event": event, + "entityType": entity_type, + "entityId": entity_id, + } - :returns: - status message. + if target_entity_type is not None: + data["targetEntityType"] = target_entity_type + + if target_entity_id is not None: + data["targetEntityId"] = target_entity_id + + if properties is not None: + data["properties"] = properties + + et = event_time_validation(event_time) + # EventServer uses milliseconds, but python datetime class uses micro. Hence + # need to skip the last three digits. + et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z") + data["eventTime"] = et_str + + path = "/events.json?accessKey=%s" % (self.access_key, ) + request = AsyncRequest("POST", path, **data) + request.set_rfunc(self._acreate_resp) + self._connection.make_request(request) + return request + + def create_event(self, event, entity_type, entity_id, + target_entity_type=None, target_entity_id=None, properties=None, + event_time=None): + """Synchronously (blocking) create an event.""" + return self.acreate_event(event, entity_type, entity_id, + target_entity_type, target_entity_id, properties, + event_time).get_response() + + def aget_event(self, event_id): + """Asynchronouly get an event from Event Server. + + :param event_id: event id returned by the EventServer when creating the + event. + + :returns: + AsyncRequest object. + """ + enc_event_id = urllib.quote(event_id, "") # replace special char with %xx + path = "/events/%s.json" % enc_event_id + request = AsyncRequest("GET", path, accessKey=self.access_key) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + return request + + def get_event(self, event_id): + """Synchronouly get an event from Event Server.""" + return self.aget_event(event_id).get_response() + + def adelete_event(self, event_id): + """Asynchronouly delete an event from Event Server. + + :param event_id: event id returned by the EventServer when creating the + event. + + :returns: + AsyncRequest object. 
+ """ + enc_event_id = urllib.quote(event_id, "") # replace special char with %xx + path = "/events/%s.json" % (enc_event_id, ) + request = AsyncRequest("DELETE", path, accessKey=self.access_key) + request.set_rfunc(self._adelete_resp) + self._connection.make_request(request) + return request + + def delete_event(self, event_id): + """Synchronouly delete an event from Event Server.""" + return self.adelete_event(event_id).get_response() + + # Below are helper functions + + def aset_user(self, uid, properties={}, event_time=None): + """Set properties of a user. + + Wrapper of acreate_event function, setting event to "$set" and entity_type + to "user". + """ + return self.acreate_event( + event="$set", + entity_type="user", + entity_id=uid, + properties=properties, + event_time=event_time, + ) - :raises: - ServerStatusError. - """ - path = "/" - request = AsyncRequest("GET", path) - request.set_rfunc(self._aget_resp) - self._connection.make_request(request) - result = request.get_response() - return result - - def _acreate_resp(self, response): - if response.error is not None: - raise NotCreatedError("Exception happened: %s for request %s" % - (response.error, response.request)) - elif response.status != httplib.CREATED: - raise NotCreatedError("request: %s status: %s body: %s" % - (response.request, response.status, - response.body)) - - return response - - def _aget_resp(self, response): - if response.error is not None: - raise NotFoundError("Exception happened: %s for request %s" % - (response.error, response.request)) - elif response.status != httplib.OK: - raise NotFoundError("request: %s status: %s body: %s" % - (response.request, response.status, - response.body)) - - return response.json_body - - def _adelete_resp(self, response): - if response.error is not None: - raise NotFoundError("Exception happened: %s for request %s" % - (response.error, response.request)) - elif response.status != httplib.OK: - raise NotFoundError("request: %s status: %s body: %s" % - (response.request, response.status, - response.body)) - - return response.body + def set_user(self, uid, properties={}, event_time=None): + """Set properties of a user""" + return self.aset_user(uid, properties, event_time).get_response() + + def aunset_user(self, uid, properties, event_time=None): + """Unset properties of an user. + + Wrapper of acreate_event function, setting event to "$unset" and entity_type + to "user". + """ + # check properties={}, it cannot be empty + return self.acreate_event( + event="$unset", + entity_type="user", + entity_id=uid, + properties=properties, + event_time=event_time, + ) + def unset_user(self, uid, properties, event_time=None): + """Set properties of a user""" + return self.aunset_user(uid, properties, event_time).get_response() + + def adelete_user(self, uid, event_time=None): + """Delete a user. + + Wrapper of acreate_event function, setting event to "$delete" and entity_type + to "user". + """ + return self.acreate_event( + event="$delete", + entity_type="user", + entity_id=uid, + event_time=event_time) + + def delete_user(self, uid, event_time=None): + """Delete a user.""" + return self.adelete_user(uid, event_time).get_response() + + def aset_item(self, iid, properties={}, event_time=None): + """Set properties of an item. + + Wrapper of acreate_event function, setting event to "$set" and entity_type + to "item". 
+ """ + return self.acreate_event( + event="$set", + entity_type="item", + entity_id=iid, + properties=properties, + event_time=event_time) + + def set_item(self, iid, properties={}, event_time=None): + """Set properties of an item.""" + return self.aset_item(iid, properties, event_time).get_response() + + def aunset_item(self, iid, properties={}, event_time=None): + """Unset properties of an item. + + Wrapper of acreate_event function, setting event to "$unset" and entity_type + to "item". + """ + return self.acreate_event( + event="$unset", + entity_type="item", + entity_id=iid, + properties=properties, + event_time=event_time) + + def unset_item(self, iid, properties={}, event_time=None): + """Unset properties of an item.""" + return self.aunset_item(iid, properties, event_time).get_response() + + def adelete_item(self, iid, event_time=None): + """Delete an item. + + Wrapper of acreate_event function, setting event to "$delete" and entity_type + to "item". + """ + return self.acreate_event( + event="$delete", + entity_type="item", + entity_id=iid, + event_time=event_time) + + def delete_item(self, iid, event_time=None): + """Delete an item.""" + return self.adelete_item(iid, event_time).get_response() + + def arecord_user_action_on_item(self, action, uid, iid, properties={}, + event_time=None): + """Create a user-to-item action. + + Wrapper of acreate_event function, setting entity_type to "user" and + target_entity_type to "item". + """ + return self.acreate_event( + event=action, + entity_type="user", + entity_id=uid, + target_entity_type="item", + target_entity_id=iid, + properties=properties, + event_time=event_time) + + def record_user_action_on_item(self, action, uid, iid, properties={}, + event_time=None): + """Create a user-to-item action.""" + return self.arecord_user_action_on_item( + action, uid, iid, properties, event_time).get_response() -class EventClient(BaseClient): - """Client for importing data into PredictionIO Event Server. - - Notice that app_id has been deprecated as of 0.8.2. Please use access_token - instead. - - :param access_key: the access key for your application. - :param url: the url of PredictionIO Event Server. - :param threads: number of threads to handle PredictionIO API requests. - Must be >= 1. - :param qsize: the max size of the request queue (optional). - The asynchronous request becomes blocking once this size has been - reached, until the queued requests are handled. - Default value is 0, which means infinite queue size. - :param timeout: timeout for HTTP connection attempts and requests in - seconds (optional). - Default value is 5. - """ - - def __init__(self, access_key, - url="http://localhost:7070", - threads=1, qsize=0, timeout=5): - assert type(access_key) is str, ("access_key must be string. " - "Notice that app_id has been deprecated in Prediction.IO 0.8.2. " - "Please use access_key instead.") - - super(EventClient, self).__init__(url, threads, qsize, timeout) - - if len(access_key) <= 8: - raise DeprecationWarning( - "It seems like you are specifying an app_id. It is deprecated in " - "Prediction.IO 0.8.2. Please use access_key instead. Or, " - "you may use an earlier version of this sdk.") - - self.access_key = access_key - - def acreate_event(self, event, entity_type, entity_id, - target_entity_type=None, target_entity_id=None, properties=None, - event_time=None): - """Asynchronously create an event. - - :param event: event name. type str. - :param entity_type: entity type. 
It is the namespace of the entityId and - analogous to the table name of a relational database. The entityId must be - unique within same entityType. type str. - :param entity_id: entity id. *entity_type-entity_id* becomes the unique - identifier of the entity. For example, you may have entityType named user, - and different entity IDs, say 1 and 2. In this case, user-1 and user-2 - uniquely identifies entities. type str - :param target_entity_type: target entity type. type str. - :param target_entity_id: target entity id. type str. - :param properties: a custom dict associated with an event. type dict. - :param event_time: the time of the event. type datetime, must contain - timezone info. - - :returns: - AsyncRequest object. You can call the get_response() method using this - object to get the final resuls or status of this asynchronous request. - """ - data = { - "event": event, - "entityType": entity_type, - "entityId": entity_id, - } - if target_entity_type is not None: - data["targetEntityType"] = target_entity_type - - if target_entity_id is not None: - data["targetEntityId"] = target_entity_id - - if properties is not None: - data["properties"] = properties - - et = event_time_validation(event_time) - # EventServer uses milliseconds, but python datetime class uses micro. Hence - # need to skip the last three digits. - et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z") - data["eventTime"] = et_str - - path = "/events.json?accessKey=%s" % (self.access_key, ) - request = AsyncRequest("POST", path, **data) - request.set_rfunc(self._acreate_resp) - self._connection.make_request(request) - return request - - def create_event(self, event, entity_type, entity_id, - target_entity_type=None, target_entity_id=None, properties=None, - event_time=None): - """Synchronously (blocking) create an event.""" - return self.acreate_event(event, entity_type, entity_id, - target_entity_type, target_entity_id, properties, - event_time).get_response() - - def aget_event(self, event_id): - """Asynchronouly get an event from Event Server. - - :param event_id: event id returned by the EventServer when creating the - event. - - :returns: - AsyncRequest object. +class EngineClient(BaseClient): + """Client for extracting prediction results from an PredictionIO Engine + Instance. + + :param url: the url of the PredictionIO Engine Instance. + :param threads: number of threads to handle PredictionIO API requests. + Must be >= 1. + :param qsize: the max size of the request queue (optional). + The asynchronous request becomes blocking once this size has been + reached, until the queued requests are handled. + Default value is 0, which means infinite queue size. + :param timeout: timeout for HTTP connection attempts and requests in + seconds (optional). + Default value is 5. + """ - enc_event_id = urllib.quote(event_id, "") # replace special char with %xx - path = "/events/%s.json" % enc_event_id - request = AsyncRequest("GET", path, accessKey=self.access_key) - request.set_rfunc(self._aget_resp) - self._connection.make_request(request) - return request - def get_event(self, event_id): - """Synchronouly get an event from Event Server.""" - return self.aget_event(event_id).get_response() + def __init__(self, url="http://localhost:8000", threads=1, + qsize=0, timeout=5): + super(EngineClient, self).__init__(url, threads, qsize, timeout) - def adelete_event(self, event_id): - """Asynchronouly delete an event from Event Server. 
+ def asend_query(self, data): + """Asynchronously send a request to the engine instance with data as the + query. - :param event_id: event id returned by the EventServer when creating the - event. + :param data: the query: It is coverted to an json object using json.dumps + method. type dict. - :returns: - AsyncRequest object. - """ - enc_event_id = urllib.quote(event_id, "") # replace special char with %xx - path = "/events/%s.json" % (enc_event_id, ) - request = AsyncRequest("DELETE", path, accessKey=self.access_key) - request.set_rfunc(self._adelete_resp) - self._connection.make_request(request) - return request - - def delete_event(self, event_id): - """Synchronouly delete an event from Event Server.""" - return self.adelete_event(event_id).get_response() - - ## Below are helper functions - - def aset_user(self, uid, properties={}, event_time=None): - """Set properties of a user. - - Wrapper of acreate_event function, setting event to "$set" and entity_type - to "user". - """ - return self.acreate_event( - event="$set", - entity_type="user", - entity_id=uid, - properties=properties, - event_time=event_time, - ) - - def set_user(self, uid, properties={}, event_time=None): - """Set properties of a user""" - return self.aset_user(uid, properties, event_time).get_response() - - def aunset_user(self, uid, properties, event_time=None): - """Unset properties of an user. - - Wrapper of acreate_event function, setting event to "$unset" and entity_type - to "user". - """ - # check properties={}, it cannot be empty - return self.acreate_event( - event="$unset", - entity_type="user", - entity_id=uid, - properties=properties, - event_time=event_time, - ) + :returns: + AsyncRequest object. You can call the get_response() method using this + object to get the final resuls or status of this asynchronous request. + """ + path = "/queries.json" + request = AsyncRequest("POST", path, **data) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + return request - def unset_user(self, uid, properties, event_time=None): - """Set properties of a user""" - return self.aunset_user(uid, properties, event_time).get_response() - - def adelete_user(self, uid, event_time=None): - """Delete a user. - - Wrapper of acreate_event function, setting event to "$delete" and entity_type - to "user". - """ - return self.acreate_event( - event="$delete", - entity_type="user", - entity_id=uid, - event_time=event_time) - - def delete_user(self, uid, event_time=None): - """Delete a user.""" - return self.adelete_user(uid, event_time).get_response() - - def aset_item(self, iid, properties={}, event_time=None): - """Set properties of an item. - - Wrapper of acreate_event function, setting event to "$set" and entity_type - to "item". - """ - return self.acreate_event( - event="$set", - entity_type="item", - entity_id=iid, - properties=properties, - event_time=event_time) - - def set_item(self, iid, properties={}, event_time=None): - """Set properties of an item.""" - return self.aset_item(iid, properties, event_time).get_response() - - def aunset_item(self, iid, properties={}, event_time=None): - """Unset properties of an item. - - Wrapper of acreate_event function, setting event to "$unset" and entity_type - to "item". 
- """ - return self.acreate_event( - event="$unset", - entity_type="item", - entity_id=iid, - properties=properties, - event_time=event_time) - - def unset_item(self, iid, properties={}, event_time=None): - """Unset properties of an item.""" - return self.aunset_item(iid, properties, event_time).get_response() - - def adelete_item(self, iid, event_time=None): - """Delete an item. - - Wrapper of acreate_event function, setting event to "$delete" and entity_type - to "item". - """ - return self.acreate_event( - event="$delete", - entity_type="item", - entity_id=iid, - event_time=event_time) - - def delete_item(self, iid, event_time=None): - """Delete an item.""" - return self.adelete_item(iid, event_time).get_response() - - def arecord_user_action_on_item(self, action, uid, iid, properties={}, - event_time=None): - """Create a user-to-item action. - - Wrapper of acreate_event function, setting entity_type to "user" and - target_entity_type to "item". - """ - return self.acreate_event( - event=action, - entity_type="user", - entity_id=uid, - target_entity_type="item", - target_entity_id=iid, - properties=properties, - event_time=event_time) - - def record_user_action_on_item(self, action, uid, iid, properties={}, - event_time=None): - """Create a user-to-item action.""" - return self.arecord_user_action_on_item( - action, uid, iid, properties, event_time).get_response() + def send_query(self, data): + """Synchronously send a request. + :param data: the query: It is coverted to an json object using json.dumps + method. type dict. -class EngineClient(BaseClient): - """Client for extracting prediction results from an PredictionIO Engine - Instance. - - :param url: the url of the PredictionIO Engine Instance. - :param threads: number of threads to handle PredictionIO API requests. - Must be >= 1. - :param qsize: the max size of the request queue (optional). - The asynchronous request becomes blocking once this size has been - reached, until the queued requests are handled. - Default value is 0, which means infinite queue size. - :param timeout: timeout for HTTP connection attempts and requests in - seconds (optional). - Default value is 5. - - """ - def __init__(self, url="http://localhost:8000", threads=1, - qsize=0, timeout=5): - super(EngineClient, self).__init__(url, threads, qsize, timeout) - - def asend_query(self, data): - """Asynchronously send a request to the engine instance with data as the - query. - - :param data: the query: It is coverted to an json object using json.dumps - method. type dict. - - :returns: - AsyncRequest object. You can call the get_response() method using this - object to get the final resuls or status of this asynchronous request. - """ - path = "/queries.json" - request = AsyncRequest("POST", path, **data) - request.set_rfunc(self._aget_resp) - self._connection.make_request(request) - return request - - def send_query(self, data): - """Synchronously send a request. - - :param data: the query: It is coverted to an json object using json.dumps - method. type dict. - - :returns: the prediction. - """ - return self.asend_query(data).get_response() + :returns: the prediction. + """ + return self.asend_query(data).get_response()
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/bc678328/predictionio/connection.py ---------------------------------------------------------------------- diff --git a/predictionio/connection.py b/predictionio/connection.py index 2c79d64..8c231f4 100644 --- a/predictionio/connection.py +++ b/predictionio/connection.py @@ -1,23 +1,22 @@ - try: - import Queue + import Queue except ImportError: - # pylint: disable=F0401 - # http is a Python3 module, replacing httplib. Ditto. - import queue as Queue + # pylint: disable=F0401 + # http is a Python3 module, replacing httplib. Ditto. + import queue as Queue import threading try: - import httplib + import httplib except ImportError: - # pylint: disable=F0401 - from http import client as httplib + # pylint: disable=F0401 + from http import client as httplib try: - from urllib import urlencode + from urllib import urlencode except ImportError: - # pylint: disable=F0401,E0611 - from urllib.parse import urlencode + # pylint: disable=F0401,E0611 + from urllib.parse import urlencode import datetime import json @@ -25,9 +24,9 @@ import logging # use generators for python2 and python3 try: - xrange + xrange except NameError: - xrange = range + xrange = range # some constants MAX_RETRY = 1 # 0 means no retry @@ -39,332 +38,329 @@ DEBUG_LOG = False def enable_log(filename=None): - global logger - global DEBUG_LOG - timestamp = datetime.datetime.today() - if not filename: - logfile = "./log/predictionio_%s.log" % timestamp.strftime( - "%Y-%m-%d_%H:%M:%S.%f") - else: - logfile = filename - logging.basicConfig(filename=logfile, - filemode='w', - level=logging.DEBUG, - format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s') - logger = logging.getLogger(__name__) - DEBUG_LOG = True + global logger + global DEBUG_LOG + timestamp = datetime.datetime.today() + if not filename: + logfile = "./log/predictionio_%s.log" % timestamp.strftime( + "%Y-%m-%d_%H:%M:%S.%f") + else: + logfile = filename + logging.basicConfig(filename=logfile, + filemode='w', + level=logging.DEBUG, + format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s') + logger = logging.getLogger(__name__) + DEBUG_LOG = True class PredictionIOAPIError(Exception): - pass + pass class NotSupportMethodError(PredictionIOAPIError): - pass + pass class ProgramError(PredictionIOAPIError): - pass + pass class AsyncRequest(object): + """AsyncRequest object - """AsyncRequest object + """ - """ + def __init__(self, method, path, **params): + self.method = method # "GET" "POST" etc + # the sub path eg. POST /v1/users.json GET /v1/users/1.json + self.path = path + # dictionary format eg. {"appkey" : 123, "id" : 3} + self.params = params + # use queue to implement response, store AsyncResponse object + self.response_q = Queue.Queue(1) + self.qpath = "%s?%s" % (self.path, urlencode(self.params)) + self._response = None + # response function to be called to handle the response + self.rfunc = None - def __init__(self, method, path, **params): - self.method = method # "GET" "POST" etc - # the sub path eg. POST /v1/users.json GET /v1/users/1.json - self.path = path - # dictionary format eg. 
{"appkey" : 123, "id" : 3} - self.params = params - # use queue to implement response, store AsyncResponse object - self.response_q = Queue.Queue(1) - self.qpath = "%s?%s" % (self.path, urlencode(self.params)) - self._response = None - # response function to be called to handle the response - self.rfunc = None + def __str__(self): + return "%s %s %s %s" % (self.method, self.path, self.params, + self.qpath) - def __str__(self): - return "%s %s %s %s" % (self.method, self.path, self.params, - self.qpath) + def set_rfunc(self, func): + self.rfunc = func - def set_rfunc(self, func): - self.rfunc = func + def set_response(self, response): + """ store the response - def set_response(self, response): - """ store the response + NOTE: Must be only called once + """ + self.response_q.put(response) - NOTE: Must be only called once - """ - self.response_q.put(response) + def get_response(self): + """Get the response. Blocking. - def get_response(self): - """Get the response. Blocking. + :returns: self.rfunc's return type. + """ + if self._response is None: + tmp_response = self.response_q.get(True) # NOTE: blocking + if self.rfunc is None: + self._response = tmp_response + else: + self._response = self.rfunc(tmp_response) + + return self._response + + +class AsyncResponse(object): + """Store the response of asynchronous request - :returns: self.rfunc's return type. + When get the response, user should check if error is None (which means no + Exception happens). + If error is None, then should check if the status is expected. """ - if self._response is None: - tmp_response = self.response_q.get(True) # NOTE: blocking - if self.rfunc is None: - self._response = tmp_response - else: - self._response = self.rfunc(tmp_response) - return self._response + def __init__(self): + #: exception object if any happens + self.error = None + + self.version = None + self.status = None + self.reason = None + #: Response header. str + self.headers = None + #: Response body. str + self.body = None + #: Jsonified response body. Remains None if conversion is unsuccessful. + self.json_body = None + #: Point back to the AsyncRequest object + self.request = None + + def __str__(self): + return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version, + self.status, self.reason, + self.headers, self.body) + + def set_resp(self, version, status, reason, headers, body): + self.version = version + self.status = status + self.reason = reason + self.headers = headers + self.body = body + # Try to extract the json. + try: + self.json_body = json.loads(body) + except ValueError as ex: # noqa + self.json_body = None + def set_error(self, error): + self.error = error -class AsyncResponse(object): - """Store the response of asynchronous request - - When get the response, user should check if error is None (which means no - Exception happens). - If error is None, then should check if the status is expected. - """ - - def __init__(self): - #: exception object if any happens - self.error = None - - self.version = None - self.status = None - self.reason = None - #: Response header. str - self.headers = None - #: Response body. str - self.body = None - #: Jsonified response body. Remains None if conversion is unsuccessful. 
- self.json_body = None - #: Point back to the AsyncRequest object - self.request = None - - def __str__(self): - return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version, - self.status, self.reason, - self.headers, self.body) - - def set_resp(self, version, status, reason, headers, body): - self.version = version - self.status = status - self.reason = reason - self.headers = headers - self.body = body - # Try to extract the json. - try: - self.json_body = json.loads(body) - except ValueError as ex: - self.json_body = None - - def set_error(self, error): - self.error = error - - def set_request(self, request): - self.request = request + def set_request(self, request): + self.request = request class PredictionIOHttpConnection(object): + def __init__(self, host, https=True, timeout=5): + if https: # https connection + self._connection = httplib.HTTPSConnection(host, timeout=timeout) + else: + self._connection = httplib.HTTPConnection(host, timeout=timeout) - def __init__(self, host, https=True, timeout=5): - if https: # https connection - self._connection = httplib.HTTPSConnection(host, timeout=timeout) - else: - self._connection = httplib.HTTPConnection(host, timeout=timeout) + def connect(self): + self._connection.connect() - def connect(self): - self._connection.connect() + def close(self): + self._connection.close() - def close(self): - self._connection.close() + def request(self, method, url, body={}, headers={}): + """ + http request wrapper function, with retry capability in case of error. + catch error exception and store it in AsyncResponse object + return AsyncResponse object - def request(self, method, url, body={}, headers={}): - """ - http request wrapper function, with retry capability in case of error. - catch error exception and store it in AsyncResponse object - return AsyncResponse object + Args: + method: http method, type str + url: url path, type str + body: http request body content, type dict + header: http request header , type dict + """ - Args: - method: http method, type str - url: url path, type str - body: http request body content, type dict - header: http request header , type dict - """ + response = AsyncResponse() - response = AsyncResponse() - - try: - # number of retry in case of error (minimum 0 means no retry) - retry_limit = MAX_RETRY - mod_headers = dict(headers) # copy the headers - mod_headers["Connection"] = "keep-alive" - enc_body = None - if body: # if body is not empty - #enc_body = urlencode(body) - #mod_headers[ - # "Content-type"] = "application/x-www-form-urlencoded" - enc_body = json.dumps(body) - mod_headers[ - "Content-type"] = "application/json" - #mod_headers["Accept"] = "text/plain" - except Exception as e: - response.set_error(e) - return response - - if DEBUG_LOG: - logger.debug("Request m:%s u:%s h:%s b:%s", method, url, - mod_headers, enc_body) - # retry loop - for i in xrange(retry_limit + 1): - try: - if i != 0: - if DEBUG_LOG: - logger.debug("retry request %s times" % i) - if self._connection.sock is None: - self._connection.connect() - self._connection.request(method, url, enc_body, mod_headers) - except Exception as e: - self._connection.close() - if i == retry_limit: - # new copy of e created everytime?? 
- response.set_error(e) - else: # NOTE: this is try's else clause - # connect() and request() OK try: - resp = self._connection.getresponse() + # number of retry in case of error (minimum 0 means no retry) + retry_limit = MAX_RETRY + mod_headers = dict(headers) # copy the headers + mod_headers["Connection"] = "keep-alive" + enc_body = None + if body: # if body is not empty + # enc_body = urlencode(body) + # mod_headers[ + # "Content-type"] = "application/x-www-form-urlencoded" + enc_body = json.dumps(body) + mod_headers[ + "Content-type"] = "application/json" + # mod_headers["Accept"] = "text/plain" except Exception as e: - self._connection.close() - if i == retry_limit: response.set_error(e) - else: # NOTE: this is try's else clause - # getresponse() OK - resp_version = resp.version # int - resp_status = resp.status # int - resp_reason = resp.reason # str - # resp.getheaders() returns list of tuples - # converted to dict format - resp_headers = dict(resp.getheaders()) - # NOTE: have to read the response before sending out next - # http request - resp_body = resp.read() # str - response.set_resp(version=resp_version, status=resp_status, - reason=resp_reason, headers=resp_headers, - body=resp_body) - break # exit retry loop - # end of retry loop - if DEBUG_LOG: - logger.debug("Response %s", response) - return response # AsyncResponse object + return response + + if DEBUG_LOG: + logger.debug("Request m:%s u:%s h:%s b:%s", method, url, + mod_headers, enc_body) + # retry loop + for i in xrange(retry_limit + 1): + try: + if i != 0: + if DEBUG_LOG: + logger.debug("retry request %s times" % i) + if self._connection.sock is None: + self._connection.connect() + self._connection.request(method, url, enc_body, mod_headers) + except Exception as e: + self._connection.close() + if i == retry_limit: + # new copy of e created everytime?? + response.set_error(e) + else: # NOTE: this is try's else clause + # connect() and request() OK + try: + resp = self._connection.getresponse() + except Exception as e: + self._connection.close() + if i == retry_limit: + response.set_error(e) + else: # NOTE: this is try's else clause + # getresponse() OK + resp_version = resp.version # int + resp_status = resp.status # int + resp_reason = resp.reason # str + # resp.getheaders() returns list of tuples + # converted to dict format + resp_headers = dict(resp.getheaders()) + # NOTE: have to read the response before sending out next + # http request + resp_body = resp.read() # str + response.set_resp(version=resp_version, status=resp_status, + reason=resp_reason, headers=resp_headers, + body=resp_body) + break # exit retry loop + # end of retry loop + if DEBUG_LOG: + logger.debug("Response %s", response) + return response # AsyncResponse object def connection_worker(host, request_queue, https=True, timeout=5, loop=True): - """worker function which establishes connection and wait for request jobs - from the request_queue - - Args: - request_queue: the request queue storing the AsyncRequest object - valid requests: - GET - POST - DELETE - KILL - https: HTTPS (True) or HTTP (False) - timeout: timeout for HTTP connection attempts and requests in seconds - loop: This worker function stays in a loop waiting for request - For testing purpose only. should always be set to True. 
- """ - - connect = PredictionIOHttpConnection(host, https, timeout) - - # loop waiting for job form request queue - killed = not loop - - while True: - # print "thread %s waiting for request" % thread.get_ident() - request = request_queue.get(True) # NOTE: blocking get - # print "get request %s" % request - method = request.method - if method == "GET": - path = request.qpath - d = connect.request("GET", path) - elif method == "POST": - path = request.path - body = request.params - d = connect.request("POST", path, body) - elif method == "DELETE": - path = request.qpath - d = connect.request("DELETE", path) - elif method == "KILL": - # tell the thread to kill the connection - killed = True - d = AsyncResponse() - else: - d = AsyncResponse() - d.set_error(NotSupportMethodError( - "Don't Support the method %s" % method)) + """worker function which establishes connection and wait for request jobs + from the request_queue - d.set_request(request) - request.set_response(d) - request_queue.task_done() - if killed: - break + Args: + request_queue: the request queue storing the AsyncRequest object + valid requests: + GET + POST + DELETE + KILL + https: HTTPS (True) or HTTP (False) + timeout: timeout for HTTP connection attempts and requests in seconds + loop: This worker function stays in a loop waiting for request + For testing purpose only. should always be set to True. + """ - # end of while loop - connect.close() + connect = PredictionIOHttpConnection(host, https, timeout) + + # loop waiting for job form request queue + killed = not loop + + while True: + # print "thread %s waiting for request" % thread.get_ident() + request = request_queue.get(True) # NOTE: blocking get + # print "get request %s" % request + method = request.method + if method == "GET": + path = request.qpath + d = connect.request("GET", path) + elif method == "POST": + path = request.path + body = request.params + d = connect.request("POST", path, body) + elif method == "DELETE": + path = request.qpath + d = connect.request("DELETE", path) + elif method == "KILL": + # tell the thread to kill the connection + killed = True + d = AsyncResponse() + else: + d = AsyncResponse() + d.set_error(NotSupportMethodError( + "Don't Support the method %s" % method)) + + d.set_request(request) + request.set_response(d) + request_queue.task_done() + if killed: + break + + # end of while loop + connect.close() class Connection(object): + """abstract object for connection with server - """abstract object for connection with server - - spawn multiple connection_worker threads to handle jobs in the queue q - """ - - def __init__(self, host, threads=1, qsize=0, https=True, timeout=5): - """constructor - - Args: - host: host of the server. 
- threads: type int, number of threads to be spawn - qsize: size of the queue q - https: indicate it is httpS (True) or http connection (False) - timeout: timeout for HTTP connection attempts and requests in - seconds - """ - self.host = host - self.https = https - self.q = Queue.Queue(qsize) # if qsize=0, means infinite - self.threads = threads - self.timeout = timeout - # start thread based on threads number - self.tid = {} # dictionary of thread object - - for i in xrange(threads): - tname = "PredictionIOThread-%s" % i # thread name - self.tid[i] = threading.Thread( - target=connection_worker, name=tname, - kwargs={'host': self.host, 'request_queue': self.q, - 'https': self.https, 'timeout': self.timeout}) - self.tid[i].setDaemon(True) - self.tid[i].start() - - def make_request(self, request): - """put the request into the q - """ - self.q.put(request) - - def pending_requests(self): - """number of pending requests in the queue + spawn multiple connection_worker threads to handle jobs in the queue q """ - return self.q.qsize() - - def close(self): - """close this Connection. Call this when main program exits - """ - # set kill message to q - for i in xrange(self.threads): - self.make_request(AsyncRequest("KILL", "")) - - self.q.join() # wait for q empty - for i in xrange(self.threads): # wait for all thread finish - self.tid[i].join() + def __init__(self, host, threads=1, qsize=0, https=True, timeout=5): + """constructor + + Args: + host: host of the server. + threads: type int, number of threads to be spawn + qsize: size of the queue q + https: indicate it is httpS (True) or http connection (False) + timeout: timeout for HTTP connection attempts and requests in + seconds + """ + self.host = host + self.https = https + self.q = Queue.Queue(qsize) # if qsize=0, means infinite + self.threads = threads + self.timeout = timeout + # start thread based on threads number + self.tid = {} # dictionary of thread object + + for i in xrange(threads): + tname = "PredictionIOThread-%s" % i # thread name + self.tid[i] = threading.Thread( + target=connection_worker, name=tname, + kwargs={'host': self.host, 'request_queue': self.q, + 'https': self.https, 'timeout': self.timeout}) + self.tid[i].setDaemon(True) + self.tid[i].start() + + def make_request(self, request): + """put the request into the q + """ + self.q.put(request) + + def pending_requests(self): + """number of pending requests in the queue + """ + return self.q.qsize() + + def close(self): + """close this Connection. Call this when main program exits + """ + # set kill message to q + for i in xrange(self.threads): + self.make_request(AsyncRequest("KILL", "")) + + self.q.join() # wait for q empty + + for i in xrange(self.threads): # wait for all thread finish + self.tid[i].join()
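To make the threading model above concrete, the sketch below shows the asynchronous pattern that connection.py supports: each acreate_* call enqueues an AsyncRequest for the connection_worker threads, and get_response() blocks until a worker has filled in the AsyncResponse. The access key, log path and event data are placeholders.

    # Sketch: asynchronous event submission on top of the Connection/worker model (placeholder values).
    import predictionio

    # Optional request/response debug logging; the path here is only an example.
    predictionio.connection.enable_log("/tmp/predictionio_debug.log")

    client = predictionio.EventClient(
        access_key="YOUR_ACCESS_KEY_HERE",   # placeholder
        url="http://localhost:7070",
        threads=4,     # four connection_worker threads consume the shared request queue
        qsize=100)     # make_request() blocks once 100 requests are waiting

    # Non-blocking calls: each returns an AsyncRequest immediately.
    pending = [
        client.acreate_event(
            event="view",
            entity_type="user",
            entity_id="u%d" % i,
            target_entity_type="item",
            target_entity_id="i1")
        for i in range(10)]

    print(client.pending_requests())   # requests still sitting in the queue

    # get_response() blocks on the per-request queue until a worker thread has
    # stored the AsyncResponse and _acreate_resp has checked the HTTP status.
    for req in pending:
        req.get_response()

    client.close()   # enqueues one KILL request per thread, then joins the workers
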
