Merge PR #11 and prep 0.9.6. Clobbered the PR's conflicting changes to predictionio/__init__.py; fixed the rest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/commit/7b6e210d Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/tree/7b6e210d Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/diff/7b6e210d Branch: refs/heads/master Commit: 7b6e210d7ad8147358685f513fab98f58a79039b Parents: b85d044 bc67832 Author: EmergentOrder <[email protected]> Authored: Tue Nov 3 13:04:18 2015 -0400 Committer: EmergentOrder <[email protected]> Committed: Tue Nov 3 13:04:18 2015 -0400 ---------------------------------------------------------------------- .gitignore | 4 + docs/source/conf.py | 110 +- examples/demo-movielens/appdata.py | 316 ++--- examples/demo-movielens/batch_import.py | 246 ++-- examples/event_sample.py | 48 +- examples/import_yahoo.py | 261 ++-- examples/itemrank_quick_query.py | 18 +- examples/itemrank_quick_start.py | 69 +- examples/obsolete/__init__.py | 0 examples/obsolete/itemrec/__init__.py | 0 examples/obsolete/itemrec/movies/.gitignore | 3 - examples/obsolete/itemrec/movies/README.md | 15 - examples/obsolete/itemrec/movies/__init__.py | 0 examples/obsolete/itemrec/movies/app_config.py | 2 - examples/obsolete/itemrec/movies/appdata.py | 148 --- .../obsolete/itemrec/movies/batch_import.py | 65 - .../obsolete/itemrec/movies/movie_rec_app.py | 152 --- predictionio/__init__.py | 40 +- predictionio/connection.py | 537 ++++---- predictionio/obsolete.py | 1205 ------------------ setup.py | 32 +- tests-obsolete/conversion_test.py | 16 + tests-obsolete/import_testdata.py | 46 + tests-obsolete/import_testdata_id_mismatch.py | 46 + tests-obsolete/import_testdata_special_char.py | 46 + tests-obsolete/predictionio_itemrec_test.py | 336 +++++ .../predictionio_itemrec_test_special_char.py | 366 ++++++ tests-obsolete/predictionio_itemsim_test.py | 200 +++ tests-obsolete/predictionio_test.py | 257 ++++ 
tests/.keep | 0 tests/conversion_test.py | 23 - tests/import_testdata.py | 52 - tests/import_testdata_id_mismatch.py | 52 - tests/import_testdata_special_char.py | 52 - tests/predictionio_itemrec_test.py | 300 ----- tests/predictionio_itemrec_test_special_char.py | 291 ----- tests/predictionio_itemsim_test.py | 160 --- tests/predictionio_test.py | 246 ---- tox.ini | 9 + 39 files changed, 2171 insertions(+), 3598 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/docs/source/conf.py ---------------------------------------------------------------------- diff --cc docs/source/conf.py index 0d87655,539cf04..d52d83e --- a/docs/source/conf.py +++ b/docs/source/conf.py @@@ -52,9 -52,9 +52,9 @@@ copyright = u'2014, TappingStone, Inc. # built documents. # # The short X.Y version. -version = '0.8' +version = '0.9' # The full version, including alpha/beta/rc tags. - release = '0.9.2' -release = '0.8.3' ++release = '0.9.6' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/predictionio/__init__.py ---------------------------------------------------------------------- diff --cc predictionio/__init__.py index 290c31e,f4eb87a..e5e5387 --- a/predictionio/__init__.py +++ b/predictionio/__init__.py @@@ -1,14 -1,10 +1,13 @@@ --"""PredictoinIO Python SDK -- --The PredictoinIO Python SDK provides easy-to-use functions for integrating ++"""PredictionIO Python SDK ++The PredictionIO Python SDK provides easy-to-use functions for integrating Python applications with PredictionIO REST API services. """ -__version__ = "0.8.5" + - __version__ = "0.9.2" ++__version__ = "0.9.6" + +# import deprecated libraries. 
+from predictionio.obsolete import Client, InvalidArgumentError # import packages import re @@@ -62,530 -51,405 +61,497 @@@ def event_time_validation(t) class BaseClient(object): - def __init__(self, url, threads=1, qsize=0, timeout=5): - """Constructor of Client object. - - """ - self.threads = threads - self.url = url - self.qsize = qsize - self.timeout = timeout - - # check connection type - https_pattern = r'^https://(.*)' - http_pattern = r'^http://(.*)' - m = re.match(https_pattern, url) - self.https = True - if m is None: # not matching https - m = re.match(http_pattern, url) - self.https = False - if m is None: # not matching http either - raise InvalidArgumentError("url is not valid: %s" % url) # noqa - self.host = m.group(1) - - self._uid = None # identified uid - self._connection = Connection(host=self.host, threads=self.threads, - qsize=self.qsize, https=self.https, - timeout=self.timeout) - - def close(self): - """Close this client and the connection. - - Call this method when you want to completely terminate the connection - with PredictionIO. - It will wait for all pending requests to finish. - """ - self._connection.close() - - def pending_requests(self): - """Return the number of pending requests. - - :returns: - The number of pending requests of this client. - """ - return self._connection.pending_requests() - - def get_status(self): - """Get the status of the PredictionIO API Server - - :returns: - status message. - - :raises: - ServerStatusError. 
- """ - path = "/" - request = AsyncRequest("GET", path) - request.set_rfunc(self._aget_resp) - self._connection.make_request(request) - result = request.get_response() - return result - - def _acreate_resp(self, response): - if response.error is not None: - raise NotCreatedError("Exception happened: %s for request %s" % - (response.error, response.request)) - elif response.status != httplib.CREATED: - raise NotCreatedError("request: %s status: %s body: %s" % - (response.request, response.status, - response.body)) - - return response - - def _aget_resp(self, response): - if response.error is not None: - raise NotFoundError("Exception happened: %s for request %s" % - (response.error, response.request)) - elif response.status != httplib.OK: - raise NotFoundError("request: %s status: %s body: %s" % - (response.request, response.status, - response.body)) - - return response.json_body - - def _adelete_resp(self, response): - if response.error is not None: - raise NotFoundError("Exception happened: %s for request %s" % - (response.error, response.request)) - elif response.status != httplib.OK: - raise NotFoundError("request: %s status: %s body: %s" % - (response.request, response.status, - response.body)) - - return response.body + def __init__(self, url, threads=1, qsize=0, timeout=5): + """Constructor of Client object. 
- + """ + self.threads = threads + self.url = url + self.qsize = qsize + self.timeout = timeout + + # check connection type + https_pattern = r'^https://(.*)' + http_pattern = r'^http://(.*)' + m = re.match(https_pattern, url) + self.https = True + if m is None: # not matching https + m = re.match(http_pattern, url) + self.https = False + if m is None: # not matching http either + raise InvalidArgumentError("url is not valid: %s" % url) + self.host = m.group(1) + + self._uid = None # identified uid + self._connection = Connection(host=self.host, threads=self.threads, + qsize=self.qsize, https=self.https, + timeout=self.timeout) + + def close(self): + """Close this client and the connection. - + Call this method when you want to completely terminate the connection + with PredictionIO. + It will wait for all pending requests to finish. + """ + self._connection.close() + + def pending_requests(self): + """Return the number of pending requests. - + :returns: + The number of pending requests of this client. + """ + return self._connection.pending_requests() + + def get_status(self): + """Get the status of the PredictionIO API Server - + :returns: + status message. - + :raises: + ServerStatusError. 
+ """ + path = "/" + request = AsyncRequest("GET", path) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + result = request.get_response() + return result + + def _acreate_resp(self, response): + if response.error is not None: + raise NotCreatedError("Exception happened: %s for request %s" % + (response.error, response.request)) + elif response.status != httplib.CREATED: + raise NotCreatedError("request: %s status: %s body: %s" % + (response.request, response.status, + response.body)) + + return response + + def _aget_resp(self, response): + if response.error is not None: + raise NotFoundError("Exception happened: %s for request %s" % + (response.error, response.request)) + elif response.status != httplib.OK: + raise NotFoundError("request: %s status: %s body: %s" % + (response.request, response.status, + response.body)) + + return response.json_body + + def _adelete_resp(self, response): + if response.error is not None: + raise NotFoundError("Exception happened: %s for request %s" % + (response.error, response.request)) + elif response.status != httplib.OK: + raise NotFoundError("request: %s status: %s body: %s" % + (response.request, response.status, + response.body)) + + return response.body class EventClient(BaseClient): - """Client for importing data into PredictionIO Event Server. - - Notice that app_id has been deprecated as of 0.8.2. Please use access_token - instead. - - :param access_key: the access key for your application. - :param url: the url of PredictionIO Event Server. - :param threads: number of threads to handle PredictionIO API requests. - Must be >= 1. - :param qsize: the max size of the request queue (optional). - The asynchronous request becomes blocking once this size has been - reached, until the queued requests are handled. - Default value is 0, which means infinite queue size. - :param timeout: timeout for HTTP connection attempts and requests in - seconds (optional). - Default value is 5. 
+ """Client for importing data into PredictionIO Event Server. - + Notice that app_id has been deprecated as of 0.8.2. Please use access_token + instead. - + :param access_key: the access key for your application. + :param url: the url of PredictionIO Event Server. + :param threads: number of threads to handle PredictionIO API requests. + Must be >= 1. + :param qsize: the max size of the request queue (optional). + The asynchronous request becomes blocking once this size has been + reached, until the queued requests are handled. + Default value is 0, which means infinite queue size. + :param timeout: timeout for HTTP connection attempts and requests in + seconds (optional). + Default value is 5. + :param channel: channel name (optional) + """ + + def __init__(self, access_key, + url="http://localhost:7070", + threads=1, qsize=0, timeout=5, channel=None): + assert type(access_key) is str, ("access_key must be string. " + "Notice that app_id has been deprecated in Prediction.IO 0.8.2. " + "Please use access_key instead.") + + super(EventClient, self).__init__(url, threads, qsize, timeout) + + if len(access_key) <= 8: + raise DeprecationWarning( + "It seems like you are specifying an app_id. It is deprecated in " + "Prediction.IO 0.8.2. Please use access_key instead. Or, " + "you may use an earlier version of this sdk.") + + self.access_key = access_key + self.channel = channel + + def acreate_event(self, event, entity_type, entity_id, + target_entity_type=None, target_entity_id=None, properties=None, + event_time=None): + """Asynchronously create an event. - + :param event: event name. type str. + :param entity_type: entity type. It is the namespace of the entityId and + analogous to the table name of a relational database. The entityId must be + unique within same entityType. type str. + :param entity_id: entity id. *entity_type-entity_id* becomes the unique + identifier of the entity. 
For example, you may have entityType named user, + and different entity IDs, say 1 and 2. In this case, user-1 and user-2 + uniquely identifies entities. type str + :param target_entity_type: target entity type. type str. + :param target_entity_id: target entity id. type str. + :param properties: a custom dict associated with an event. type dict. + :param event_time: the time of the event. type datetime, must contain + timezone info. - + :returns: + AsyncRequest object. You can call the get_response() method using this + object to get the final resuls or status of this asynchronous request. """ + data = { + "event": event, + "entityType": entity_type, + "entityId": entity_id, + } + + if target_entity_type is not None: + data["targetEntityType"] = target_entity_type + + if target_entity_id is not None: + data["targetEntityId"] = target_entity_id + + if properties is not None: + data["properties"] = properties - def __init__(self, access_key, - url="http://localhost:7070", - threads=1, qsize=0, timeout=5): - assert type(access_key) is str, ("access_key must be string. " - "Notice that app_id has been deprecated in Prediction.IO 0.8.2. " - "Please use access_key instead.") - - super(EventClient, self).__init__(url, threads, qsize, timeout) - - if len(access_key) <= 8: - raise DeprecationWarning( - "It seems like you are specifying an app_id. It is deprecated in " - "Prediction.IO 0.8.2. Please use access_key instead. Or, " - "you may use an earlier version of this sdk.") - - self.access_key = access_key - - def acreate_event(self, event, entity_type, entity_id, - target_entity_type=None, target_entity_id=None, properties=None, - event_time=None): - """Asynchronously create an event. - - :param event: event name. type str. - :param entity_type: entity type. It is the namespace of the entityId and - analogous to the table name of a relational database. The entityId must be - unique within same entityType. type str. - :param entity_id: entity id. 
*entity_type-entity_id* becomes the unique - identifier of the entity. For example, you may have entityType named user, - and different entity IDs, say 1 and 2. In this case, user-1 and user-2 - uniquely identifies entities. type str - :param target_entity_type: target entity type. type str. - :param target_entity_id: target entity id. type str. - :param properties: a custom dict associated with an event. type dict. - :param event_time: the time of the event. type datetime, must contain - timezone info. - - :returns: - AsyncRequest object. You can call the get_response() method using this - object to get the final resuls or status of this asynchronous request. - """ - data = { - "event": event, - "entityType": entity_type, - "entityId": entity_id, + et = event_time_validation(event_time) + # EventServer uses milliseconds, but python datetime class uses micro. Hence + # need to skip the last three digits. + et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z") + data["eventTime"] = et_str + + qparam = { + "accessKey" : self.access_key } - if target_entity_type is not None: - data["targetEntityType"] = target_entity_type - - if target_entity_id is not None: - data["targetEntityId"] = target_entity_id - - if properties is not None: - data["properties"] = properties - - et = event_time_validation(event_time) - # EventServer uses milliseconds, but python datetime class uses micro. Hence - # need to skip the last three digits. 
- et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z") - data["eventTime"] = et_str - - path = "/events.json?accessKey=%s" % (self.access_key, ) - request = AsyncRequest("POST", path, **data) - request.set_rfunc(self._acreate_resp) - self._connection.make_request(request) - return request - - def create_event(self, event, entity_type, entity_id, - target_entity_type=None, target_entity_id=None, properties=None, - event_time=None): - """Synchronously (blocking) create an event.""" - return self.acreate_event(event, entity_type, entity_id, - target_entity_type, target_entity_id, properties, - event_time).get_response() - - def aget_event(self, event_id): - """Asynchronouly get an event from Event Server. - - :param event_id: event id returned by the EventServer when creating the - event. - - :returns: - AsyncRequest object. - """ - enc_event_id = urllib.quote(event_id, "") # replace special char with %xx - path = "/events/%s.json" % enc_event_id - request = AsyncRequest("GET", path, accessKey=self.access_key) - request.set_rfunc(self._aget_resp) - self._connection.make_request(request) - return request - - def get_event(self, event_id): - """Synchronouly get an event from Event Server.""" - return self.aget_event(event_id).get_response() - - def adelete_event(self, event_id): - """Asynchronouly delete an event from Event Server. - - :param event_id: event id returned by the EventServer when creating the - event. - - :returns: - AsyncRequest object. 
- """ - enc_event_id = urllib.quote(event_id, "") # replace special char with %xx - path = "/events/%s.json" % (enc_event_id, ) - request = AsyncRequest("DELETE", path, accessKey=self.access_key) - request.set_rfunc(self._adelete_resp) - self._connection.make_request(request) - return request - - def delete_event(self, event_id): - """Synchronouly delete an event from Event Server.""" - return self.adelete_event(event_id).get_response() - - # Below are helper functions - - def aset_user(self, uid, properties={}, event_time=None): - """Set properties of a user. - - Wrapper of acreate_event function, setting event to "$set" and entity_type - to "user". - """ - return self.acreate_event( - event="$set", - entity_type="user", - entity_id=uid, - properties=properties, - event_time=event_time, - ) + if self.channel is not None: + qparam["channel"] = self.channel + + path = "/events.json?%s" % (urlencode(qparam), ) + + request = AsyncRequest("POST", path, **data) + request.set_rfunc(self._acreate_resp) + self._connection.make_request(request) + return request + + def create_event(self, event, entity_type, entity_id, + target_entity_type=None, target_entity_id=None, properties=None, + event_time=None): + """Synchronously (blocking) create an event.""" + return self.acreate_event(event, entity_type, entity_id, + target_entity_type, target_entity_id, properties, + event_time).get_response() + + def aget_event(self, event_id): + """Asynchronouly get an event from Event Server. - + :param event_id: event id returned by the EventServer when creating the + event. - + :returns: + AsyncRequest object. 
+ """ + qparam = { + "accessKey" : self.access_key + } + + if self.channel is not None: + qparam["channel"] = self.channel + + enc_event_id = urllib.quote(event_id, "") # replace special char with %xx + path = "/events/%s.json" % (enc_event_id, ) + request = AsyncRequest("GET", path, **qparam) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + return request + + def get_event(self, event_id): + """Synchronouly get an event from Event Server.""" + return self.aget_event(event_id).get_response() + + def aget_events(self, startTime=None, untilTime=None, entityType=None, entityId=None, limit=None, reversed=False): + """Asynchronouly get events from Event Server. (Getting events through the Event Server API is used for debugging and not recommended for production) - + :param startTime: time in ISO8601 format. Return events with eventTime >= startTime. + :param untilTime: time in ISO8601 format. Return events with eventTime < untilTime. + :param entityId: String. The entityId. Return events for this entityId only. + :param limit: Integer. The number of record events returned. Default is 20. -1 to get all. + :param reversed: Boolean. Must be used with both entityType and entityId specified, + returns events in reversed chronological order. Default is false. - + :returns: + AsyncRequest object. 
+ """ + qparam = { + "accessKey" : self.access_key, + "reversed": reversed + } + + if startTime is not None: + qparam["startTime"] = startTime + + if untilTime is not None: + qparam["untilTime"] = untilTime + + if entityType is not None: + qparam["entityType"] = entityType + + if entityId is not None: + qparam["entityId"] = entityId + + if limit is not None: + qparam["limit"] = limit + + if self.channel is not None: + qparam["channel"] = self.channel + path = "/events.json" + request = AsyncRequest("GET", path, **qparam) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + return request + + def get_events(self, startTime=None, untilTime=None, entityType=None, entityId=None, limit=None, reversed=False): + """Synchronouly get event from Event Server. (Getting events through the Event Server API is used for debugging and not recommended for production)""" + return self.aget_events( + startTime=startTime, + untilTime=untilTime, + entityType=entityType, + entityId=entityId, + limit=limit, + reversed=reversed + ).get_response() + + def adelete_event(self, event_id): + """Asynchronouly delete an event from Event Server. - + :param event_id: event id returned by the EventServer when creating the + event. - + :returns: + AsyncRequest object. 
+ """ + qparam = { + "accessKey" : self.access_key + } + + if self.channel is not None: + qparam["channel"] = self.channel + + enc_event_id = urllib.quote(event_id, "") # replace special char with %xx + path = "/events/%s.json" % (enc_event_id, ) + request = AsyncRequest("DELETE", path, **qparam) + request.set_rfunc(self._adelete_resp) + self._connection.make_request(request) + return request + + def delete_event(self, event_id): + """Synchronouly delete an event from Event Server.""" + return self.adelete_event(event_id).get_response() - def set_user(self, uid, properties={}, event_time=None): - """Set properties of a user""" - return self.aset_user(uid, properties, event_time).get_response() - - def aunset_user(self, uid, properties, event_time=None): - """Unset properties of an user. - - Wrapper of acreate_event function, setting event to "$unset" and entity_type - to "user". - """ - # check properties={}, it cannot be empty - return self.acreate_event( - event="$unset", - entity_type="user", - entity_id=uid, - properties=properties, - event_time=event_time, + ## Below are helper functions + + def aset_user(self, uid, properties={}, event_time=None): + """Set properties of a user. - + Wrapper of acreate_event function, setting event to "$set" and entity_type + to "user". + """ + return self.acreate_event( + event="$set", + entity_type="user", + entity_id=uid, + properties=properties, + event_time=event_time, + ) + + def set_user(self, uid, properties={}, event_time=None): + """Set properties of a user""" + return self.aset_user(uid, properties, event_time).get_response() + + def aunset_user(self, uid, properties, event_time=None): + """Unset properties of an user. - + Wrapper of acreate_event function, setting event to "$unset" and entity_type + to "user". 
+ """ + # check properties={}, it cannot be empty + return self.acreate_event( + event="$unset", + entity_type="user", + entity_id=uid, + properties=properties, + event_time=event_time, ) - def unset_user(self, uid, properties, event_time=None): - """Set properties of a user""" - return self.aunset_user(uid, properties, event_time).get_response() - - def adelete_user(self, uid, event_time=None): - """Delete a user. - - Wrapper of acreate_event function, setting event to "$delete" and entity_type - to "user". - """ - return self.acreate_event( - event="$delete", - entity_type="user", - entity_id=uid, - event_time=event_time) - - def delete_user(self, uid, event_time=None): - """Delete a user.""" - return self.adelete_user(uid, event_time).get_response() - - def aset_item(self, iid, properties={}, event_time=None): - """Set properties of an item. - - Wrapper of acreate_event function, setting event to "$set" and entity_type - to "item". - """ - return self.acreate_event( - event="$set", - entity_type="item", - entity_id=iid, - properties=properties, - event_time=event_time) - - def set_item(self, iid, properties={}, event_time=None): - """Set properties of an item.""" - return self.aset_item(iid, properties, event_time).get_response() - - def aunset_item(self, iid, properties={}, event_time=None): - """Unset properties of an item. - - Wrapper of acreate_event function, setting event to "$unset" and entity_type - to "item". - """ - return self.acreate_event( - event="$unset", - entity_type="item", - entity_id=iid, - properties=properties, - event_time=event_time) - - def unset_item(self, iid, properties={}, event_time=None): - """Unset properties of an item.""" - return self.aunset_item(iid, properties, event_time).get_response() - - def adelete_item(self, iid, event_time=None): - """Delete an item. - - Wrapper of acreate_event function, setting event to "$delete" and entity_type - to "item". 
- """ - return self.acreate_event( - event="$delete", - entity_type="item", - entity_id=iid, - event_time=event_time) - - def delete_item(self, iid, event_time=None): - """Delete an item.""" - return self.adelete_item(iid, event_time).get_response() - - def arecord_user_action_on_item(self, action, uid, iid, properties={}, - event_time=None): - """Create a user-to-item action. - - Wrapper of acreate_event function, setting entity_type to "user" and - target_entity_type to "item". - """ - return self.acreate_event( - event=action, - entity_type="user", - entity_id=uid, - target_entity_type="item", - target_entity_id=iid, - properties=properties, - event_time=event_time) - - def record_user_action_on_item(self, action, uid, iid, properties={}, - event_time=None): - """Create a user-to-item action.""" - return self.arecord_user_action_on_item( - action, uid, iid, properties, event_time).get_response() + def unset_user(self, uid, properties, event_time=None): + """Set properties of a user""" + return self.aunset_user(uid, properties, event_time).get_response() + def adelete_user(self, uid, event_time=None): + """Delete a user. - + Wrapper of acreate_event function, setting event to "$delete" and entity_type + to "user". + """ + return self.acreate_event( + event="$delete", + entity_type="user", + entity_id=uid, + event_time=event_time) + + def delete_user(self, uid, event_time=None): + """Delete a user.""" + return self.adelete_user(uid, event_time).get_response() + + def aset_item(self, iid, properties={}, event_time=None): + """Set properties of an item. - + Wrapper of acreate_event function, setting event to "$set" and entity_type + to "item". 
+ """ + return self.acreate_event( + event="$set", + entity_type="item", + entity_id=iid, + properties=properties, + event_time=event_time) + + def set_item(self, iid, properties={}, event_time=None): + """Set properties of an item.""" + return self.aset_item(iid, properties, event_time).get_response() + + def aunset_item(self, iid, properties={}, event_time=None): + """Unset properties of an item. - + Wrapper of acreate_event function, setting event to "$unset" and entity_type + to "item". + """ + return self.acreate_event( + event="$unset", + entity_type="item", + entity_id=iid, + properties=properties, + event_time=event_time) + + def unset_item(self, iid, properties={}, event_time=None): + """Unset properties of an item.""" + return self.aunset_item(iid, properties, event_time).get_response() + + def adelete_item(self, iid, event_time=None): + """Delete an item. - + Wrapper of acreate_event function, setting event to "$delete" and entity_type + to "item". + """ + return self.acreate_event( + event="$delete", + entity_type="item", + entity_id=iid, + event_time=event_time) + + def delete_item(self, iid, event_time=None): + """Delete an item.""" + return self.adelete_item(iid, event_time).get_response() + + def arecord_user_action_on_item(self, action, uid, iid, properties={}, + event_time=None): + """Create a user-to-item action. - + Wrapper of acreate_event function, setting entity_type to "user" and + target_entity_type to "item". 
+ """ + return self.acreate_event( + event=action, + entity_type="user", + entity_id=uid, + target_entity_type="item", + target_entity_id=iid, + properties=properties, + event_time=event_time) + + def record_user_action_on_item(self, action, uid, iid, properties={}, + event_time=None): + """Create a user-to-item action.""" + return self.arecord_user_action_on_item( + action, uid, iid, properties, event_time).get_response() -class EngineClient(BaseClient): - """Client for extracting prediction results from an PredictionIO Engine - Instance. - - :param url: the url of the PredictionIO Engine Instance. - :param threads: number of threads to handle PredictionIO API requests. - Must be >= 1. - :param qsize: the max size of the request queue (optional). - The asynchronous request becomes blocking once this size has been - reached, until the queued requests are handled. - Default value is 0, which means infinite queue size. - :param timeout: timeout for HTTP connection attempts and requests in - seconds (optional). - Default value is 5. +class EngineClient(BaseClient): + """Client for extracting prediction results from an PredictionIO Engine + Instance. - + :param url: the url of the PredictionIO Engine Instance. + :param threads: number of threads to handle PredictionIO API requests. + Must be >= 1. + :param qsize: the max size of the request queue (optional). + The asynchronous request becomes blocking once this size has been + reached, until the queued requests are handled. + Default value is 0, which means infinite queue size. + :param timeout: timeout for HTTP connection attempts and requests in + seconds (optional). + Default value is 5. - + """ + def __init__(self, url="http://localhost:8000", threads=1, + qsize=0, timeout=5): + super(EngineClient, self).__init__(url, threads, qsize, timeout) + + def asend_query(self, data): + """Asynchronously send a request to the engine instance with data as the + query. 
- + :param data: the query: It is coverted to an json object using json.dumps + method. type dict. - + :returns: + AsyncRequest object. You can call the get_response() method using this + object to get the final resuls or status of this asynchronous request. """ + path = "/queries.json" + request = AsyncRequest("POST", path, **data) + request.set_rfunc(self._aget_resp) + self._connection.make_request(request) + return request + + def send_query(self, data): + """Synchronously send a request. - + :param data: the query: It is coverted to an json object using json.dumps + method. type dict. - + :returns: the prediction. + """ + return self.asend_query(data).get_response() + +class FileExporter(object): + """File exporter to write events to JSON file for batch import - + :param file_name: the destination file name + """ + def __init__(self, file_name): + """Constructor of Exporter. - + """ + self._file = open(file_name, 'w') - def __init__(self, url="http://localhost:8000", threads=1, - qsize=0, timeout=5): - super(EngineClient, self).__init__(url, threads, qsize, timeout) + def create_event(self, event, entity_type, entity_id, + target_entity_type=None, target_entity_id=None, properties=None, + event_time=None): + """Create an event and write to the file. - + (please refer to EventClient's create_event()) - + """ + data = { + "event": event, + "entityType": entity_type, + "entityId": entity_id, + } - def asend_query(self, data): - """Asynchronously send a request to the engine instance with data as the - query. + if target_entity_type is not None: + data["targetEntityType"] = target_entity_type - :param data: the query: It is coverted to an json object using json.dumps - method. type dict. + if target_entity_id is not None: + data["targetEntityId"] = target_entity_id - :returns: - AsyncRequest object. You can call the get_response() method using this - object to get the final resuls or status of this asynchronous request. 
- """ - path = "/queries.json" - request = AsyncRequest("POST", path, **data) - request.set_rfunc(self._aget_resp) - self._connection.make_request(request) - return request + if properties is not None: + data["properties"] = properties - def send_query(self, data): - """Synchronously send a request. + et = event_time_validation(event_time) + # EventServer uses milliseconds, but python datetime class uses micro. Hence + # need to skip the last three digits. + et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z") + data["eventTime"] = et_str - :param data: the query: It is coverted to an json object using json.dumps - method. type dict. + j = json.dumps(data) + self._file.write(j+"\n") - :returns: the prediction. - """ - return self.asend_query(data).get_response() + def close(self): + """Close the FileExporter - + Call this method when you finish writing all events to JSON file + """ + self._file.close() http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/predictionio/connection.py ---------------------------------------------------------------------- diff --cc predictionio/connection.py index 04fe7b5,8c231f4..8205bb2 --- a/predictionio/connection.py +++ b/predictionio/connection.py @@@ -68,192 -67,190 +67,233 @@@ class ProgramError(PredictionIOAPIError class AsyncRequest(object): + """AsyncRequest object - """AsyncRequest object + """ - """ + def __init__(self, method, path, **params): + self.method = method # "GET" "POST" etc + # the sub path eg. POST /v1/users.json GET /v1/users/1.json + self.path = path + # dictionary format eg. 
{"appkey" : 123, "id" : 3} + self.params = params + # use queue to implement response, store AsyncResponse object + self.response_q = Queue.Queue(1) + self.qpath = "%s?%s" % (self.path, urlencode(self.params)) + self._response = None + # response function to be called to handle the response + self.rfunc = None - def __init__(self, method, path, **params): - self.method = method # "GET" "POST" etc - # the sub path eg. POST /v1/users.json GET /v1/users/1.json - self.path = path - # dictionary format eg. {"appkey" : 123, "id" : 3} - self.params = params - # use queue to implement response, store AsyncResponse object - self.response_q = Queue.Queue(1) - self.qpath = "%s?%s" % (self.path, urlencode(self.params)) - self._response = None - # response function to be called to handle the response - self.rfunc = None + def __str__(self): + return "%s %s %s %s" % (self.method, self.path, self.params, + self.qpath) - def __str__(self): - return "%s %s %s %s" % (self.method, self.path, self.params, - self.qpath) + def set_rfunc(self, func): + self.rfunc = func - def set_rfunc(self, func): - self.rfunc = func + def set_response(self, response): + """ store the response - def set_response(self, response): - """ store the response + NOTE: Must be only called once + """ + self.response_q.put(response) + + def get_response(self): + """Get the response. Blocking. + + :returns: self.rfunc's return type. + """ + if self._response is None: + tmp_response = self.response_q.get(True) # NOTE: blocking + if self.rfunc is None: + self._response = tmp_response + else: + self._response = self.rfunc(tmp_response) + + return self._response - NOTE: Must be only called once - """ - self.response_q.put(response) - def get_response(self): - """Get the response. Blocking. + class AsyncResponse(object): + """Store the response of asynchronous request - :returns: self.rfunc's return type. + When get the response, user should check if error is None (which means no + Exception happens). 
+ If error is None, then should check if the status is expected. """ - if self._response is None: - tmp_response = self.response_q.get(True) # NOTE: blocking - if self.rfunc is None: - self._response = tmp_response - else: - self._response = self.rfunc(tmp_response) - return self._response + def __init__(self): + #: exception object if any happens + self.error = None + + self.version = None + self.status = None + self.reason = None + #: Response header. str + self.headers = None + #: Response body. str + self.body = None + #: Jsonified response body. Remains None if conversion is unsuccessful. + self.json_body = None + #: Point back to the AsyncRequest object + self.request = None + + def __str__(self): + return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version, + self.status, self.reason, + self.headers, self.body) + + def set_resp(self, version, status, reason, headers, body): + self.version = version + self.status = status + self.reason = reason + self.headers = headers + self.body = body + # Try to extract the json. + try: + self.json_body = json.loads(body) + except ValueError as ex: # noqa + self.json_body = None + def set_error(self, error): + self.error = error - def set_request(self, request): - self.request = request - +class AsyncResponse(object): + """Store the response of asynchronous request + + When get the response, user should check if error is None (which means no + Exception happens). + If error is None, then should check if the status is expected. + """ + + def __init__(self): + #: exception object if any happens + self.error = None + + self.version = None + self.status = None + self.reason = None + #: Response header. str + self.headers = None + #: Response body. str + self.body = None + #: Jsonified response body. Remains None if conversion is unsuccessful. 
+ self.json_body = None + #: Point back to the AsyncRequest object + self.request = None + + def __str__(self): + return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version, + self.status, self.reason, + self.headers, self.body) + + def set_resp(self, version, status, reason, headers, body): + self.version = version + self.status = status + self.reason = reason + self.headers = headers + self.body = body + # Try to extract the json. + try: + self.json_body = json.loads(body.decode('utf8')) + except ValueError as ex: + self.json_body = None + + def set_error(self, error): + self.error = error + + def set_request(self, request): + self.request = request - class PredictionIOHttpConnection(object): + def __init__(self, host, https=True, timeout=5): + if https: # https connection + self._connection = httplib.HTTPSConnection(host, timeout=timeout) + else: + self._connection = httplib.HTTPConnection(host, timeout=timeout) - def __init__(self, host, https=True, timeout=5): - if https: # https connection - self._connection = httplib.HTTPSConnection(host, timeout=timeout) - else: - self._connection = httplib.HTTPConnection(host, timeout=timeout) - - def connect(self): - self._connection.connect() + def connect(self): + self._connection.connect() - def close(self): - self._connection.close() + def close(self): + self._connection.close() - def request(self, method, url, body={}, headers={}): - """ - http request wrapper function, with retry capability in case of error. - catch error exception and store it in AsyncResponse object - return AsyncResponse object + def request(self, method, url, body={}, headers={}): + """ + http request wrapper function, with retry capability in case of error. 
+ catch error exception and store it in AsyncResponse object + return AsyncResponse object - Args: - method: http method, type str - url: url path, type str - body: http request body content, type dict - header: http request header , type dict - """ + Args: + method: http method, type str + url: url path, type str + body: http request body content, type dict + header: http request header , type dict + """ - response = AsyncResponse() + response = AsyncResponse() - try: - # number of retry in case of error (minimum 0 means no retry) - retry_limit = MAX_RETRY - mod_headers = dict(headers) # copy the headers - mod_headers["Connection"] = "keep-alive" - enc_body = None - if body: # if body is not empty - #enc_body = urlencode(body) - #mod_headers[ - # "Content-type"] = "application/x-www-form-urlencoded" - enc_body = json.dumps(body) - mod_headers[ - "Content-type"] = "application/json" - #mod_headers["Accept"] = "text/plain" - except Exception as e: - response.set_error(e) - return response - - if DEBUG_LOG: - logger.debug("Request m:%s u:%s h:%s b:%s", method, url, - mod_headers, enc_body) - # retry loop - for i in xrange(retry_limit + 1): - try: - if i != 0: - if DEBUG_LOG: - logger.debug("retry request %s times" % i) - if self._connection.sock is None: - self._connection.connect() - self._connection.request(method, url, enc_body, mod_headers) - except Exception as e: - self._connection.close() - if i == retry_limit: - # new copy of e created everytime?? 
- response.set_error(e) - else: # NOTE: this is try's else clause - # connect() and request() OK try: - resp = self._connection.getresponse() + # number of retry in case of error (minimum 0 means no retry) + retry_limit = MAX_RETRY + mod_headers = dict(headers) # copy the headers + mod_headers["Connection"] = "keep-alive" + enc_body = None + if body: # if body is not empty + # enc_body = urlencode(body) + # mod_headers[ + # "Content-type"] = "application/x-www-form-urlencoded" + enc_body = json.dumps(body) + mod_headers[ + "Content-type"] = "application/json" + # mod_headers["Accept"] = "text/plain" except Exception as e: - self._connection.close() - if i == retry_limit: response.set_error(e) - else: # NOTE: this is try's else clause - # getresponse() OK - resp_version = resp.version # int - resp_status = resp.status # int - resp_reason = resp.reason # str - # resp.getheaders() returns list of tuples - # converted to dict format - resp_headers = dict(resp.getheaders()) - # NOTE: have to read the response before sending out next - # http request - resp_body = resp.read() # str - response.set_resp(version=resp_version, status=resp_status, - reason=resp_reason, headers=resp_headers, - body=resp_body) - break # exit retry loop - # end of retry loop - if DEBUG_LOG: - logger.debug("Response %s", response) - return response # AsyncResponse object + return response + + if DEBUG_LOG: + logger.debug("Request m:%s u:%s h:%s b:%s", method, url, + mod_headers, enc_body) + # retry loop + for i in xrange(retry_limit + 1): + try: + if i != 0: + if DEBUG_LOG: + logger.debug("retry request %s times" % i) + if self._connection.sock is None: + self._connection.connect() + self._connection.request(method, url, enc_body, mod_headers) + except Exception as e: + self._connection.close() + if i == retry_limit: + # new copy of e created everytime?? 
+ response.set_error(e) + else: # NOTE: this is try's else clause + # connect() and request() OK + try: + resp = self._connection.getresponse() + except Exception as e: + self._connection.close() + if i == retry_limit: + response.set_error(e) + else: # NOTE: this is try's else clause + # getresponse() OK + resp_version = resp.version # int + resp_status = resp.status # int + resp_reason = resp.reason # str + # resp.getheaders() returns list of tuples + # converted to dict format + resp_headers = dict(resp.getheaders()) + # NOTE: have to read the response before sending out next + # http request + resp_body = resp.read() # str + response.set_resp(version=resp_version, status=resp_status, + reason=resp_reason, headers=resp_headers, + body=resp_body) + break # exit retry loop + # end of retry loop + if DEBUG_LOG: + logger.debug("Response %s", response) + return response # AsyncResponse object def connection_worker(host, request_queue, https=True, timeout=5, loop=True): http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/setup.py ---------------------------------------------------------------------- diff --cc setup.py index 15163d1,f0e1038..67b12e3 --- a/setup.py +++ b/setup.py @@@ -10,7 -10,7 +10,7 @@@ __license__ = "Apache License, Version setup( name='PredictionIO', - version="0.9.2", - version="0.8.5", ++ version="0.9.6", author=__author__, author_email=__email__, packages=['predictionio'],
