Merge PR #11, prep 0.9.6 . Clobbered the PR's predictionio/__init__.py 
conflicting changes, fixed the rest


Project: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/commit/7b6e210d
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/tree/7b6e210d
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/diff/7b6e210d

Branch: refs/heads/master
Commit: 7b6e210d7ad8147358685f513fab98f58a79039b
Parents: b85d044 bc67832
Author: EmergentOrder <[email protected]>
Authored: Tue Nov 3 13:04:18 2015 -0400
Committer: EmergentOrder <[email protected]>
Committed: Tue Nov 3 13:04:18 2015 -0400

----------------------------------------------------------------------
 .gitignore                                      |    4 +
 docs/source/conf.py                             |  110 +-
 examples/demo-movielens/appdata.py              |  316 ++---
 examples/demo-movielens/batch_import.py         |  246 ++--
 examples/event_sample.py                        |   48 +-
 examples/import_yahoo.py                        |  261 ++--
 examples/itemrank_quick_query.py                |   18 +-
 examples/itemrank_quick_start.py                |   69 +-
 examples/obsolete/__init__.py                   |    0
 examples/obsolete/itemrec/__init__.py           |    0
 examples/obsolete/itemrec/movies/.gitignore     |    3 -
 examples/obsolete/itemrec/movies/README.md      |   15 -
 examples/obsolete/itemrec/movies/__init__.py    |    0
 examples/obsolete/itemrec/movies/app_config.py  |    2 -
 examples/obsolete/itemrec/movies/appdata.py     |  148 ---
 .../obsolete/itemrec/movies/batch_import.py     |   65 -
 .../obsolete/itemrec/movies/movie_rec_app.py    |  152 ---
 predictionio/__init__.py                        |   40 +-
 predictionio/connection.py                      |  537 ++++----
 predictionio/obsolete.py                        | 1205 ------------------
 setup.py                                        |   32 +-
 tests-obsolete/conversion_test.py               |   16 +
 tests-obsolete/import_testdata.py               |   46 +
 tests-obsolete/import_testdata_id_mismatch.py   |   46 +
 tests-obsolete/import_testdata_special_char.py  |   46 +
 tests-obsolete/predictionio_itemrec_test.py     |  336 +++++
 .../predictionio_itemrec_test_special_char.py   |  366 ++++++
 tests-obsolete/predictionio_itemsim_test.py     |  200 +++
 tests-obsolete/predictionio_test.py             |  257 ++++
 tests/.keep                                     |    0
 tests/conversion_test.py                        |   23 -
 tests/import_testdata.py                        |   52 -
 tests/import_testdata_id_mismatch.py            |   52 -
 tests/import_testdata_special_char.py           |   52 -
 tests/predictionio_itemrec_test.py              |  300 -----
 tests/predictionio_itemrec_test_special_char.py |  291 -----
 tests/predictionio_itemsim_test.py              |  160 ---
 tests/predictionio_test.py                      |  246 ----
 tox.ini                                         |    9 +
 39 files changed, 2171 insertions(+), 3598 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/docs/source/conf.py
----------------------------------------------------------------------
diff --cc docs/source/conf.py
index 0d87655,539cf04..d52d83e
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@@ -52,9 -52,9 +52,9 @@@ copyright = u'2014, TappingStone, Inc.
  # built documents.
  #
  # The short X.Y version.
 -version = '0.8'
 +version = '0.9'
  # The full version, including alpha/beta/rc tags.
- release = '0.9.2'
 -release = '0.8.3'
++release = '0.9.6'
  
  # The language for content autogenerated by Sphinx. Refer to documentation
  # for a list of supported languages.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/predictionio/__init__.py
----------------------------------------------------------------------
diff --cc predictionio/__init__.py
index 290c31e,f4eb87a..e5e5387
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@@ -1,14 -1,10 +1,13 @@@
--"""PredictoinIO Python SDK
--
--The PredictoinIO Python SDK provides easy-to-use functions for integrating
++"""PredictionIO Python SDK
++The PredictionIO Python SDK provides easy-to-use functions for integrating
  Python applications with PredictionIO REST API services.
  """
  
 -__version__ = "0.8.5"
 +
- __version__ = "0.9.2"
++__version__ = "0.9.6"
 +
 +# import deprecated libraries.
 +from predictionio.obsolete import Client, InvalidArgumentError
  
  # import packages
  import re
@@@ -62,530 -51,405 +61,497 @@@ def event_time_validation(t)
  
  
  class BaseClient(object):
 -    def __init__(self, url, threads=1, qsize=0, timeout=5):
 -        """Constructor of Client object.
 -
 -        """
 -        self.threads = threads
 -        self.url = url
 -        self.qsize = qsize
 -        self.timeout = timeout
 -
 -        # check connection type
 -        https_pattern = r'^https://(.*)'
 -        http_pattern = r'^http://(.*)'
 -        m = re.match(https_pattern, url)
 -        self.https = True
 -        if m is None:  # not matching https
 -            m = re.match(http_pattern, url)
 -            self.https = False
 -            if m is None:  # not matching http either
 -                raise InvalidArgumentError("url is not valid: %s" % url)  # 
noqa
 -        self.host = m.group(1)
 -
 -        self._uid = None  # identified uid
 -        self._connection = Connection(host=self.host, threads=self.threads,
 -                                      qsize=self.qsize, https=self.https,
 -                                      timeout=self.timeout)
 -
 -    def close(self):
 -        """Close this client and the connection.
 -
 -        Call this method when you want to completely terminate the connection
 -        with PredictionIO.
 -        It will wait for all pending requests to finish.
 -        """
 -        self._connection.close()
 -
 -    def pending_requests(self):
 -        """Return the number of pending requests.
 -
 -        :returns:
 -          The number of pending requests of this client.
 -        """
 -        return self._connection.pending_requests()
 -
 -    def get_status(self):
 -        """Get the status of the PredictionIO API Server
 -
 -        :returns:
 -          status message.
 -
 -        :raises:
 -          ServerStatusError.
 -        """
 -        path = "/"
 -        request = AsyncRequest("GET", path)
 -        request.set_rfunc(self._aget_resp)
 -        self._connection.make_request(request)
 -        result = request.get_response()
 -        return result
 -
 -    def _acreate_resp(self, response):
 -        if response.error is not None:
 -            raise NotCreatedError("Exception happened: %s for request %s" %
 -                                  (response.error, response.request))
 -        elif response.status != httplib.CREATED:
 -            raise NotCreatedError("request: %s status: %s body: %s" %
 -                                  (response.request, response.status,
 -                                   response.body))
 -
 -        return response
 -
 -    def _aget_resp(self, response):
 -        if response.error is not None:
 -            raise NotFoundError("Exception happened: %s for request %s" %
 -                                (response.error, response.request))
 -        elif response.status != httplib.OK:
 -            raise NotFoundError("request: %s status: %s body: %s" %
 -                                (response.request, response.status,
 -                                 response.body))
 -
 -        return response.json_body
 -
 -    def _adelete_resp(self, response):
 -        if response.error is not None:
 -            raise NotFoundError("Exception happened: %s for request %s" %
 -                                (response.error, response.request))
 -        elif response.status != httplib.OK:
 -            raise NotFoundError("request: %s status: %s body: %s" %
 -                                (response.request, response.status,
 -                                 response.body))
 -
 -        return response.body
 +  def __init__(self, url, threads=1, qsize=0, timeout=5):
 +    """Constructor of Client object.
- 
 +    """
 +    self.threads = threads
 +    self.url = url
 +    self.qsize = qsize
 +    self.timeout = timeout
 +
 +    # check connection type
 +    https_pattern = r'^https://(.*)'
 +    http_pattern = r'^http://(.*)'
 +    m = re.match(https_pattern, url)
 +    self.https = True
 +    if m is None:  # not matching https
 +      m = re.match(http_pattern, url)
 +      self.https = False
 +      if m is None:  # not matching http either
 +        raise InvalidArgumentError("url is not valid: %s" % url)
 +    self.host = m.group(1)
 +
 +    self._uid = None  # identified uid
 +    self._connection = Connection(host=self.host, threads=self.threads,
 +                    qsize=self.qsize, https=self.https,
 +                    timeout=self.timeout)
 +
 +  def close(self):
 +    """Close this client and the connection.
- 
 +    Call this method when you want to completely terminate the connection
 +    with PredictionIO.
 +    It will wait for all pending requests to finish.
 +    """
 +    self._connection.close()
 +
 +  def pending_requests(self):
 +    """Return the number of pending requests.
- 
 +    :returns:
 +      The number of pending requests of this client.
 +    """
 +    return self._connection.pending_requests()
 +
 +  def get_status(self):
 +    """Get the status of the PredictionIO API Server
- 
 +    :returns:
 +      status message.
- 
 +    :raises:
 +      ServerStatusError.
 +    """
 +    path = "/"
 +    request = AsyncRequest("GET", path)
 +    request.set_rfunc(self._aget_resp)
 +    self._connection.make_request(request)
 +    result = request.get_response()
 +    return result
 +
 +  def _acreate_resp(self, response):
 +    if response.error is not None:
 +      raise NotCreatedError("Exception happened: %s for request %s" %
 +                    (response.error, response.request))
 +    elif response.status != httplib.CREATED:
 +      raise NotCreatedError("request: %s status: %s body: %s" %
 +                    (response.request, response.status,
 +                     response.body))
 +
 +    return response
 +
 +  def _aget_resp(self, response):
 +    if response.error is not None:
 +      raise NotFoundError("Exception happened: %s for request %s" %
 +                  (response.error, response.request))
 +    elif response.status != httplib.OK:
 +      raise NotFoundError("request: %s status: %s body: %s" %
 +                  (response.request, response.status,
 +                   response.body))
 +
 +    return response.json_body
 +
 +  def _adelete_resp(self, response):
 +    if response.error is not None:
 +      raise NotFoundError("Exception happened: %s for request %s" %
 +                  (response.error, response.request))
 +    elif response.status != httplib.OK:
 +      raise NotFoundError("request: %s status: %s body: %s" %
 +                  (response.request, response.status,
 +                   response.body))
 +
 +    return response.body
  
  
  class EventClient(BaseClient):
 -    """Client for importing data into PredictionIO Event Server.
 -
 -    Notice that app_id has been deprecated as of 0.8.2. Please use 
access_token
 -    instead.
 -
 -    :param access_key: the access key for your application.
 -    :param url: the url of PredictionIO Event Server.
 -    :param threads: number of threads to handle PredictionIO API requests.
 -            Must be >= 1.
 -    :param qsize: the max size of the request queue (optional).
 -        The asynchronous request becomes blocking once this size has been
 -        reached, until the queued requests are handled.
 -        Default value is 0, which means infinite queue size.
 -    :param timeout: timeout for HTTP connection attempts and requests in
 -      seconds (optional).
 -      Default value is 5.
 +  """Client for importing data into PredictionIO Event Server.
- 
 +  Notice that app_id has been deprecated as of 0.8.2. Please use access_token
 +  instead.
- 
 +  :param access_key: the access key for your application.
 +  :param url: the url of PredictionIO Event Server.
 +  :param threads: number of threads to handle PredictionIO API requests.
 +          Must be >= 1.
 +  :param qsize: the max size of the request queue (optional).
 +      The asynchronous request becomes blocking once this size has been
 +      reached, until the queued requests are handled.
 +      Default value is 0, which means infinite queue size.
 +  :param timeout: timeout for HTTP connection attempts and requests in
 +    seconds (optional).
 +    Default value is 5.
 +  :param channel: channel name (optional)
 +  """
 +
 +  def __init__(self, access_key,
 +      url="http://localhost:7070";,
 +      threads=1, qsize=0, timeout=5, channel=None):
 +    assert type(access_key) is str, ("access_key must be string. "
 +        "Notice that app_id has been deprecated in Prediction.IO 0.8.2. "
 +        "Please use access_key instead.")
 +
 +    super(EventClient, self).__init__(url, threads, qsize, timeout)
 +
 +    if len(access_key) <= 8:
 +      raise DeprecationWarning(
 +          "It seems like you are specifying an app_id. It is deprecated in "
 +          "Prediction.IO 0.8.2. Please use access_key instead. Or, "
 +          "you may use an earlier version of this sdk.")
 +
 +    self.access_key = access_key
 +    self.channel = channel
 +
 +  def acreate_event(self, event, entity_type, entity_id,
 +      target_entity_type=None, target_entity_id=None, properties=None,
 +      event_time=None):
 +    """Asynchronously create an event.
- 
 +    :param event: event name. type str.
 +    :param entity_type: entity type. It is the namespace of the entityId and
 +      analogous to the table name of a relational database. The entityId must 
be
 +      unique within same entityType. type str.
 +    :param entity_id: entity id. *entity_type-entity_id* becomes the unique
 +      identifier of the entity. For example, you may have entityType named 
user,
 +      and different entity IDs, say 1 and 2. In this case, user-1 and user-2
 +      uniquely identifies entities. type str
 +    :param target_entity_type: target entity type. type str.
 +    :param target_entity_id: target entity id. type str.
 +    :param properties: a custom dict associated with an event. type dict.
 +    :param event_time: the time of the event. type datetime, must contain
 +      timezone info.
- 
 +    :returns:
 +      AsyncRequest object. You can call the get_response() method using this
 +      object to get the final resuls or status of this asynchronous request.
      """
 +    data = {
 +        "event": event,
 +        "entityType": entity_type,
 +        "entityId": entity_id,
 +        }
 +
 +    if target_entity_type is not None:
 +      data["targetEntityType"] = target_entity_type
 +
 +    if target_entity_id is not None:
 +      data["targetEntityId"] = target_entity_id
 +
 +    if properties is not None:
 +      data["properties"] = properties
  
 -    def __init__(self, access_key,
 -                 url="http://localhost:7070";,
 -                 threads=1, qsize=0, timeout=5):
 -        assert type(access_key) is str, ("access_key must be string. "
 -                                         "Notice that app_id has been 
deprecated in Prediction.IO 0.8.2. "
 -                                         "Please use access_key instead.")
 -
 -        super(EventClient, self).__init__(url, threads, qsize, timeout)
 -
 -        if len(access_key) <= 8:
 -            raise DeprecationWarning(
 -                "It seems like you are specifying an app_id. It is deprecated 
in "
 -                "Prediction.IO 0.8.2. Please use access_key instead. Or, "
 -                "you may use an earlier version of this sdk.")
 -
 -        self.access_key = access_key
 -
 -    def acreate_event(self, event, entity_type, entity_id,
 -                      target_entity_type=None, target_entity_id=None, 
properties=None,
 -                      event_time=None):
 -        """Asynchronously create an event.
 -
 -        :param event: event name. type str.
 -        :param entity_type: entity type. It is the namespace of the entityId 
and
 -          analogous to the table name of a relational database. The entityId 
must be
 -          unique within same entityType. type str.
 -        :param entity_id: entity id. *entity_type-entity_id* becomes the 
unique
 -          identifier of the entity. For example, you may have entityType 
named user,
 -          and different entity IDs, say 1 and 2. In this case, user-1 and 
user-2
 -          uniquely identifies entities. type str
 -        :param target_entity_type: target entity type. type str.
 -        :param target_entity_id: target entity id. type str.
 -        :param properties: a custom dict associated with an event. type dict.
 -        :param event_time: the time of the event. type datetime, must contain
 -          timezone info.
 -
 -        :returns:
 -          AsyncRequest object. You can call the get_response() method using 
this
 -          object to get the final resuls or status of this asynchronous 
request.
 -        """
 -        data = {
 -            "event": event,
 -            "entityType": entity_type,
 -            "entityId": entity_id,
 +    et = event_time_validation(event_time)
 +    # EventServer uses milliseconds, but python datetime class uses micro. 
Hence
 +    # need to skip the last three digits.
 +    et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
 +    data["eventTime"] = et_str
 +
 +    qparam = {
 +        "accessKey" : self.access_key
          }
  
 -        if target_entity_type is not None:
 -            data["targetEntityType"] = target_entity_type
 -
 -        if target_entity_id is not None:
 -            data["targetEntityId"] = target_entity_id
 -
 -        if properties is not None:
 -            data["properties"] = properties
 -
 -        et = event_time_validation(event_time)
 -        # EventServer uses milliseconds, but python datetime class uses 
micro. Hence
 -        # need to skip the last three digits.
 -        et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
 -        data["eventTime"] = et_str
 -
 -        path = "/events.json?accessKey=%s" % (self.access_key, )
 -        request = AsyncRequest("POST", path, **data)
 -        request.set_rfunc(self._acreate_resp)
 -        self._connection.make_request(request)
 -        return request
 -
 -    def create_event(self, event, entity_type, entity_id,
 -                     target_entity_type=None, target_entity_id=None, 
properties=None,
 -                     event_time=None):
 -        """Synchronously (blocking) create an event."""
 -        return self.acreate_event(event, entity_type, entity_id,
 -                                  target_entity_type, target_entity_id, 
properties,
 -                                  event_time).get_response()
 -
 -    def aget_event(self, event_id):
 -        """Asynchronouly get an event from Event Server.
 -
 -        :param event_id: event id returned by the EventServer when creating 
the
 -          event.
 -
 -        :returns:
 -          AsyncRequest object.
 -        """
 -        enc_event_id = urllib.quote(event_id, "")  # replace special char 
with %xx
 -        path = "/events/%s.json" % enc_event_id
 -        request = AsyncRequest("GET", path, accessKey=self.access_key)
 -        request.set_rfunc(self._aget_resp)
 -        self._connection.make_request(request)
 -        return request
 -
 -    def get_event(self, event_id):
 -        """Synchronouly get an event from Event Server."""
 -        return self.aget_event(event_id).get_response()
 -
 -    def adelete_event(self, event_id):
 -        """Asynchronouly delete an event from Event Server.
 -
 -        :param event_id: event id returned by the EventServer when creating 
the
 -          event.
 -
 -        :returns:
 -          AsyncRequest object.
 -        """
 -        enc_event_id = urllib.quote(event_id, "")  # replace special char 
with %xx
 -        path = "/events/%s.json" % (enc_event_id, )
 -        request = AsyncRequest("DELETE", path, accessKey=self.access_key)
 -        request.set_rfunc(self._adelete_resp)
 -        self._connection.make_request(request)
 -        return request
 -
 -    def delete_event(self, event_id):
 -        """Synchronouly delete an event from Event Server."""
 -        return self.adelete_event(event_id).get_response()
 -
 -    # Below are helper functions
 -
 -    def aset_user(self, uid, properties={}, event_time=None):
 -        """Set properties of a user.
 -
 -        Wrapper of acreate_event function, setting event to "$set" and 
entity_type
 -        to "user".
 -        """
 -        return self.acreate_event(
 -            event="$set",
 -            entity_type="user",
 -            entity_id=uid,
 -            properties=properties,
 -            event_time=event_time,
 -        )
 +    if self.channel is not None:
 +      qparam["channel"] = self.channel
 +
 +    path = "/events.json?%s" % (urlencode(qparam), )
 +
 +    request = AsyncRequest("POST", path, **data)
 +    request.set_rfunc(self._acreate_resp)
 +    self._connection.make_request(request)
 +    return request
 +
 +  def create_event(self, event, entity_type, entity_id,
 +      target_entity_type=None, target_entity_id=None, properties=None,
 +      event_time=None):
 +    """Synchronously (blocking) create an event."""
 +    return self.acreate_event(event, entity_type, entity_id,
 +        target_entity_type, target_entity_id, properties,
 +        event_time).get_response()
 +
 +  def aget_event(self, event_id):
 +    """Asynchronouly get an event from Event Server.
- 
 +    :param event_id: event id returned by the EventServer when creating the
 +      event.
- 
 +    :returns:
 +      AsyncRequest object.
 +    """
 +    qparam = {
 +        "accessKey" : self.access_key
 +        }
 +
 +    if self.channel is not None:
 +      qparam["channel"] = self.channel
 +
 +    enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
 +    path = "/events/%s.json" % (enc_event_id, )
 +    request = AsyncRequest("GET", path, **qparam)
 +    request.set_rfunc(self._aget_resp)
 +    self._connection.make_request(request)
 +    return request
 +
 +  def get_event(self, event_id):
 +    """Synchronouly get an event from Event Server."""
 +    return self.aget_event(event_id).get_response()
 +
 +  def aget_events(self, startTime=None, untilTime=None, entityType=None, 
entityId=None, limit=None, reversed=False):
 +    """Asynchronouly get events from Event Server. (Getting events through 
the Event Server API is used for debugging and not recommended for production)
- 
 +    :param startTime: time in ISO8601 format. Return events with eventTime >= 
startTime.
 +    :param untilTime: time in ISO8601 format. Return events with eventTime < 
untilTime.
 +    :param entityId: String. The entityId. Return events for this entityId 
only.
 +    :param limit: Integer. The number of record events returned. Default is 
20. -1 to get all.
 +    :param reversed: Boolean. Must be used with both entityType and entityId 
specified,
 +      returns events in reversed chronological order. Default is false.
- 
 +    :returns:
 +      AsyncRequest object.
 +    """
 +    qparam = {
 +        "accessKey" : self.access_key,
 +        "reversed": reversed
 +      }
 +
 +    if startTime is not None:
 +      qparam["startTime"] = startTime
 +
 +    if untilTime is not None:
 +      qparam["untilTime"] = untilTime
 +
 +    if entityType is not None:
 +      qparam["entityType"] = entityType
 +
 +    if entityId is not None:
 +      qparam["entityId"] = entityId
 +
 +    if limit is not None:
 +      qparam["limit"] = limit
 +
 +    if self.channel is not None:
 +      qparam["channel"] = self.channel
 +    path = "/events.json"
 +    request = AsyncRequest("GET", path, **qparam)
 +    request.set_rfunc(self._aget_resp)
 +    self._connection.make_request(request)
 +    return request
 +
 +  def get_events(self, startTime=None, untilTime=None, entityType=None, 
entityId=None, limit=None, reversed=False):
 +    """Synchronouly get event from Event Server. (Getting events through the 
Event Server API is used for debugging and not recommended for production)"""
 +    return self.aget_events(
 +      startTime=startTime,
 +      untilTime=untilTime,
 +      entityType=entityType,
 +      entityId=entityId,
 +      limit=limit,
 +      reversed=reversed
 +    ).get_response()
 +
 +  def adelete_event(self, event_id):
 +    """Asynchronouly delete an event from Event Server.
- 
 +    :param event_id: event id returned by the EventServer when creating the
 +      event.
- 
 +    :returns:
 +      AsyncRequest object.
 +    """
 +    qparam = {
 +        "accessKey" : self.access_key
 +        }
 +
 +    if self.channel is not None:
 +      qparam["channel"] = self.channel
 +
 +    enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
 +    path = "/events/%s.json" % (enc_event_id, )
 +    request = AsyncRequest("DELETE", path, **qparam)
 +    request.set_rfunc(self._adelete_resp)
 +    self._connection.make_request(request)
 +    return request
 +
 +  def delete_event(self, event_id):
 +    """Synchronouly delete an event from Event Server."""
 +    return self.adelete_event(event_id).get_response()
  
 -    def set_user(self, uid, properties={}, event_time=None):
 -        """Set properties of a user"""
 -        return self.aset_user(uid, properties, event_time).get_response()
 -
 -    def aunset_user(self, uid, properties, event_time=None):
 -        """Unset properties of an user.
 -
 -        Wrapper of acreate_event function, setting event to "$unset" and 
entity_type
 -        to "user".
 -        """
 -        # check properties={}, it cannot be empty
 -        return self.acreate_event(
 -            event="$unset",
 -            entity_type="user",
 -            entity_id=uid,
 -            properties=properties,
 -            event_time=event_time,
 +  ## Below are helper functions
 +
 +  def aset_user(self, uid, properties={}, event_time=None):
 +    """Set properties of a user.
- 
 +    Wrapper of acreate_event function, setting event to "$set" and entity_type
 +    to "user".
 +    """
 +    return self.acreate_event(
 +      event="$set",
 +      entity_type="user",
 +      entity_id=uid,
 +      properties=properties,
 +      event_time=event_time,
 +    )
 +
 +  def set_user(self, uid, properties={}, event_time=None):
 +    """Set properties of a user"""
 +    return self.aset_user(uid, properties, event_time).get_response()
 +
 +  def aunset_user(self, uid, properties, event_time=None):
 +    """Unset properties of an user.
- 
 +    Wrapper of acreate_event function, setting event to "$unset" and 
entity_type
 +    to "user".
 +    """
 +    # check properties={}, it cannot be empty
 +    return self.acreate_event(
 +        event="$unset",
 +        entity_type="user",
 +        entity_id=uid,
 +        properties=properties,
 +        event_time=event_time,
          )
  
 -    def unset_user(self, uid, properties, event_time=None):
 -        """Set properties of a user"""
 -        return self.aunset_user(uid, properties, event_time).get_response()
 -
 -    def adelete_user(self, uid, event_time=None):
 -        """Delete a user.
 -
 -        Wrapper of acreate_event function, setting event to "$delete" and 
entity_type
 -        to "user".
 -        """
 -        return self.acreate_event(
 -            event="$delete",
 -            entity_type="user",
 -            entity_id=uid,
 -            event_time=event_time)
 -
 -    def delete_user(self, uid, event_time=None):
 -        """Delete a user."""
 -        return self.adelete_user(uid, event_time).get_response()
 -
 -    def aset_item(self, iid, properties={}, event_time=None):
 -        """Set properties of an item.
 -
 -        Wrapper of acreate_event function, setting event to "$set" and 
entity_type
 -        to "item".
 -        """
 -        return self.acreate_event(
 -            event="$set",
 -            entity_type="item",
 -            entity_id=iid,
 -            properties=properties,
 -            event_time=event_time)
 -
 -    def set_item(self, iid, properties={}, event_time=None):
 -        """Set properties of an item."""
 -        return self.aset_item(iid, properties, event_time).get_response()
 -
 -    def aunset_item(self, iid, properties={}, event_time=None):
 -        """Unset properties of an item.
 -
 -        Wrapper of acreate_event function, setting event to "$unset" and 
entity_type
 -        to "item".
 -        """
 -        return self.acreate_event(
 -            event="$unset",
 -            entity_type="item",
 -            entity_id=iid,
 -            properties=properties,
 -            event_time=event_time)
 -
 -    def unset_item(self, iid, properties={}, event_time=None):
 -        """Unset properties of an item."""
 -        return self.aunset_item(iid, properties, event_time).get_response()
 -
 -    def adelete_item(self, iid, event_time=None):
 -        """Delete an item.
 -
 -        Wrapper of acreate_event function, setting event to "$delete" and 
entity_type
 -        to "item".
 -        """
 -        return self.acreate_event(
 -            event="$delete",
 -            entity_type="item",
 -            entity_id=iid,
 -            event_time=event_time)
 -
 -    def delete_item(self, iid, event_time=None):
 -        """Delete an item."""
 -        return self.adelete_item(iid, event_time).get_response()
 -
 -    def arecord_user_action_on_item(self, action, uid, iid, properties={},
 -                                    event_time=None):
 -        """Create a user-to-item action.
 -
 -        Wrapper of acreate_event function, setting entity_type to "user" and
 -        target_entity_type to "item".
 -        """
 -        return self.acreate_event(
 -            event=action,
 -            entity_type="user",
 -            entity_id=uid,
 -            target_entity_type="item",
 -            target_entity_id=iid,
 -            properties=properties,
 -            event_time=event_time)
 -
 -    def record_user_action_on_item(self, action, uid, iid, properties={},
 -                                   event_time=None):
 -        """Create a user-to-item action."""
 -        return self.arecord_user_action_on_item(
 -            action, uid, iid, properties, event_time).get_response()
 +  def unset_user(self, uid, properties, event_time=None):
 +    """Set properties of a user"""
 +    return self.aunset_user(uid, properties, event_time).get_response()
  
 +  def adelete_user(self, uid, event_time=None):
 +    """Delete a user.
- 
 +    Wrapper of acreate_event function, setting event to "$delete" and 
entity_type
 +    to "user".
 +    """
 +    return self.acreate_event(
 +        event="$delete",
 +        entity_type="user",
 +        entity_id=uid,
 +        event_time=event_time)
 +
 +  def delete_user(self, uid, event_time=None):
 +    """Delete a user."""
 +    return self.adelete_user(uid, event_time).get_response()
 +
 +  def aset_item(self, iid, properties={}, event_time=None):
 +    """Set properties of an item.
- 
 +    Wrapper of acreate_event function, setting event to "$set" and entity_type
 +    to "item".
 +    """
 +    return self.acreate_event(
 +        event="$set",
 +        entity_type="item",
 +        entity_id=iid,
 +        properties=properties,
 +        event_time=event_time)
 +
 +  def set_item(self, iid, properties={}, event_time=None):
 +    """Set properties of an item."""
 +    return self.aset_item(iid, properties, event_time).get_response()
 +
 +  def aunset_item(self, iid, properties={}, event_time=None):
 +    """Unset properties of an item.
- 
 +    Wrapper of acreate_event function, setting event to "$unset" and 
entity_type
 +    to "item".
 +    """
 +    return self.acreate_event(
 +        event="$unset",
 +        entity_type="item",
 +        entity_id=iid,
 +        properties=properties,
 +        event_time=event_time)
 +
 +  def unset_item(self, iid, properties={}, event_time=None):
 +    """Unset properties of an item."""
 +    return self.aunset_item(iid, properties, event_time).get_response()
 +
 +  def adelete_item(self, iid, event_time=None):
 +    """Delete an item.
- 
 +    Wrapper of acreate_event function, setting event to "$delete" and 
entity_type
 +    to "item".
 +    """
 +    return self.acreate_event(
 +        event="$delete",
 +        entity_type="item",
 +        entity_id=iid,
 +        event_time=event_time)
 +
 +  def delete_item(self, iid, event_time=None):
 +    """Delete an item."""
 +    return self.adelete_item(iid, event_time).get_response()
 +
 +  def arecord_user_action_on_item(self, action, uid, iid, properties={},
 +      event_time=None):
 +    """Create a user-to-item action.
- 
 +    Wrapper of acreate_event function, setting entity_type to "user" and
 +    target_entity_type to "item".
 +    """
 +    return self.acreate_event(
 +        event=action,
 +        entity_type="user",
 +        entity_id=uid,
 +        target_entity_type="item",
 +        target_entity_id=iid,
 +        properties=properties,
 +        event_time=event_time)
 +
 +  def record_user_action_on_item(self, action, uid, iid, properties={},
 +      event_time=None):
 +    """Create a user-to-item action."""
 +    return self.arecord_user_action_on_item(
 +      action, uid, iid, properties, event_time).get_response()
  
 -class EngineClient(BaseClient):
 -    """Client for extracting prediction results from an PredictionIO Engine
 -    Instance.
 -
 -    :param url: the url of the PredictionIO Engine Instance.
 -    :param threads: number of threads to handle PredictionIO API requests.
 -            Must be >= 1.
 -    :param qsize: the max size of the request queue (optional).
 -        The asynchronous request becomes blocking once this size has been
 -        reached, until the queued requests are handled.
 -        Default value is 0, which means infinite queue size.
 -    :param timeout: timeout for HTTP connection attempts and requests in
 -      seconds (optional).
 -      Default value is 5.
  
 +class EngineClient(BaseClient):
 +  """Client for extracting prediction results from an PredictionIO Engine
 +  Instance.
- 
 +  :param url: the url of the PredictionIO Engine Instance.
 +  :param threads: number of threads to handle PredictionIO API requests.
 +          Must be >= 1.
 +  :param qsize: the max size of the request queue (optional).
 +      The asynchronous request becomes blocking once this size has been
 +      reached, until the queued requests are handled.
 +      Default value is 0, which means infinite queue size.
 +  :param timeout: timeout for HTTP connection attempts and requests in
 +    seconds (optional).
 +    Default value is 5.
- 
 +  """
 +  def __init__(self, url="http://localhost:8000";, threads=1,
 +      qsize=0, timeout=5):
 +    super(EngineClient, self).__init__(url, threads, qsize, timeout)
 +
 +  def asend_query(self, data):
 +    """Asynchronously send a request to the engine instance with data as the
 +    query.
- 
 +    :param data: the query: It is coverted to an json object using json.dumps
 +      method. type dict.
- 
 +    :returns:
 +      AsyncRequest object. You can call the get_response() method using this
 +      object to get the final resuls or status of this asynchronous request.
      """
 +    path = "/queries.json"
 +    request = AsyncRequest("POST", path, **data)
 +    request.set_rfunc(self._aget_resp)
 +    self._connection.make_request(request)
 +    return request
 +
 +  def send_query(self, data):
 +    """Synchronously send a request.
- 
 +    :param data: the query: It is coverted to an json object using json.dumps
 +      method. type dict.
- 
 +    :returns: the prediction.
 +    """
 +    return self.asend_query(data).get_response()
 +
 +class FileExporter(object):
 +  """File exporter to write events to JSON file for batch import
- 
 +  :param file_name: the destination file name
 +  """
 +  def __init__(self, file_name):
 +    """Constructor of Exporter.
- 
 +    """
 +    self._file = open(file_name, 'w')
  
 -    def __init__(self, url="http://localhost:8000";, threads=1,
 -                 qsize=0, timeout=5):
 -        super(EngineClient, self).__init__(url, threads, qsize, timeout)
 +  def create_event(self, event, entity_type, entity_id,
 +      target_entity_type=None, target_entity_id=None, properties=None,
 +      event_time=None):
 +    """Create an event and write to the file.
- 
 +    (please refer to EventClient's create_event())
- 
 +    """
 +    data = {
 +        "event": event,
 +        "entityType": entity_type,
 +        "entityId": entity_id,
 +        }
  
 -    def asend_query(self, data):
 -        """Asynchronously send a request to the engine instance with data as 
the
 -        query.
 +    if target_entity_type is not None:
 +      data["targetEntityType"] = target_entity_type
  
 -        :param data: the query: It is coverted to an json object using 
json.dumps
 -          method. type dict.
 +    if target_entity_id is not None:
 +      data["targetEntityId"] = target_entity_id
  
 -        :returns:
 -          AsyncRequest object. You can call the get_response() method using 
this
 -          object to get the final resuls or status of this asynchronous 
request.
 -        """
 -        path = "/queries.json"
 -        request = AsyncRequest("POST", path, **data)
 -        request.set_rfunc(self._aget_resp)
 -        self._connection.make_request(request)
 -        return request
 +    if properties is not None:
 +      data["properties"] = properties
  
 -    def send_query(self, data):
 -        """Synchronously send a request.
 +    et = event_time_validation(event_time)
 +    # EventServer uses milliseconds, but python datetime class uses micro. 
Hence
 +    # need to skip the last three digits.
 +    et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
 +    data["eventTime"] = et_str
  
 -        :param data: the query: It is coverted to an json object using 
json.dumps
 -          method. type dict.
 +    j = json.dumps(data)
 +    self._file.write(j+"\n")
  
 -        :returns: the prediction.
 -        """
 -        return self.asend_query(data).get_response()
 +  def close(self):
 +    """Close the FileExporter
- 
 +    Call this method when you finish writing all events to JSON file
 +    """
 +    self._file.close()

http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/predictionio/connection.py
----------------------------------------------------------------------
diff --cc predictionio/connection.py
index 04fe7b5,8c231f4..8205bb2
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@@ -68,192 -67,190 +67,233 @@@ class ProgramError(PredictionIOAPIError
  
  
  class AsyncRequest(object):
+     """AsyncRequest object
  
-   """AsyncRequest object
+     """
  
-   """
+     def __init__(self, method, path, **params):
+         self.method = method  # "GET" "POST" etc
+         # the sub path eg. POST /v1/users.json  GET /v1/users/1.json
+         self.path = path
+         # dictionary format eg. {"appkey" : 123, "id" : 3}
+         self.params = params
+         # use queue to implement response, store AsyncResponse object
+         self.response_q = Queue.Queue(1)
+         self.qpath = "%s?%s" % (self.path, urlencode(self.params))
+         self._response = None
+         # response function to be called to handle the response
+         self.rfunc = None
  
-   def __init__(self, method, path, **params):
-     self.method = method  # "GET" "POST" etc
-     # the sub path eg. POST /v1/users.json  GET /v1/users/1.json
-     self.path = path
-     # dictionary format eg. {"appkey" : 123, "id" : 3}
-     self.params = params
-     # use queue to implement response, store AsyncResponse object
-     self.response_q = Queue.Queue(1)
-     self.qpath = "%s?%s" % (self.path, urlencode(self.params))
-     self._response = None
-     # response function to be called to handle the response
-     self.rfunc = None
+     def __str__(self):
+         return "%s %s %s %s" % (self.method, self.path, self.params,
+                                 self.qpath)
  
-   def __str__(self):
-     return "%s %s %s %s" % (self.method, self.path, self.params,
-                 self.qpath)
+     def set_rfunc(self, func):
+         self.rfunc = func
  
-   def set_rfunc(self, func):
-     self.rfunc = func
+     def set_response(self, response):
+         """ store the response
  
-   def set_response(self, response):
-     """ store the response
+         NOTE: Must be only called once
+         """
+         self.response_q.put(response)
+ 
+     def get_response(self):
+         """Get the response. Blocking.
+ 
+         :returns: self.rfunc's return type.
+         """
+         if self._response is None:
+             tmp_response = self.response_q.get(True)  # NOTE: blocking
+             if self.rfunc is None:
+                 self._response = tmp_response
+             else:
+                 self._response = self.rfunc(tmp_response)
+ 
+         return self._response
  
-     NOTE: Must be only called once
-     """
-     self.response_q.put(response)
  
-   def get_response(self):
-     """Get the response. Blocking.
+ class AsyncResponse(object):
+     """Store the response of asynchronous request
  
-     :returns: self.rfunc's return type.
+     When get the response, user should check if error is None (which means no
+     Exception happens).
+     If error is None, then should check if the status is expected.
      """
-     if self._response is None:
-       tmp_response = self.response_q.get(True)  # NOTE: blocking
-       if self.rfunc is None:
-         self._response = tmp_response
-       else:
-         self._response = self.rfunc(tmp_response)
  
-     return self._response
+     def __init__(self):
+         #: exception object if any happens
+         self.error = None
+ 
+         self.version = None
+         self.status = None
+         self.reason = None
+         #: Response header. str
+         self.headers = None
+         #: Response body. str
+         self.body = None
+         #: Jsonified response body. Remains None if conversion is 
unsuccessful.
+         self.json_body = None
+         #: Point back to the AsyncRequest object
+         self.request = None
+ 
+     def __str__(self):
+         return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
+                                                   self.status, self.reason,
+                                                   self.headers, self.body)
+ 
+     def set_resp(self, version, status, reason, headers, body):
+         self.version = version
+         self.status = status
+         self.reason = reason
+         self.headers = headers
+         self.body = body
+         # Try to extract the json.
+         try:
+             self.json_body = json.loads(body)
+         except ValueError as ex:  # noqa
+             self.json_body = None
  
+     def set_error(self, error):
+         self.error = error
  
 -    def set_request(self, request):
 -        self.request = request
 -
 +class AsyncResponse(object):
 +  """Store the response of asynchronous request
 +
 +  When get the response, user should check if error is None (which means no
 +  Exception happens).
 +  If error is None, then should check if the status is expected.
 +  """
 +
 +  def __init__(self):
 +    #: exception object if any happens
 +    self.error = None
 +
 +    self.version = None
 +    self.status = None
 +    self.reason = None
 +    #: Response header. str
 +    self.headers = None
 +    #: Response body. str
 +    self.body = None
 +    #: Jsonified response body. Remains None if conversion is unsuccessful.
 +    self.json_body = None
 +    #: Point back to the AsyncRequest object
 +    self.request = None  
 +
 +  def __str__(self):
 +    return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
 +                          self.status, self.reason,
 +                          self.headers, self.body)
 +
 +  def set_resp(self, version, status, reason, headers, body):
 +    self.version = version
 +    self.status = status
 +    self.reason = reason
 +    self.headers = headers
 +    self.body = body
 +    # Try to extract the json.
 +    try:
 +      self.json_body = json.loads(body.decode('utf8'))
 +    except ValueError as ex:
 +      self.json_body = None
 +
 +  def set_error(self, error):
 +    self.error = error
 +
 +  def set_request(self, request):
 +    self.request = request
  
- 
  class PredictionIOHttpConnection(object):
+     def __init__(self, host, https=True, timeout=5):
+         if https:  # https connection
+             self._connection = httplib.HTTPSConnection(host, timeout=timeout)
+         else:
+             self._connection = httplib.HTTPConnection(host, timeout=timeout)
  
-   def __init__(self, host, https=True, timeout=5):
-     if https:  # https connection
-       self._connection = httplib.HTTPSConnection(host, timeout=timeout)
-     else:
-       self._connection = httplib.HTTPConnection(host, timeout=timeout)
- 
-   def connect(self):
-     self._connection.connect()
+     def connect(self):
+         self._connection.connect()
  
-   def close(self):
-     self._connection.close()
+     def close(self):
+         self._connection.close()
  
-   def request(self, method, url, body={}, headers={}):
-     """
-     http request wrapper function, with retry capability in case of error.
-     catch error exception and store it in AsyncResponse object
-     return AsyncResponse object
+     def request(self, method, url, body={}, headers={}):
+         """
+         http request wrapper function, with retry capability in case of error.
+         catch error exception and store it in AsyncResponse object
+         return AsyncResponse object
  
-     Args:
-       method: http method, type str
-       url: url path, type str
-       body: http request body content, type dict
-       header: http request header , type dict
-     """
+         Args:
+           method: http method, type str
+           url: url path, type str
+           body: http request body content, type dict
+           header: http request header , type dict
+         """
  
-     response = AsyncResponse()
+         response = AsyncResponse()
  
-     try:
-       # number of retry in case of error (minimum 0 means no retry)
-       retry_limit = MAX_RETRY
-       mod_headers = dict(headers)  # copy the headers
-       mod_headers["Connection"] = "keep-alive"
-       enc_body = None
-       if body:  # if body is not empty
-         #enc_body = urlencode(body)
-         #mod_headers[
-         #  "Content-type"] = "application/x-www-form-urlencoded"
-         enc_body = json.dumps(body)
-         mod_headers[
-           "Content-type"] = "application/json"
-         #mod_headers["Accept"] = "text/plain"
-     except Exception as e:
-       response.set_error(e)
-       return response
- 
-     if DEBUG_LOG:
-       logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
-              mod_headers, enc_body)
-     # retry loop
-     for i in xrange(retry_limit + 1):
-       try:
-         if i != 0:
-           if DEBUG_LOG:
-             logger.debug("retry request %s times" % i)
-         if self._connection.sock is None:
-           self._connection.connect()
-         self._connection.request(method, url, enc_body, mod_headers)
-       except Exception as e:
-         self._connection.close()
-         if i == retry_limit:
-           # new copy of e created everytime??
-           response.set_error(e)
-       else:  # NOTE: this is try's else clause
-         # connect() and request() OK
          try:
-           resp = self._connection.getresponse()
+             # number of retry in case of error (minimum 0 means no retry)
+             retry_limit = MAX_RETRY
+             mod_headers = dict(headers)  # copy the headers
+             mod_headers["Connection"] = "keep-alive"
+             enc_body = None
+             if body:  # if body is not empty
+                 # enc_body = urlencode(body)
+                 # mod_headers[
+                 # "Content-type"] = "application/x-www-form-urlencoded"
+                 enc_body = json.dumps(body)
+                 mod_headers[
+                     "Content-type"] = "application/json"
+                 # mod_headers["Accept"] = "text/plain"
          except Exception as e:
-           self._connection.close()
-           if i == retry_limit:
              response.set_error(e)
-         else:  # NOTE: this is try's else clause
-           # getresponse() OK
-           resp_version = resp.version  # int
-           resp_status = resp.status  # int
-           resp_reason = resp.reason  # str
-           # resp.getheaders() returns list of tuples
-           # converted to dict format
-           resp_headers = dict(resp.getheaders())
-           # NOTE: have to read the response before sending out next
-           # http request
-           resp_body = resp.read()  # str
-           response.set_resp(version=resp_version, status=resp_status,
-                     reason=resp_reason, headers=resp_headers,
-                     body=resp_body)
-           break  # exit retry loop
-     # end of retry loop
-     if DEBUG_LOG:
-       logger.debug("Response %s", response)
-     return response  # AsyncResponse object
+             return response
+ 
+         if DEBUG_LOG:
+             logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
+                          mod_headers, enc_body)
+         # retry loop
+         for i in xrange(retry_limit + 1):
+             try:
+                 if i != 0:
+                     if DEBUG_LOG:
+                         logger.debug("retry request %s times" % i)
+                 if self._connection.sock is None:
+                     self._connection.connect()
+                 self._connection.request(method, url, enc_body, mod_headers)
+             except Exception as e:
+                 self._connection.close()
+                 if i == retry_limit:
+                     # new copy of e created everytime??
+                     response.set_error(e)
+             else:  # NOTE: this is try's else clause
+                 # connect() and request() OK
+                 try:
+                     resp = self._connection.getresponse()
+                 except Exception as e:
+                     self._connection.close()
+                     if i == retry_limit:
+                         response.set_error(e)
+                 else:  # NOTE: this is try's else clause
+                     # getresponse() OK
+                     resp_version = resp.version  # int
+                     resp_status = resp.status  # int
+                     resp_reason = resp.reason  # str
+                     # resp.getheaders() returns list of tuples
+                     # converted to dict format
+                     resp_headers = dict(resp.getheaders())
+                     # NOTE: have to read the response before sending out next
+                     # http request
+                     resp_body = resp.read()  # str
+                     response.set_resp(version=resp_version, 
status=resp_status,
+                                       reason=resp_reason, 
headers=resp_headers,
+                                       body=resp_body)
+                     break  # exit retry loop
+         # end of retry loop
+         if DEBUG_LOG:
+             logger.debug("Response %s", response)
+         return response  # AsyncResponse object
  
  
  def connection_worker(host, request_queue, https=True, timeout=5, loop=True):

http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/7b6e210d/setup.py
----------------------------------------------------------------------
diff --cc setup.py
index 15163d1,f0e1038..67b12e3
--- a/setup.py
+++ b/setup.py
@@@ -10,7 -10,7 +10,7 @@@ __license__ = "Apache License, Version 
  
  setup(
      name='PredictionIO',
-     version="0.9.2",
 -    version="0.8.5",
++    version="0.9.6",
      author=__author__,
      author_email=__email__,
      packages=['predictionio'],

Reply via email to