I speak mainly from experience trying to delete a lot of entities last
Spring.  When deleting hundreds of thousands of entities using a query in
batches of 100 or so.. you will eventually get to a point where the query
takes longer and longer to return results.

For some reason, running the query after a bunch of deletes results in the
datastore having to skip over all of these initial keys that have already
been deleted.. but are still in the datastore (they're just marked as
deleted).

So, using the cursor lets you avoid this need to skip deleted but still
present entities. (I could be making this entire part up though.)
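
Roughly, this is what I mean by the cursor part of the pattern (Recipient is
from your code; the batch size and function name are just stand-ins for the
sketch):

from google.appengine.ext import db

def delete_one_batch(cursor=None, batch_size=100):
    # keys_only so the query fetches just keys, not whole entities.
    query = db.Query(Recipient, keys_only=True)
    if cursor is not None:
        # Resuming from the cursor starts the scan where the last batch
        # left off, instead of re-scanning the already-deleted keys.
        query.with_cursor(cursor)
    keys = query.fetch(batch_size)
    if keys:
        db.delete(keys)
    # Hand the new cursor to the next task in the chain.
    return query.cursor(), len(keys)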

Now, if you are deleting less than 100,000 entities, I don't know that the
issue will matter for you. (It's also possible that the datastore has been
optimized so that this is not an issue, but I do not believe so.)

Also, the randomly assigned task name won't ensure idempotency.. or even
run-once behaviour.  Mainly, something could happen after you add the next
task (so, you successfully added the next task in the chain.. yet you
continue on to do more work).. or during the task add.. and this could cause
the current task to run again.  When it got to the task add part again.. it
would just add a new task with a new random name.. and that task would be a
fork (so the task that ran and re-ran would end up firing off two tasks
instead of the expected one task).  This could happen several times.. so you
would have more and more forked tasks running that were in turn adding new
tasks after they finished running.

For chained tasks, you need to use task names.. and you need to catch the
TaskAlreadyExistsError and TombstonedTaskError exceptions.. and you need an
except clause that forces the task add to try again if some other error is
thrown when you try to add.  Something like the sketch below.
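
For instance (the task-name scheme here is just made up for the sketch; the
URL and queue name are the ones from your code):

from google.appengine.api import taskqueue  # api.labs.taskqueue on older SDKs

def add_next_task(task_number, params):
    # A deterministic name means a retried run of this code adds the
    # *same* task, so a re-run can't fork the chain.
    task_name = 'delete-chain-%d' % task_number
    while True:
        try:
            taskqueue.Task(url='/tasks/delete_recipients',
                           name=task_name,
                           params=params).add(queue_name='recipients-import')
        except (taskqueue.TaskAlreadyExistsError,
                taskqueue.TombstonedTaskError):
            # Already added (or already ran and is tombstoned); either way
            # the next link in the chain exists, so we're done.
            return
        except taskqueue.Error:
            # Some other transient failure: retry the add, same name.
            continue
        return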

On Sun, Oct 3, 2010 at 7:18 PM, Jimmy Bourassa <[email protected]> wrote:

> First of all, thank you for taking the time to give such an in-depth
> answer.
>
> To answer points 1 and 3: I was actually using the random name to make
> the task idempotent, but yes, named tasks will do better. I still don't
> understand why the above code behaves the way it does, but using a
> user-generated name should fix this.
>
> Regarding point 2, I'm not sure you're correct on this one. Shouldn't
> the query always be fast since there's no "skip" in the fetch method?
> Since the query always gets the first X entities and deletes them, I
> don't see why it'd take longer and longer.
>
> In this scenario, I don't need the extra speed from the second method
> you suggest. I will make sure to look into the Mapper API though.
>
> Again, thank you for your time.
>
> Jimmy
>
> On 1 oct, 21:44, Eli Jones <[email protected]> wrote:
> > From the looks of this code.. you should completely rewrite it.
> >
> > 1.  You indicate you want to use a recursive task.. but you aren't using
> > named tasks.  So, you have no way to guarantee that the task won't fork
> > indefinitely.
> > 2.  You aren't using a cursor.. so eventually the db.Query() will take
> > longer and longer and longer to run if you have a very large number of
> > entities to delete.
> > 3.  Why are you saving the random task name to the datastore?
> >
> > There are a few patterns you can use to do batch deletes for a bunch of
> > entities.
> >
> > 1.  Simple recursive named task with a cursor.
> >  (For an intelligent explanation of this when using deferred, see here:
> > http://blog.notdot.net/2010/03/Task-Queue-task-chaining-done-right)
> >
> > 2.  Recursive task using keys_only cursor and deferred task to do
> > db.delete()
> >
> > You do 1 if you really only want each db.delete() to run one at a time.
> > You do 2 if you just want the deleting done as fast as possible.  (Or
> > there is option 3.. familiarize yourself with the GAE Mapper API [aka
> > the Map Reduce API] and use that to fire off deletes.)
> >
> > 2 would look something like this (fair warning, the try/except makes it
> > a little indenty):
> >
> > from google.appengine.ext import db
> > from google.appengine.ext import deferred
> > from google.appengine.runtime import DeadlineExceededError
> >
> > def mainTask(campaign_id, new_import, cursor=None, i=0):
> >     count = app_model.DATASTORE_PUT_LIMIT
> >     try:
> >         while count == app_model.DATASTORE_PUT_LIMIT:
> >             query = db.Query(Recipient, keys_only=True).ancestor(
> >                 campaign_id).filter('campaign', campaign_id)
> >             if cursor is not None:
> >                 query.with_cursor(cursor)
> >             results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> >             count = len(results)
> >             deferred.defer(deleteByKeyList, repr(results))
> >             cursor = query.cursor()
> >         if do_weird_new_import_check(new_import):
> >             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
> >     except DeadlineExceededError:
> >         i += 1
> >         deferred.defer(mainTask, campaign_id, new_import, cursor, i,
> >                        _name="mainTask-" + str(campaign_id) + "-run-" + str(i))
> >
> > def deleteByKeyList(keylist):
> >     # The import lets eval() resolve the datastore_types.Key repr()s.
> >     from google.appengine.api import datastore_types
> >     keylist = eval(keylist)
> >     db.delete(keylist)
> >
> > You could call that first function from a remote_shell.. and it would
> > then begin firing off deferred deleteByKeyList batches until it got a
> > DeadlineExceededError.. then it would just defer itself with all of the
> > appropriate variables.
> >
> > It's a little sloppy.. and there's a chance that the deferred delete
> > would get called.. and then the DeadlineExceededError would throw.. and
> > you'd then have the cursor variable get passed for the old cursor... but
> > that shouldn't really change the ultimate behaviour.
> >
> > When the while loop completes (which will only happen when all of the
> > entities have had a deferred deleteByKeyList() fired off), it will do the
> > new_import check.
> >
> > If you are worried about maybe the DeadlineExceededError being fired
> > off.. and then the deferred.defer() call not succeeding.. it wouldn't
> > really matter, as long as it doesn't happen too often (so that your
> > cursor value gets passed along) and the error occurs inside of one of
> > the deferred versions of the mainTask() function.. it will just run the
> > task again with the previous cursor value.
> >
> > And, when it runs the query.. it will only get keys for entities that
> > are still in the datastore.  (The only negative will be that it will be
> > looking for key values greater than the key value that started an
> > already deleted batch.)
> >
> > I'll admit, this method is a little nutty.. it's effectively trying to
> > do the db.delete() calls as fast as possible.. and it results in a lot
> > of deferred errors (the usual "appengine could not service the request
> > within 10 seconds" errors).  But, it's faster than doing it serially.
> >
> > Again.. I am sad to say that the Mapper API pretty much makes this
> > obsolete.. since once you know how to use it, it makes a lot of stuff
> > like this easy.. and it seems to provide really neat stats and abilities
> > to monitor the processes.
> >
> > http://googleappengine.blogspot.com/2010/07/introducing-mapper-api.html
> >
> >
> >
> > On Fri, Oct 1, 2010 at 3:54 PM, Jimmy Bourassa <[email protected]> wrote:
> > > I created a task that handles batch delete for my application. The
> > > purpose of the task is to delete records recursively. Once all records
> > > are deleted, a new process must be started.
> >
> > > I tried to implement this through the code posted below. I really
> > > don't understand why this is not working as expected, but for some
> > > reason, I end up having many tasks running concurrently. Is this the
> > > expected behavior of the TaskQueue?
> >
> > > Here's the actual code:
> >
> > > def post(self):
> > >     task_unique_name = self.request.headers['X-AppEngine-TaskName']
> > >     # Checks if task has been executed already
> > >     if task.ExecutedTask.all().filter('task_name =',
> > >                                       task_unique_name).count():
> > >         return
> >
> > >     campaign_id = db.Key(self.request.POST['campaign_id'])
> >
> > >     def delete_transaction(camp):
> > >         query = db.Query(Recipient, keys_only=True).ancestor(
> > >             campaign_id).filter('campaign', camp)
> > >         results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> > >         # If results are found, delete them and start a new delete task
> > >         if results:
> > >             db.delete(results)
> > >             t = taskqueue.Task(url='/tasks/delete_recipients',
> > >                                params=self.request.POST)
> > >             t.add(queue_name='recipients-import')
> >
> > >         # No results found to delete, start import if appropriate.
> > >         elif self.request.POST.get('new_import', False):
> > >             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
> >
> > >         return True
> >
> > >     def mark_task_as_done():
> > >         save_task = task.ExecutedTask(task_name=task_unique_name)
> > >         save_task.put()
> >
> > >     if db.run_in_transaction(delete_transaction, campaign_id):
> > >         db.run_in_transaction(mark_task_as_done)
> >