http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/bc678328/predictionio/__init__.py
----------------------------------------------------------------------
diff --git a/predictionio/__init__.py b/predictionio/__init__.py
index 45a379f..f4eb87a 100644
--- a/predictionio/__init__.py
+++ b/predictionio/__init__.py
@@ -4,21 +4,17 @@ The PredictoinIO Python SDK provides easy-to-use functions for integrating
 Python applications with PredictionIO REST API services.
 """
 
-
-__version__ = "0.8.3"
-
-# import deprecated libraries.
-from predictionio.obsolete import Client
+__version__ = "0.8.5"
 
 # import packages
 import re
+
 try:
-  import httplib
+    import httplib
 except ImportError:
-  # pylint: disable=F0401
-  # http is a Python3 module, replacing httplib
-  from http import client as httplib
-import json
+    # pylint: disable=F0401
+    # http is a Python3 module, replacing httplib
+    from http import client as httplib
 import urllib
 
 from datetime import datetime
@@ -26,433 +22,434 @@ import pytz
 
 from predictionio.connection import Connection
 from predictionio.connection import AsyncRequest
-from predictionio.connection import AsyncResponse
+from predictionio.connection import AsyncResponse  # noqa
 from predictionio.connection import PredictionIOAPIError
 
 
 class NotCreatedError(PredictionIOAPIError):
-  pass
+    pass
 
 
 class NotFoundError(PredictionIOAPIError):
-  pass
+    pass
 
 
 def event_time_validation(t):
-  """ Validate event_time according to EventAPI Specification.
-  """
+    """ Validate event_time according to EventAPI Specification.
+    """
 
-  if t is None:
-    return datetime.now(pytz.utc)
+    if t is None:
+        return datetime.now(pytz.utc)
 
-  if type(t) != datetime:
-    raise AttributeError("event_time must be datetime.datetime")
+    if type(t) != datetime:
+        raise AttributeError("event_time must be datetime.datetime")
 
-  if t.tzinfo is None:
-    raise AttributeError("event_time must have tzinfo")
+    if t.tzinfo is None:
+        raise AttributeError("event_time must have tzinfo")
 
-  return t
+    return t
 
 
 class BaseClient(object):
-  def __init__(self, url, threads=1, qsize=0, timeout=5):
-    """Constructor of Client object.
+    def __init__(self, url, threads=1, qsize=0, timeout=5):
+        """Constructor of Client object.
+
+        """
+        self.threads = threads
+        self.url = url
+        self.qsize = qsize
+        self.timeout = timeout
+
+        # check connection type
+        https_pattern = r'^https://(.*)'
+        http_pattern = r'^http://(.*)'
+        m = re.match(https_pattern, url)
+        self.https = True
+        if m is None:  # not matching https
+            m = re.match(http_pattern, url)
+            self.https = False
+            if m is None:  # not matching http either
+                raise InvalidArgumentError("url is not valid: %s" % url)  # noqa
+        self.host = m.group(1)
+
+        self._uid = None  # identified uid
+        self._connection = Connection(host=self.host, threads=self.threads,
+                                      qsize=self.qsize, https=self.https,
+                                      timeout=self.timeout)
+
+    def close(self):
+        """Close this client and the connection.
+
+        Call this method when you want to completely terminate the connection
+        with PredictionIO.
+        It will wait for all pending requests to finish.
+        """
+        self._connection.close()
+
+    def pending_requests(self):
+        """Return the number of pending requests.
+
+        :returns:
+          The number of pending requests of this client.
+        """
+        return self._connection.pending_requests()
+
+    def get_status(self):
+        """Get the status of the PredictionIO API Server
+
+        :returns:
+          status message.
+
+        :raises:
+          ServerStatusError.
+        """
+        path = "/"
+        request = AsyncRequest("GET", path)
+        request.set_rfunc(self._aget_resp)
+        self._connection.make_request(request)
+        result = request.get_response()
+        return result
+
+    def _acreate_resp(self, response):
+        if response.error is not None:
+            raise NotCreatedError("Exception happened: %s for request %s" %
+                                  (response.error, response.request))
+        elif response.status != httplib.CREATED:
+            raise NotCreatedError("request: %s status: %s body: %s" %
+                                  (response.request, response.status,
+                                   response.body))
+
+        return response
+
+    def _aget_resp(self, response):
+        if response.error is not None:
+            raise NotFoundError("Exception happened: %s for request %s" %
+                                (response.error, response.request))
+        elif response.status != httplib.OK:
+            raise NotFoundError("request: %s status: %s body: %s" %
+                                (response.request, response.status,
+                                 response.body))
+
+        return response.json_body
+
+    def _adelete_resp(self, response):
+        if response.error is not None:
+            raise NotFoundError("Exception happened: %s for request %s" %
+                                (response.error, response.request))
+        elif response.status != httplib.OK:
+            raise NotFoundError("request: %s status: %s body: %s" %
+                                (response.request, response.status,
+                                 response.body))
+
+        return response.body
 
-    """
-    self.threads = threads
-    self.url = url
-    self.qsize = qsize
-    self.timeout = timeout
-
-    # check connection type
-    https_pattern = r'^https://(.*)'
-    http_pattern = r'^http://(.*)'
-    m = re.match(https_pattern, url)
-    self.https = True
-    if m is None:  # not matching https
-      m = re.match(http_pattern, url)
-      self.https = False
-      if m is None:  # not matching http either
-        raise InvalidArgumentError("url is not valid: %s" % url)
-    self.host = m.group(1)
-
-    self._uid = None  # identified uid
-    self._connection = Connection(host=self.host, threads=self.threads,
-                    qsize=self.qsize, https=self.https,
-                    timeout=self.timeout)
-
-  def close(self):
-    """Close this client and the connection.
-
-    Call this method when you want to completely terminate the connection
-    with PredictionIO.
-    It will wait for all pending requests to finish.
-    """
-    self._connection.close()
 
-  def pending_requests(self):
-    """Return the number of pending requests.
-
-    :returns:
-      The number of pending requests of this client.
+class EventClient(BaseClient):
+    """Client for importing data into PredictionIO Event Server.
+
+    Notice that app_id has been deprecated as of 0.8.2. Please use access_token
+    instead.
+
+    :param access_key: the access key for your application.
+    :param url: the url of PredictionIO Event Server.
+    :param threads: number of threads to handle PredictionIO API requests.
+            Must be >= 1.
+    :param qsize: the max size of the request queue (optional).
+        The asynchronous request becomes blocking once this size has been
+        reached, until the queued requests are handled.
+        Default value is 0, which means infinite queue size.
+    :param timeout: timeout for HTTP connection attempts and requests in
+      seconds (optional).
+      Default value is 5.
     """
-    return self._connection.pending_requests()
 
-  def get_status(self):
-    """Get the status of the PredictionIO API Server
+    def __init__(self, access_key,
+                 url="http://localhost:7070",
+                 threads=1, qsize=0, timeout=5):
+        assert type(access_key) is str, ("access_key must be string. "
+                                         "Notice that app_id has been deprecated in Prediction.IO 0.8.2. "
+                                         "Please use access_key instead.")
+
+        super(EventClient, self).__init__(url, threads, qsize, timeout)
+
+        if len(access_key) <= 8:
+            raise DeprecationWarning(
+                "It seems like you are specifying an app_id. It is deprecated in "
+                "Prediction.IO 0.8.2. Please use access_key instead. Or, "
+                "you may use an earlier version of this sdk.")
+
+        self.access_key = access_key
+
+    def acreate_event(self, event, entity_type, entity_id,
+                      target_entity_type=None, target_entity_id=None, properties=None,
+                      event_time=None):
+        """Asynchronously create an event.
+
+        :param event: event name. type str.
+        :param entity_type: entity type. It is the namespace of the entityId and
+          analogous to the table name of a relational database. The entityId must be
+          unique within same entityType. type str.
+        :param entity_id: entity id. *entity_type-entity_id* becomes the unique
+          identifier of the entity. For example, you may have entityType named user,
+          and different entity IDs, say 1 and 2. In this case, user-1 and user-2
+          uniquely identifies entities. type str
+        :param target_entity_type: target entity type. type str.
+        :param target_entity_id: target entity id. type str.
+        :param properties: a custom dict associated with an event. type dict.
+        :param event_time: the time of the event. type datetime, must contain
+          timezone info.
+
+        :returns:
+          AsyncRequest object. You can call the get_response() method using this
+          object to get the final resuls or status of this asynchronous request.
+        """
+        data = {
+            "event": event,
+            "entityType": entity_type,
+            "entityId": entity_id,
+        }
 
-    :returns:
-      status message.
+        if target_entity_type is not None:
+            data["targetEntityType"] = target_entity_type
+
+        if target_entity_id is not None:
+            data["targetEntityId"] = target_entity_id
+
+        if properties is not None:
+            data["properties"] = properties
+
+        et = event_time_validation(event_time)
+        # EventServer uses milliseconds, but python datetime class uses micro. Hence
+        # need to skip the last three digits.
+        et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
+        data["eventTime"] = et_str
+
+        path = "/events.json?accessKey=%s" % (self.access_key, )
+        request = AsyncRequest("POST", path, **data)
+        request.set_rfunc(self._acreate_resp)
+        self._connection.make_request(request)
+        return request
+
+    def create_event(self, event, entity_type, entity_id,
+                     target_entity_type=None, target_entity_id=None, properties=None,
+                     event_time=None):
+        """Synchronously (blocking) create an event."""
+        return self.acreate_event(event, entity_type, entity_id,
+                                  target_entity_type, target_entity_id, properties,
+                                  event_time).get_response()
+
+    def aget_event(self, event_id):
+        """Asynchronouly get an event from Event Server.
+
+        :param event_id: event id returned by the EventServer when creating the
+          event.
+
+        :returns:
+          AsyncRequest object.
+        """
+        enc_event_id = urllib.quote(event_id, "")  # replace special char with %xx
+        path = "/events/%s.json" % enc_event_id
+        request = AsyncRequest("GET", path, accessKey=self.access_key)
+        request.set_rfunc(self._aget_resp)
+        self._connection.make_request(request)
+        return request
+
+    def get_event(self, event_id):
+        """Synchronouly get an event from Event Server."""
+        return self.aget_event(event_id).get_response()
+
+    def adelete_event(self, event_id):
+        """Asynchronouly delete an event from Event Server.
+
+        :param event_id: event id returned by the EventServer when creating the
+          event.
+
+        :returns:
+          AsyncRequest object.
+        """
+        enc_event_id = urllib.quote(event_id, "")  # replace special char with %xx
+        path = "/events/%s.json" % (enc_event_id, )
+        request = AsyncRequest("DELETE", path, accessKey=self.access_key)
+        request.set_rfunc(self._adelete_resp)
+        self._connection.make_request(request)
+        return request
+
+    def delete_event(self, event_id):
+        """Synchronouly delete an event from Event Server."""
+        return self.adelete_event(event_id).get_response()
+
+    # Below are helper functions
+
+    def aset_user(self, uid, properties={}, event_time=None):
+        """Set properties of a user.
+
+        Wrapper of acreate_event function, setting event to "$set" and entity_type
+        to "user".
+        """
+        return self.acreate_event(
+            event="$set",
+            entity_type="user",
+            entity_id=uid,
+            properties=properties,
+            event_time=event_time,
+        )
 
-    :raises:
-      ServerStatusError.
-    """
-    path = "/"
-    request = AsyncRequest("GET", path)
-    request.set_rfunc(self._aget_resp)
-    self._connection.make_request(request)
-    result = request.get_response()
-    return result
-
-  def _acreate_resp(self, response):
-    if response.error is not None:
-      raise NotCreatedError("Exception happened: %s for request %s" %
-                    (response.error, response.request))
-    elif response.status != httplib.CREATED:
-      raise NotCreatedError("request: %s status: %s body: %s" %
-                    (response.request, response.status,
-                     response.body))
-
-    return response
-
-  def _aget_resp(self, response):
-    if response.error is not None:
-      raise NotFoundError("Exception happened: %s for request %s" %
-                  (response.error, response.request))
-    elif response.status != httplib.OK:
-      raise NotFoundError("request: %s status: %s body: %s" %
-                  (response.request, response.status,
-                   response.body))
-
-    return response.json_body
-
-  def _adelete_resp(self, response):
-    if response.error is not None:
-      raise NotFoundError("Exception happened: %s for request %s" %
-                  (response.error, response.request))
-    elif response.status != httplib.OK:
-      raise NotFoundError("request: %s status: %s body: %s" %
-                  (response.request, response.status,
-                   response.body))
-
-    return response.body
+    def set_user(self, uid, properties={}, event_time=None):
+        """Set properties of a user"""
+        return self.aset_user(uid, properties, event_time).get_response()
+
+    def aunset_user(self, uid, properties, event_time=None):
+        """Unset properties of an user.
+
+        Wrapper of acreate_event function, setting event to "$unset" and entity_type
+        to "user".
+        """
+        # check properties={}, it cannot be empty
+        return self.acreate_event(
+            event="$unset",
+            entity_type="user",
+            entity_id=uid,
+            properties=properties,
+            event_time=event_time,
+        )
 
+    def unset_user(self, uid, properties, event_time=None):
+        """Set properties of a user"""
+        return self.aunset_user(uid, properties, event_time).get_response()
+
+    def adelete_user(self, uid, event_time=None):
+        """Delete a user.
+
+        Wrapper of acreate_event function, setting event to "$delete" and entity_type
+        to "user".
+        """
+        return self.acreate_event(
+            event="$delete",
+            entity_type="user",
+            entity_id=uid,
+            event_time=event_time)
+
+    def delete_user(self, uid, event_time=None):
+        """Delete a user."""
+        return self.adelete_user(uid, event_time).get_response()
+
+    def aset_item(self, iid, properties={}, event_time=None):
+        """Set properties of an item.
+
+        Wrapper of acreate_event function, setting event to "$set" and entity_type
+        to "item".
+        """
+        return self.acreate_event(
+            event="$set",
+            entity_type="item",
+            entity_id=iid,
+            properties=properties,
+            event_time=event_time)
+
+    def set_item(self, iid, properties={}, event_time=None):
+        """Set properties of an item."""
+        return self.aset_item(iid, properties, event_time).get_response()
+
+    def aunset_item(self, iid, properties={}, event_time=None):
+        """Unset properties of an item.
+
+        Wrapper of acreate_event function, setting event to "$unset" and entity_type
+        to "item".
+        """
+        return self.acreate_event(
+            event="$unset",
+            entity_type="item",
+            entity_id=iid,
+            properties=properties,
+            event_time=event_time)
+
+    def unset_item(self, iid, properties={}, event_time=None):
+        """Unset properties of an item."""
+        return self.aunset_item(iid, properties, event_time).get_response()
+
+    def adelete_item(self, iid, event_time=None):
+        """Delete an item.
+
+        Wrapper of acreate_event function, setting event to "$delete" and entity_type
+        to "item".
+        """
+        return self.acreate_event(
+            event="$delete",
+            entity_type="item",
+            entity_id=iid,
+            event_time=event_time)
+
+    def delete_item(self, iid, event_time=None):
+        """Delete an item."""
+        return self.adelete_item(iid, event_time).get_response()
+
+    def arecord_user_action_on_item(self, action, uid, iid, properties={},
+                                    event_time=None):
+        """Create a user-to-item action.
+
+        Wrapper of acreate_event function, setting entity_type to "user" and
+        target_entity_type to "item".
+        """
+        return self.acreate_event(
+            event=action,
+            entity_type="user",
+            entity_id=uid,
+            target_entity_type="item",
+            target_entity_id=iid,
+            properties=properties,
+            event_time=event_time)
+
+    def record_user_action_on_item(self, action, uid, iid, properties={},
+                                   event_time=None):
+        """Create a user-to-item action."""
+        return self.arecord_user_action_on_item(
+            action, uid, iid, properties, event_time).get_response()
 
-class EventClient(BaseClient):
-  """Client for importing data into PredictionIO Event Server.
-
-  Notice that app_id has been deprecated as of 0.8.2. Please use access_token
-  instead.
-
-  :param access_key: the access key for your application.
-  :param url: the url of PredictionIO Event Server.
-  :param threads: number of threads to handle PredictionIO API requests.
-          Must be >= 1.
-  :param qsize: the max size of the request queue (optional).
-      The asynchronous request becomes blocking once this size has been
-      reached, until the queued requests are handled.
-      Default value is 0, which means infinite queue size.
-  :param timeout: timeout for HTTP connection attempts and requests in
-    seconds (optional).
-    Default value is 5.
-  """
-
-  def __init__(self, access_key, 
-      url="http://localhost:7070",
-      threads=1, qsize=0, timeout=5):
-    assert type(access_key) is str, ("access_key must be string. "
-        "Notice that app_id has been deprecated in Prediction.IO 0.8.2. "
-        "Please use access_key instead.")
-
-    super(EventClient, self).__init__(url, threads, qsize, timeout)
-
-    if len(access_key) <= 8:
-      raise DeprecationWarning(
-          "It seems like you are specifying an app_id. It is deprecated in "
-          "Prediction.IO 0.8.2. Please use access_key instead. Or, "
-          "you may use an earlier version of this sdk.")
-
-    self.access_key = access_key
-
-  def acreate_event(self, event, entity_type, entity_id,
-      target_entity_type=None, target_entity_id=None, properties=None,
-      event_time=None):
-    """Asynchronously create an event.
-
-    :param event: event name. type str.
-    :param entity_type: entity type. It is the namespace of the entityId and
-      analogous to the table name of a relational database. The entityId must be
-      unique within same entityType. type str.
-    :param entity_id: entity id. *entity_type-entity_id* becomes the unique
-      identifier of the entity. For example, you may have entityType named user,
-      and different entity IDs, say 1 and 2. In this case, user-1 and user-2
-      uniquely identifies entities. type str
-    :param target_entity_type: target entity type. type str.
-    :param target_entity_id: target entity id. type str.
-    :param properties: a custom dict associated with an event. type dict.
-    :param event_time: the time of the event. type datetime, must contain
-      timezone info. 
-
-    :returns:
-      AsyncRequest object. You can call the get_response() method using this
-      object to get the final resuls or status of this asynchronous request.
-    """
-    data = {
-        "event": event,
-        "entityType": entity_type,
-        "entityId": entity_id,
-        }
 
-    if target_entity_type is not None:
-      data["targetEntityType"] = target_entity_type
-
-    if target_entity_id is not None:
-      data["targetEntityId"] = target_entity_id
-
-    if properties is not None:
-      data["properties"] = properties
-
-    et = event_time_validation(event_time)
-    # EventServer uses milliseconds, but python datetime class uses micro. Hence
-    # need to skip the last three digits.
-    et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
-    data["eventTime"] = et_str
-    
-    path = "/events.json?accessKey=%s" % (self.access_key, )
-    request = AsyncRequest("POST", path, **data)
-    request.set_rfunc(self._acreate_resp)
-    self._connection.make_request(request)
-    return request
-
-  def create_event(self, event, entity_type, entity_id,
-      target_entity_type=None, target_entity_id=None, properties=None,
-      event_time=None):
-    """Synchronously (blocking) create an event."""
-    return self.acreate_event(event, entity_type, entity_id,
-        target_entity_type, target_entity_id, properties, 
-        event_time).get_response()
-
-  def aget_event(self, event_id):
-    """Asynchronouly get an event from Event Server.
-
-    :param event_id: event id returned by the EventServer when creating the
-      event.
-
-    :returns:
-      AsyncRequest object. 
+class EngineClient(BaseClient):
+    """Client for extracting prediction results from an PredictionIO Engine
+    Instance.
+
+    :param url: the url of the PredictionIO Engine Instance.
+    :param threads: number of threads to handle PredictionIO API requests.
+            Must be >= 1.
+    :param qsize: the max size of the request queue (optional).
+        The asynchronous request becomes blocking once this size has been
+        reached, until the queued requests are handled.
+        Default value is 0, which means infinite queue size.
+    :param timeout: timeout for HTTP connection attempts and requests in
+      seconds (optional).
+      Default value is 5.
+
     """
-    enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
-    path = "/events/%s.json" % enc_event_id
-    request = AsyncRequest("GET", path, accessKey=self.access_key)
-    request.set_rfunc(self._aget_resp)
-    self._connection.make_request(request)
-    return request
 
-  def get_event(self, event_id):
-    """Synchronouly get an event from Event Server."""
-    return self.aget_event(event_id).get_response()
+    def __init__(self, url="http://localhost:8000", threads=1,
+                 qsize=0, timeout=5):
+        super(EngineClient, self).__init__(url, threads, qsize, timeout)
 
-  def adelete_event(self, event_id):
-    """Asynchronouly delete an event from Event Server.
+    def asend_query(self, data):
+        """Asynchronously send a request to the engine instance with data as the
+        query.
 
-    :param event_id: event id returned by the EventServer when creating the
-      event.
+        :param data: the query: It is coverted to an json object using json.dumps
+          method. type dict.
 
-    :returns:
-      AsyncRequest object. 
-    """
-    enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
-    path = "/events/%s.json" % (enc_event_id, )
-    request = AsyncRequest("DELETE", path, accessKey=self.access_key)
-    request.set_rfunc(self._adelete_resp)
-    self._connection.make_request(request)
-    return request
-
-  def delete_event(self, event_id):
-    """Synchronouly delete an event from Event Server."""
-    return self.adelete_event(event_id).get_response()
-
-  ## Below are helper functions
-
-  def aset_user(self, uid, properties={}, event_time=None):
-    """Set properties of a user.
-   
-    Wrapper of acreate_event function, setting event to "$set" and entity_type
-    to "user".
-    """
-    return self.acreate_event(
-      event="$set",
-      entity_type="user",
-      entity_id=uid,
-      properties=properties,
-      event_time=event_time,
-    )
-
-  def set_user(self, uid, properties={}, event_time=None):
-    """Set properties of a user"""
-    return self.aset_user(uid, properties, event_time).get_response()
-
-  def aunset_user(self, uid, properties, event_time=None):
-    """Unset properties of an user.
-   
-    Wrapper of acreate_event function, setting event to "$unset" and entity_type
-    to "user".
-    """
-    # check properties={}, it cannot be empty
-    return self.acreate_event(
-        event="$unset",
-        entity_type="user",
-        entity_id=uid,
-        properties=properties,
-        event_time=event_time,
-        )
+        :returns:
+          AsyncRequest object. You can call the get_response() method using this
+          object to get the final resuls or status of this asynchronous request.
+        """
+        path = "/queries.json"
+        request = AsyncRequest("POST", path, **data)
+        request.set_rfunc(self._aget_resp)
+        self._connection.make_request(request)
+        return request
 
-  def unset_user(self, uid, properties, event_time=None):
-    """Set properties of a user"""
-    return self.aunset_user(uid, properties, event_time).get_response()
-
-  def adelete_user(self, uid, event_time=None):
-    """Delete a user.
-   
-    Wrapper of acreate_event function, setting event to "$delete" and entity_type
-    to "user".
-    """
-    return self.acreate_event(
-        event="$delete",
-        entity_type="user",
-        entity_id=uid,
-        event_time=event_time)
-
-  def delete_user(self, uid, event_time=None):
-    """Delete a user."""
-    return self.adelete_user(uid, event_time).get_response()
-
-  def aset_item(self, iid, properties={}, event_time=None):
-    """Set properties of an item.
-   
-    Wrapper of acreate_event function, setting event to "$set" and entity_type
-    to "item".
-    """
-    return self.acreate_event(
-        event="$set",
-        entity_type="item",
-        entity_id=iid,
-        properties=properties,
-        event_time=event_time)
-
-  def set_item(self, iid, properties={}, event_time=None):
-    """Set properties of an item."""
-    return self.aset_item(iid, properties, event_time).get_response()
-
-  def aunset_item(self, iid, properties={}, event_time=None):
-    """Unset properties of an item.
-   
-    Wrapper of acreate_event function, setting event to "$unset" and entity_type
-    to "item".
-    """
-    return self.acreate_event(
-        event="$unset",
-        entity_type="item",
-        entity_id=iid,
-        properties=properties,
-        event_time=event_time)
-
-  def unset_item(self, iid, properties={}, event_time=None):
-    """Unset properties of an item."""
-    return self.aunset_item(iid, properties, event_time).get_response()
-
-  def adelete_item(self, iid, event_time=None):
-    """Delete an item.
-   
-    Wrapper of acreate_event function, setting event to "$delete" and entity_type
-    to "item".
-    """
-    return self.acreate_event(
-        event="$delete",
-        entity_type="item",
-        entity_id=iid,
-        event_time=event_time)
-
-  def delete_item(self, iid, event_time=None):
-    """Delete an item."""
-    return self.adelete_item(iid, event_time).get_response()
-
-  def arecord_user_action_on_item(self, action, uid, iid, properties={},
-      event_time=None):
-    """Create a user-to-item action.
-
-    Wrapper of acreate_event function, setting entity_type to "user" and
-    target_entity_type to "item".
-    """
-    return self.acreate_event(
-        event=action,
-        entity_type="user",
-        entity_id=uid,
-        target_entity_type="item",
-        target_entity_id=iid,
-        properties=properties,
-        event_time=event_time)
-
-  def record_user_action_on_item(self, action, uid, iid, properties={},
-      event_time=None):
-    """Create a user-to-item action."""
-    return self.arecord_user_action_on_item(
-      action, uid, iid, properties, event_time).get_response()
+    def send_query(self, data):
+        """Synchronously send a request.
 
+        :param data: the query: It is coverted to an json object using json.dumps
+          method. type dict.
 
-class EngineClient(BaseClient):
-  """Client for extracting prediction results from an PredictionIO Engine
-  Instance.
-  
-  :param url: the url of the PredictionIO Engine Instance.
-  :param threads: number of threads to handle PredictionIO API requests.
-          Must be >= 1.
-  :param qsize: the max size of the request queue (optional).
-      The asynchronous request becomes blocking once this size has been
-      reached, until the queued requests are handled.
-      Default value is 0, which means infinite queue size.
-  :param timeout: timeout for HTTP connection attempts and requests in
-    seconds (optional).
-    Default value is 5.
-  
-  """
-  def __init__(self, url="http://localhost:8000", threads=1,
-      qsize=0, timeout=5):
-    super(EngineClient, self).__init__(url, threads, qsize, timeout)
-
-  def asend_query(self, data):
-    """Asynchronously send a request to the engine instance with data as the
-    query.
-
-    :param data: the query: It is coverted to an json object using json.dumps
-      method. type dict. 
-
-    :returns:
-      AsyncRequest object. You can call the get_response() method using this
-      object to get the final resuls or status of this asynchronous request.
-    """
-    path = "/queries.json"
-    request = AsyncRequest("POST", path, **data)
-    request.set_rfunc(self._aget_resp)
-    self._connection.make_request(request)
-    return request
-
-  def send_query(self, data):
-    """Synchronously send a request.
-    
-    :param data: the query: It is coverted to an json object using json.dumps
-      method. type dict. 
-
-    :returns: the prediction.
-    """
-    return self.asend_query(data).get_response()
+        :returns: the prediction.
+        """
+        return self.asend_query(data).get_response()

http://git-wip-us.apache.org/repos/asf/incubator-predictionio-sdk-python/blob/bc678328/predictionio/connection.py
----------------------------------------------------------------------
diff --git a/predictionio/connection.py b/predictionio/connection.py
index 2c79d64..8c231f4 100644
--- a/predictionio/connection.py
+++ b/predictionio/connection.py
@@ -1,23 +1,22 @@
-
 try:
-  import Queue
+    import Queue
 except ImportError:
-  # pylint: disable=F0401
-  # http is a Python3 module, replacing httplib. Ditto.
-  import queue as Queue
+    # pylint: disable=F0401
+    # http is a Python3 module, replacing httplib. Ditto.
+    import queue as Queue
 import threading
 
 try:
-  import httplib
+    import httplib
 except ImportError:
-  # pylint: disable=F0401
-  from http import client as httplib
+    # pylint: disable=F0401
+    from http import client as httplib
 
 try:
-  from urllib import urlencode
+    from urllib import urlencode
 except ImportError:
-  # pylint: disable=F0401,E0611
-  from urllib.parse import urlencode
+    # pylint: disable=F0401,E0611
+    from urllib.parse import urlencode
 
 import datetime
 import json
@@ -25,9 +24,9 @@ import logging
 
 # use generators for python2 and python3
 try:
-  xrange
+    xrange
 except NameError:
-  xrange = range
+    xrange = range
 
 # some constants
 MAX_RETRY = 1  # 0 means no retry
@@ -39,332 +38,329 @@ DEBUG_LOG = False
 
 
 def enable_log(filename=None):
-  global logger
-  global DEBUG_LOG
-  timestamp = datetime.datetime.today()
-  if not filename:
-    logfile = "./log/predictionio_%s.log" % timestamp.strftime(
-      "%Y-%m-%d_%H:%M:%S.%f")
-  else:
-    logfile = filename
-  logging.basicConfig(filename=logfile,
-            filemode='w',
-            level=logging.DEBUG,
-            format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s')
-  logger = logging.getLogger(__name__)
-  DEBUG_LOG = True
+    global logger
+    global DEBUG_LOG
+    timestamp = datetime.datetime.today()
+    if not filename:
+        logfile = "./log/predictionio_%s.log" % timestamp.strftime(
+            "%Y-%m-%d_%H:%M:%S.%f")
+    else:
+        logfile = filename
+    logging.basicConfig(filename=logfile,
+                        filemode='w',
+                        level=logging.DEBUG,
+                        format='[%(levelname)s] %(name)s (%(threadName)s) %(message)s')
+    logger = logging.getLogger(__name__)
+    DEBUG_LOG = True
 
 
 class PredictionIOAPIError(Exception):
-  pass
+    pass
 
 
 class NotSupportMethodError(PredictionIOAPIError):
-  pass
+    pass
 
 
 class ProgramError(PredictionIOAPIError):
-  pass
+    pass
 
 
 class AsyncRequest(object):
+    """AsyncRequest object
 
-  """AsyncRequest object
+    """
 
-  """
+    def __init__(self, method, path, **params):
+        self.method = method  # "GET" "POST" etc
+        # the sub path eg. POST /v1/users.json  GET /v1/users/1.json
+        self.path = path
+        # dictionary format eg. {"appkey" : 123, "id" : 3}
+        self.params = params
+        # use queue to implement response, store AsyncResponse object
+        self.response_q = Queue.Queue(1)
+        self.qpath = "%s?%s" % (self.path, urlencode(self.params))
+        self._response = None
+        # response function to be called to handle the response
+        self.rfunc = None
 
-  def __init__(self, method, path, **params):
-    self.method = method  # "GET" "POST" etc
-    # the sub path eg. POST /v1/users.json  GET /v1/users/1.json
-    self.path = path
-    # dictionary format eg. {"appkey" : 123, "id" : 3}
-    self.params = params
-    # use queue to implement response, store AsyncResponse object
-    self.response_q = Queue.Queue(1)
-    self.qpath = "%s?%s" % (self.path, urlencode(self.params))
-    self._response = None
-    # response function to be called to handle the response
-    self.rfunc = None
+    def __str__(self):
+        return "%s %s %s %s" % (self.method, self.path, self.params,
+                                self.qpath)
 
-  def __str__(self):
-    return "%s %s %s %s" % (self.method, self.path, self.params,
-                self.qpath)
+    def set_rfunc(self, func):
+        self.rfunc = func
 
-  def set_rfunc(self, func):
-    self.rfunc = func
+    def set_response(self, response):
+        """ store the response
 
-  def set_response(self, response):
-    """ store the response
+        NOTE: Must be only called once
+        """
+        self.response_q.put(response)
 
-    NOTE: Must be only called once
-    """
-    self.response_q.put(response)
+    def get_response(self):
+        """Get the response. Blocking.
 
-  def get_response(self):
-    """Get the response. Blocking.
+        :returns: self.rfunc's return type.
+        """
+        if self._response is None:
+            tmp_response = self.response_q.get(True)  # NOTE: blocking
+            if self.rfunc is None:
+                self._response = tmp_response
+            else:
+                self._response = self.rfunc(tmp_response)
+
+        return self._response
+
+
+class AsyncResponse(object):
+    """Store the response of asynchronous request
 
-    :returns: self.rfunc's return type.
+    When get the response, user should check if error is None (which means no
+    Exception happens).
+    If error is None, then should check if the status is expected.
     """
-    if self._response is None:
-      tmp_response = self.response_q.get(True)  # NOTE: blocking
-      if self.rfunc is None:
-        self._response = tmp_response
-      else:
-        self._response = self.rfunc(tmp_response)
 
-    return self._response
+    def __init__(self):
+        #: exception object if any happens
+        self.error = None
+
+        self.version = None
+        self.status = None
+        self.reason = None
+        #: Response header. str
+        self.headers = None
+        #: Response body. str
+        self.body = None
+        #: Jsonified response body. Remains None if conversion is unsuccessful.
+        self.json_body = None
+        #: Point back to the AsyncRequest object
+        self.request = None
+
+    def __str__(self):
+        return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
+                                                  self.status, self.reason,
+                                                  self.headers, self.body)
+
+    def set_resp(self, version, status, reason, headers, body):
+        self.version = version
+        self.status = status
+        self.reason = reason
+        self.headers = headers
+        self.body = body
+        # Try to extract the json.
+        try:
+            self.json_body = json.loads(body)
+        except ValueError as ex:  # noqa
+            self.json_body = None
 
+    def set_error(self, error):
+        self.error = error
 
-class AsyncResponse(object):
-  """Store the response of asynchronous request
-
-  When get the response, user should check if error is None (which means no
-  Exception happens).
-  If error is None, then should check if the status is expected.
-  """
-
-  def __init__(self):
-    #: exception object if any happens
-    self.error = None
-
-    self.version = None
-    self.status = None
-    self.reason = None
-    #: Response header. str
-    self.headers = None
-    #: Response body. str
-    self.body = None
-    #: Jsonified response body. Remains None if conversion is unsuccessful.
-    self.json_body = None
-    #: Point back to the AsyncRequest object
-    self.request = None  
-
-  def __str__(self):
-    return "e:%s v:%s s:%s r:%s h:%s b:%s" % (self.error, self.version,
-                          self.status, self.reason,
-                          self.headers, self.body)
-
-  def set_resp(self, version, status, reason, headers, body):
-    self.version = version
-    self.status = status
-    self.reason = reason
-    self.headers = headers
-    self.body = body
-    # Try to extract the json.
-    try:
-      self.json_body = json.loads(body)
-    except ValueError as ex:
-      self.json_body = None
-
-  def set_error(self, error):
-    self.error = error
-
-  def set_request(self, request):
-    self.request = request
+    def set_request(self, request):
+        self.request = request
 
 
 class PredictionIOHttpConnection(object):
+    def __init__(self, host, https=True, timeout=5):
+        if https:  # https connection
+            self._connection = httplib.HTTPSConnection(host, timeout=timeout)
+        else:
+            self._connection = httplib.HTTPConnection(host, timeout=timeout)
 
-  def __init__(self, host, https=True, timeout=5):
-    if https:  # https connection
-      self._connection = httplib.HTTPSConnection(host, timeout=timeout)
-    else:
-      self._connection = httplib.HTTPConnection(host, timeout=timeout)
+    def connect(self):
+        self._connection.connect()
 
-  def connect(self):
-    self._connection.connect()
+    def close(self):
+        self._connection.close()
 
-  def close(self):
-    self._connection.close()
+    def request(self, method, url, body={}, headers={}):
+        """
+        http request wrapper function, with retry capability in case of error.
+        catch error exception and store it in AsyncResponse object
+        return AsyncResponse object
 
-  def request(self, method, url, body={}, headers={}):
-    """
-    http request wrapper function, with retry capability in case of error.
-    catch error exception and store it in AsyncResponse object
-    return AsyncResponse object
+        Args:
+          method: http method, type str
+          url: url path, type str
+          body: http request body content, type dict
+          header: http request header , type dict
+        """
 
-    Args:
-      method: http method, type str
-      url: url path, type str
-      body: http request body content, type dict
-      header: http request header , type dict
-    """
+        response = AsyncResponse()
 
-    response = AsyncResponse()
-
-    try:
-      # number of retry in case of error (minimum 0 means no retry)
-      retry_limit = MAX_RETRY
-      mod_headers = dict(headers)  # copy the headers
-      mod_headers["Connection"] = "keep-alive"
-      enc_body = None
-      if body:  # if body is not empty
-        #enc_body = urlencode(body)
-        #mod_headers[
-        #  "Content-type"] = "application/x-www-form-urlencoded"
-        enc_body = json.dumps(body)
-        mod_headers[
-          "Content-type"] = "application/json"
-        #mod_headers["Accept"] = "text/plain"
-    except Exception as e:
-      response.set_error(e)
-      return response
-
-    if DEBUG_LOG:
-      logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
-             mod_headers, enc_body)
-    # retry loop
-    for i in xrange(retry_limit + 1):
-      try:
-        if i != 0:
-          if DEBUG_LOG:
-            logger.debug("retry request %s times" % i)
-        if self._connection.sock is None:
-          self._connection.connect()
-        self._connection.request(method, url, enc_body, mod_headers)
-      except Exception as e:
-        self._connection.close()
-        if i == retry_limit:
-          # new copy of e created everytime??
-          response.set_error(e)
-      else:  # NOTE: this is try's else clause
-        # connect() and request() OK
         try:
-          resp = self._connection.getresponse()
+            # number of retry in case of error (minimum 0 means no retry)
+            retry_limit = MAX_RETRY
+            mod_headers = dict(headers)  # copy the headers
+            mod_headers["Connection"] = "keep-alive"
+            enc_body = None
+            if body:  # if body is not empty
+                # enc_body = urlencode(body)
+                # mod_headers[
+                # "Content-type"] = "application/x-www-form-urlencoded"
+                enc_body = json.dumps(body)
+                mod_headers[
+                    "Content-type"] = "application/json"
+                # mod_headers["Accept"] = "text/plain"
         except Exception as e:
-          self._connection.close()
-          if i == retry_limit:
             response.set_error(e)
-        else:  # NOTE: this is try's else clause
-          # getresponse() OK
-          resp_version = resp.version  # int
-          resp_status = resp.status  # int
-          resp_reason = resp.reason  # str
-          # resp.getheaders() returns list of tuples
-          # converted to dict format
-          resp_headers = dict(resp.getheaders())
-          # NOTE: have to read the response before sending out next
-          # http request
-          resp_body = resp.read()  # str
-          response.set_resp(version=resp_version, status=resp_status,
-                    reason=resp_reason, headers=resp_headers,
-                    body=resp_body)
-          break  # exit retry loop
-    # end of retry loop
-    if DEBUG_LOG:
-      logger.debug("Response %s", response)
-    return response  # AsyncResponse object
+            return response
+
+        if DEBUG_LOG:
+            logger.debug("Request m:%s u:%s h:%s b:%s", method, url,
+                         mod_headers, enc_body)
+        # retry loop
+        for i in xrange(retry_limit + 1):
+            try:
+                if i != 0:
+                    if DEBUG_LOG:
+                        logger.debug("retry request %s times" % i)
+                if self._connection.sock is None:
+                    self._connection.connect()
+                self._connection.request(method, url, enc_body, mod_headers)
+            except Exception as e:
+                self._connection.close()
+                if i == retry_limit:
+                    # new copy of e created everytime??
+                    response.set_error(e)
+            else:  # NOTE: this is try's else clause
+                # connect() and request() OK
+                try:
+                    resp = self._connection.getresponse()
+                except Exception as e:
+                    self._connection.close()
+                    if i == retry_limit:
+                        response.set_error(e)
+                else:  # NOTE: this is try's else clause
+                    # getresponse() OK
+                    resp_version = resp.version  # int
+                    resp_status = resp.status  # int
+                    resp_reason = resp.reason  # str
+                    # resp.getheaders() returns list of tuples
+                    # converted to dict format
+                    resp_headers = dict(resp.getheaders())
+                    # NOTE: have to read the response before sending out next
+                    # http request
+                    resp_body = resp.read()  # str
+                    response.set_resp(version=resp_version, status=resp_status,
+                                      reason=resp_reason, headers=resp_headers,
+                                      body=resp_body)
+                    break  # exit retry loop
+        # end of retry loop
+        if DEBUG_LOG:
+            logger.debug("Response %s", response)
+        return response  # AsyncResponse object
 
 
 def connection_worker(host, request_queue, https=True, timeout=5, loop=True):
-  """worker function which establishes connection and wait for request jobs
-  from the request_queue
-
-  Args:
-    request_queue: the request queue storing the AsyncRequest object
-      valid requests:
-        GET
-        POST
-        DELETE
-        KILL
-    https: HTTPS (True) or HTTP (False)
-    timeout: timeout for HTTP connection attempts and requests in seconds
-    loop: This worker function stays in a loop waiting for request
-      For testing purpose only. should always be set to True.
-  """
-
-  connect = PredictionIOHttpConnection(host, https, timeout)
-
-  # loop waiting for job form request queue
-  killed = not loop
-
-  while True:
-    # print "thread %s waiting for request" % thread.get_ident()
-    request = request_queue.get(True)  # NOTE: blocking get
-    # print "get request %s" % request
-    method = request.method
-    if method == "GET":
-      path = request.qpath
-      d = connect.request("GET", path)
-    elif method == "POST":
-      path = request.path
-      body = request.params
-      d = connect.request("POST", path, body)
-    elif method == "DELETE":
-      path = request.qpath
-      d = connect.request("DELETE", path)
-    elif method == "KILL":
-      # tell the thread to kill the connection
-      killed = True
-      d = AsyncResponse()
-    else:
-      d = AsyncResponse()
-      d.set_error(NotSupportMethodError(
-        "Don't Support the method %s" % method))
+    """worker function which establishes connection and wait for request jobs
+    from the request_queue
 
-    d.set_request(request)
-    request.set_response(d)
-    request_queue.task_done()
-    if killed:
-      break
+    Args:
+      request_queue: the request queue storing the AsyncRequest object
+        valid requests:
+          GET
+          POST
+          DELETE
+          KILL
+      https: HTTPS (True) or HTTP (False)
+      timeout: timeout for HTTP connection attempts and requests in seconds
+      loop: This worker function stays in a loop waiting for request
+        For testing purpose only. should always be set to True.
+    """
 
-  # end of while loop
-  connect.close()
+    connect = PredictionIOHttpConnection(host, https, timeout)
+
+    # loop waiting for job form request queue
+    killed = not loop
+
+    while True:
+        # print "thread %s waiting for request" % thread.get_ident()
+        request = request_queue.get(True)  # NOTE: blocking get
+        # print "get request %s" % request
+        method = request.method
+        if method == "GET":
+            path = request.qpath
+            d = connect.request("GET", path)
+        elif method == "POST":
+            path = request.path
+            body = request.params
+            d = connect.request("POST", path, body)
+        elif method == "DELETE":
+            path = request.qpath
+            d = connect.request("DELETE", path)
+        elif method == "KILL":
+            # tell the thread to kill the connection
+            killed = True
+            d = AsyncResponse()
+        else:
+            d = AsyncResponse()
+            d.set_error(NotSupportMethodError(
+                "Don't Support the method %s" % method))
+
+        d.set_request(request)
+        request.set_response(d)
+        request_queue.task_done()
+        if killed:
+            break
+
+    # end of while loop
+    connect.close()
 
 
 class Connection(object):
+    """abstract object for connection with server
 
-  """abstract object for connection with server
-
-  spawn multiple connection_worker threads to handle jobs in the queue q
-  """
-
-  def __init__(self, host, threads=1, qsize=0, https=True, timeout=5):
-    """constructor
-
-    Args:
-      host: host of the server.
-      threads: type int, number of threads to be spawn
-      qsize: size of the queue q
-      https: indicate it is httpS (True) or http connection (False)
-      timeout: timeout for HTTP connection attempts and requests in
-        seconds
-    """
-    self.host = host
-    self.https = https
-    self.q = Queue.Queue(qsize)  # if qsize=0, means infinite
-    self.threads = threads
-    self.timeout = timeout
-    # start thread based on threads number
-    self.tid = {}  # dictionary of thread object
-
-    for i in xrange(threads):
-      tname = "PredictionIOThread-%s" % i  # thread name
-      self.tid[i] = threading.Thread(
-        target=connection_worker, name=tname,
-        kwargs={'host': self.host, 'request_queue': self.q,
-            'https': self.https, 'timeout': self.timeout})
-      self.tid[i].setDaemon(True)
-      self.tid[i].start()
-
-  def make_request(self, request):
-    """put the request into the q
-    """
-    self.q.put(request)
-
-  def pending_requests(self):
-    """number of pending requests in the queue
+    spawn multiple connection_worker threads to handle jobs in the queue q
     """
-    return self.q.qsize()
-
-  def close(self):
-    """close this Connection. Call this when main program exits
-    """
-    # set kill message to q
-    for i in xrange(self.threads):
-      self.make_request(AsyncRequest("KILL", ""))
-
-    self.q.join()  # wait for q empty
 
-    for i in xrange(self.threads):  # wait for all thread finish
-      self.tid[i].join()
+    def __init__(self, host, threads=1, qsize=0, https=True, timeout=5):
+        """constructor
+
+        Args:
+          host: host of the server.
+          threads: type int, number of threads to be spawn
+          qsize: size of the queue q
+          https: indicate it is httpS (True) or http connection (False)
+          timeout: timeout for HTTP connection attempts and requests in
+            seconds
+        """
+        self.host = host
+        self.https = https
+        self.q = Queue.Queue(qsize)  # if qsize=0, means infinite
+        self.threads = threads
+        self.timeout = timeout
+        # start thread based on threads number
+        self.tid = {}  # dictionary of thread object
+
+        for i in xrange(threads):
+            tname = "PredictionIOThread-%s" % i  # thread name
+            self.tid[i] = threading.Thread(
+                target=connection_worker, name=tname,
+                kwargs={'host': self.host, 'request_queue': self.q,
+                        'https': self.https, 'timeout': self.timeout})
+            self.tid[i].setDaemon(True)
+            self.tid[i].start()
+
+    def make_request(self, request):
+        """put the request into the q
+        """
+        self.q.put(request)
+
+    def pending_requests(self):
+        """number of pending requests in the queue
+        """
+        return self.q.qsize()
+
+    def close(self):
+        """close this Connection. Call this when main program exits
+        """
+        # set kill message to q
+        for i in xrange(self.threads):
+            self.make_request(AsyncRequest("KILL", ""))
+
+        self.q.join()  # wait for q empty
+
+        for i in xrange(self.threads):  # wait for all thread finish
+            self.tid[i].join()

Reply via email to