Right, that makes sense now. Good thing to know, thanks!

As for the random task names, I mean that I was using the
datastore to save which task ran, therefore allowing the script to
cancel any tasks that might have run in the past. Of course,
user-generated task names are a much better way to handle this.

It's a shame that I was not aware of the Mapper API before. It's
exactly what I need.

On 3 oct, 20:26, Eli Jones <[email protected]> wrote:
> Ah, here's the quote from Nick Johnson on why one should use a cursor if
> they're deleting a lot of data:
>
> http://www.mail-archive.com/[email protected]/msg2718...
>
> Main excerpt:
>
> "Deletes using the naive approach slow down because when a record is deleted
> in Bigtable, it simply inserts a 'tombstone' record indicating the original
> record is deleted - the record isn't actually removed entirely from the
> datastore until the tablet it's on does its next compaction cycle. Until
> then, every subsequent query has to skip over the tombstone records to find
> the live records."
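
To make the excerpt above concrete, here is a toy model of the tombstone effect. It does not call the datastore at all: a plain list stands in for a tablet, and the function names (`naive_delete`, `cursor_delete`) are made up for this sketch. It only counts how many rows each approach has to walk over.

```python
# Toy model only: a list of booleans stands in for a Bigtable tablet.
# Nothing here calls the real App Engine datastore API.

def naive_delete(total, batch):
    """Re-run the query from the start each time; return rows scanned."""
    rows = [True] * total          # True = live row, False = tombstone
    scanned = 0
    remaining = total
    while remaining:
        found = 0
        for pos in range(total):   # every query starts again at row 0
            scanned += 1
            if rows[pos]:
                rows[pos] = False  # "delete" = write a tombstone
                found += 1
                if found == batch:
                    break
        remaining -= found
    return scanned


def cursor_delete(total, batch):
    """Resume each query where the previous one stopped; return rows scanned."""
    rows = [True] * total
    scanned = 0
    cursor = 0
    while cursor < total:
        end = min(cursor + batch, total)
        for pos in range(cursor, end):
            scanned += 1
            rows[pos] = False
        cursor = end
    return scanned


if __name__ == "__main__":
    print(naive_delete(1000, 100), cursor_delete(1000, 100))  # prints: 5500 1000
```

In this model the naive approach scans 5500 rows to delete 1000, because each query first skips every tombstone left by the earlier ones, while the cursor version scans each row once. Real compaction timing is not modelled.
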
>
>
>
> On Sun, Oct 3, 2010 at 7:18 PM, Jimmy Bourassa <[email protected]> wrote:
> > First of all, thank you for taking the time to give such an in-depth
> > answer.
>
> > To answer points 1 and 3: I was actually using the random name to make
> > the task idempotent, but yes, named tasks will do better. I still don't
> > understand why the above code behaves the way it does, but using
> > user-generated names should fix this.
>
> > Regarding point 2, I'm not sure you're correct on this one. Shouldn't
> > the query always be fast since there's no "skip" in the fetch method?
> > Since the query always gets the first X entities and deletes them, I
> > don't see why it'd be longer and longer.
>
> > In this scenario, I don't need the extra speed from the second method
> > you suggest. I will make sure to look into the Mapper API though.
>
> > Again, thank you for your time.
>
> > Jimmy
>
> > On 1 oct, 21:44, Eli Jones <[email protected]> wrote:
> > > From the looks of this code.. you should completely rewrite it.
>
> > > 1.  You indicate you want to use a recursive task.. but you aren't using
> > > named tasks.  So, you have no way to guarantee that the task won't fork
> > > indefinitely.
> > > 2.  You aren't using a cursor.. so eventually the db.Query() will take
> > > longer and longer and longer to run if you have a very large number of
> > > entities to delete.
> > > 3.  Why are you saving the random task name to the datastore?
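
Point 1 above can be illustrated without App Engine. The sketch below uses a hypothetical in-memory `ToyQueue` that refuses to accept a task name twice, which is the property named tasks rely on; the class, the exception, and the `delete-<campaign>-run-<n>` naming scheme are all assumptions made for this example, not the real taskqueue API.

```python
# Hypothetical in-memory stand-in for a task queue; not the App Engine API.

class DuplicateTaskName(Exception):
    """Raised by ToyQueue when a task name has already been used."""


class ToyQueue(object):
    def __init__(self):
        self.seen_names = set()   # names stay reserved after use
        self.pending = []

    def add(self, name, payload):
        if name in self.seen_names:
            raise DuplicateTaskName(name)
        self.seen_names.add(name)
        self.pending.append((name, payload))


def enqueue_next(queue, campaign_id, run):
    """Enqueue run number `run`; return False if it was already enqueued."""
    # Deterministic name: the same step of the chain always maps to the same name.
    name = "delete-%s-run-%d" % (campaign_id, run)
    try:
        queue.add(name, {"campaign_id": campaign_id, "run": run})
        return True
    except DuplicateTaskName:
        return False


if __name__ == "__main__":
    q = ToyQueue()
    print(enqueue_next(q, "c1", 1))  # first attempt is accepted
    print(enqueue_next(q, "c1", 1))  # a retry of the same step is rejected
```

With a random name, a task that is retried after it already enqueued its successor would enqueue a second successor, and the chain forks. With a deterministic name the second attempt is rejected, so at most one successor exists per step.
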
>
> > > There are a few patterns you can use to do batch deletes for a bunch of
> > > entities.
>
> > > 1.  Simple recursive named task with a cursor.
> > >  (For an intelligent explanation of this when using deferred, see here:
> > > http://blog.notdot.net/2010/03/Task-Queue-task-chaining-done-right)
>
> > > 2.  Recursive task using keys_only cursor and deferred task to do
> > > db.delete()
>
> > > You do 1 if you really only want each db.delete() to run one at a time.
> > > You do 2 if you just want the deleting done as fast as possible.  (Or there
> > > is option 3.. familiarize yourself with the GAE Mapper API [aka the Map
> > > Reduce API] and use that to fire off deletes.)
>
> > > 2 would look something like this:
>
> > > from google.appengine.ext import db, deferred
> > > from google.appengine.runtime import DeadlineExceededError
>
> > > def mainTask(campaign_id, new_import, cursor=None, i=0):
> > >     count = app_model.DATASTORE_PUT_LIMIT
> > >     try:
> > >         # Keep going while each fetch returns a full batch.
> > >         while count == app_model.DATASTORE_PUT_LIMIT:
> > >             query = db.Query(Recipient, keys_only=True).ancestor(
> > >                 campaign_id).filter('campaign', campaign_id)
> > >             if cursor is not None:
> > >                 query.with_cursor(cursor)
> > >             results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> > >             count = len(results)
> > >             deferred.defer(deleteByKeyList, repr(results))
> > >             cursor = query.cursor()
> > >         if do_weird_new_import_check(new_import):
> > >             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
> > >     except DeadlineExceededError:
> > >         # Out of time: re-defer under a deterministic name so a retry cannot fork.
> > >         i += 1
> > >         deferred.defer(mainTask, campaign_id, new_import, cursor, i,
> > >                        _name="mainTask-" + str(campaign_id) + "-run-" + str(i))
>
> > > def deleteByKeyList(keylist):
> > >     from google.appengine.api import datastore_types
> > >     keylist = eval(keylist)
> > >     db.delete(keylist)
>
> > > You could call that first function from a remote_shell.. and it would then
> > > begin firing off deferred deleteByKeyList batches until it got a
> > > DeadlineExceededError.. then it would just defer itself with all of the
> > > appropriate variables.
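
The control flow described above (fire off batches until time runs out, then re-defer with the current cursor) can be traced with a small simulation. Everything here is a stand-in: `OutOfTime` plays the role of DeadlineExceededError, an integer index plays the role of the query cursor, and `batches_per_request` is an invented knob for how much work fits in one request.

```python
# Toy simulation: plain lists replace the datastore and the deferred queue.

class OutOfTime(Exception):
    """Stands in for DeadlineExceededError in this sketch."""


def run_chain(keys, batch_size, batches_per_request):
    """Return (delete_batches, task_names) produced by the whole chain."""
    delete_batches = []            # what each deferred delete would receive
    task_names = []                # names of the re-deferred main tasks
    pending = [(0, 0)]             # (cursor, run) of main tasks still to execute
    while pending:
        cursor, run = pending.pop(0)
        try:
            fired = 0
            while cursor < len(keys):
                if fired == batches_per_request:
                    raise OutOfTime()          # simulated request deadline
                delete_batches.append(keys[cursor:cursor + batch_size])
                cursor += batch_size
                fired += 1
        except OutOfTime:
            # Re-defer with the cursor reached so far and the next run number.
            run += 1
            task_names.append("mainTask-run-%d" % run)
            pending.append((cursor, run))
    return delete_batches, task_names


if __name__ == "__main__":
    batches, names = run_chain(list(range(10)), 2, 2)
    print(batches)
    print(names)
```

With ten keys, batches of two, and two batches per request, the chain needs three requests and re-defers itself twice, each time under a new run number.
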
>
> > > It's a little sloppy.. and there's a chance that the deferred delete would
> > > get called.. and then the deadlineexceeded would throw.. and you'd then have
> > > the cursor variable get passed for the old cursor... but that shouldn't
> > > really change the ultimate behaviour.
>
> > > When the while loop completes (which will only happen when all of the
> > > entities have had a deferred deleteByKeyList() fired off), it will do the
> > > new_import check.
>
> > > If you are worried about maybe the DeadlineExceeded being fired off.. and
> > > then the deferred.defer() call not succeeding.. it wouldn't really matter,
> > > as long as it doesn't happen too often (so that your cursor value gets
> > > passed along) and the error occurs inside of one of the deferred versions
> > > of the mainTask() function.. it will just run the task again with the
> > > previous cursor value.
>
> > > And, when it runs the query.. it will only get keys for entities that are
> > > still in the datastore.  (the only negative will be that it will be looking
> > > for key values greater than the key value that started an already deleted
> > > batch).
>
> > > I'll admit, this method is a little nutty.. it's effectively trying to do
> > > the db.deletes() as fast as possible.. and it results in a lot of deferred
> > > errors (the usual 10 second appengine could not service the request in time
> > > errors).  But, it's faster than doing it serially.
>
> > > Again.. I am sad to say that the Mapper API pretty much makes this
> > > obsolete.. since once you know how to use it, it makes a lot of stuff like
> > > this easy.. and it seems to provide really neat stats and abilities to
> > > monitor the processes.
>
> > > http://googleappengine.blogspot.com/2010/07/introducing-mapper-api.html
>
> > > On Fri, Oct 1, 2010 at 3:54 PM, Jimmy Bourassa <[email protected]> wrote:
> > > > I created a task that handles batch delete for my application. The
> > > > purpose of the task is to delete records recursively. Once all records
> > > > are deleted, a new process must be started.
>
> > > > I tried to implement this through the code posted below. I really don't
> > > > know why this is not working as expected, but for some reason, I end up
> > > > having many tasks running concurrently. Is this the expected behavior of
> > > > the TaskQueue?
>
> > > > Here's the actual code :
>
> > > > def post(self):
> > > >        task_unique_name = self.request.headers['X-AppEngine-TaskName']
> > > >        #Checks if task has been executed already
> > > >        if task.ExecutedTask.all().filter('task_name=', task_unique_name).count():
> > > >                return
>
> > > >        campaign_id = db.Key(self.request.POST['campaign_id'])
>
> > > >        def delete_transaction(camp):
> > > >                query = db.Query(Recipient, keys_only=True).ancestor(campaign_id).filter('campaign', camp)
> > > >                results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> > > >                #If results are found, delete them and start a new delete task
> > > >                if results:
> > > >                        db.delete(results)
> > > >                        t = taskqueue.Task(url='/tasks/delete_recipients', params=self.request.POST)
> > > >                        t.add(queue_name='recipients-import')
>
> > > >                #No results found to delete, start import if appropriate.
> > > >                elif self.request.POST.get('new_import', False):
> > > >                        campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
>
> > > >                return True
>
> > > >        def mark_task_as_done():
> > > >                save_task = task.ExecutedTask(task_name=task_unique_name)
> > > >                save_task.put()
>
> > > >        if(db.run_in_transaction(delete_transaction, campaign_id)):
> > > >                db.run_in_transaction(mark_task_as_done)
>
> > > > --
> > > > You received this message because you are subscribed to the Google Groups
> > > > "Google App Engine" group.
> > > > To post to this group, send email to [email protected].
> > > > To unsubscribe from this group, send email to
> > > > [email protected].
> > > > For more options, visit this group at
> > > > http://groups.google.com/group/google-appengine?hl=en.
>

