Right, that makes sense now. Good to know, thanks! As for the random task names: I meant that I was using the datastore to record which tasks had already run, so the script could skip any task that might have run in the past. Of course, user-generated task names are a much better way to handle this.
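For anyone finding this thread later, the named-task version of that check would look roughly like the sketch below. This is just my own sketch; the URL, queue name and naming scheme are placeholders, not anything from the docs.

    from google.appengine.api import taskqueue

    def enqueue_delete_task(campaign_id, batch_no):
        # Instead of saving a random task name to the datastore and checking it
        # on every run, give the task a deterministic, user-supplied name.
        # The queue itself then rejects duplicates.
        try:
            taskqueue.add(
                name='delete-recipients-%s-%d' % (campaign_id, batch_no),
                url='/tasks/delete_recipients',
                params={'campaign_id': str(campaign_id)},
                queue_name='recipients-import')
        except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
            # Already enqueued (or it already ran recently): nothing to do.
            pass

The duplicate gets rejected by the queue instead of by a datastore lookup in the handler.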
It's a shame that I was not aware of the Mapper API before. It's exactly what I need.

On 3 oct, 20:26, Eli Jones <[email protected]> wrote:
> Ah, here's the quote from Nick Johnson on why one should use a cursor if
> they're deleting a lot of data:
>
> http://www.mail-archive.com/[email protected]/msg2718...
>
> Main excerpt:
>
> "Deletes using the naive approach slow down because when a record is deleted
> in Bigtable, it simply inserts a 'tombstone' record indicating the original
> record is deleted - the record isn't actually removed entirely from the
> datastore until the tablet it's on does its next compaction cycle. Until
> then, every subsequent query has to skip over the tombstone records to find
> the live records."
>
> On Sun, Oct 3, 2010 at 7:18 PM, Jimmy Bourassa <[email protected]> wrote:
> > First of all, thank you for taking the time to give such an in-depth
> > answer.
> >
> > To answer points 1 and 3: I was actually using the random name to make
> > the task idempotent, but yes, named tasks will do better. I still don't
> > understand why the above code behaves the way it does, but using a
> > user-generated name should fix this.
> >
> > Regarding point 2, I'm not sure you're correct on this one. Shouldn't
> > the query always be fast since there's no "skip" in the fetch method?
> > Since the query always gets the first X entities and deletes them, I
> > don't see why it'd take longer and longer.
> >
> > In this scenario, I don't need the extra speed from the second method
> > you suggest. I will make sure to look into the Mapper API though.
> >
> > Again, thank you for your time.
> >
> > Jimmy
> >
> > On 1 oct, 21:44, Eli Jones <[email protected]> wrote:
> > > From the looks of this code.. you should completely rewrite it.
> > >
> > > 1. You indicate you want to use a recursive task.. but you aren't using
> > >    named tasks. So, you have no way to guarantee that the task won't
> > >    fork indefinitely.
> > > 2. You aren't using a cursor.. so eventually the db.Query() will take
> > >    longer and longer to run if you have a very large number of entities
> > >    to delete.
> > > 3. Why are you saving the random task name to the datastore?
> > >
> > > There are a few patterns you can use to do batch deletes for a bunch of
> > > entities.
> > >
> > > 1. Simple recursive named task with a cursor.
> > >    (For an intelligent explanation of this when using deferred, see here:
> > >    http://blog.notdot.net/2010/03/Task-Queue-task-chaining-done-right)
> > >
> > > 2. Recursive task using a keys_only cursor and a deferred task to do the
> > >    db.delete().
> > >
> > > You do 1 if you really only want each db.delete() to run one at a time.
> > > You do 2 if you just want the deleting done as fast as possible. (Or
> > > there is option 3.. familiarize yourself with the GAE Mapper API [aka
> > > the Map Reduce API] and use that to fire off deletes.)
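Option 1, as I understand the chaining post linked above, would come out to roughly the sketch below. This is my own attempt, not Eli's code; Recipient, the batch size, and the task-name scheme are placeholders.

    from google.appengine.api import taskqueue
    from google.appengine.ext import db, deferred

    BATCH_SIZE = 500  # placeholder batch size

    class Recipient(db.Model):
        # Minimal stand-in for the model from the original post.
        campaign = db.ReferenceProperty()

    def delete_recipients(campaign_id, cursor=None, run=0):
        # Keys-only query, resumed from the last cursor position so we never
        # re-scan tombstones left by earlier batches.
        query = db.Query(Recipient, keys_only=True).ancestor(campaign_id)
        query.filter('campaign =', campaign_id)
        if cursor:
            query.with_cursor(cursor)

        keys = query.fetch(BATCH_SIZE)
        if not keys:
            return  # nothing left to delete

        db.delete(keys)

        # Chain the next batch as a *named* task so a retried run cannot
        # fork a second chain.
        try:
            deferred.defer(delete_recipients, campaign_id, query.cursor(), run + 1,
                           _name='delete-%s-run-%d' % (campaign_id, run + 1))
        except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
            pass  # the next link in the chain was already enqueued

The named task is what keeps a retry from forking a second chain; the cursor is what keeps each query from slowing down as tombstones pile up.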
> > > 2 would look something like this (I didn't indent the try/except since
> > > it's getting a little too indenty):
> > >
> > > def mainTask(campaign_id, new_import, cursor=None, i=0):
> > >     count = app_model.DATASTORE_PUT_LIMIT
> > >     try:
> > >         while count == app_model.DATASTORE_PUT_LIMIT:
> > >             query = db.Query(Recipient,
> > >                              keys_only=True).ancestor(campaign_id).filter('campaign', campaign_id)
> > >             if cursor is not None:
> > >                 query.with_cursor(cursor)
> > >             results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> > >             count = len(results)
> > >             deferred.defer(deleteByKeyList, repr(results))
> > >             cursor = query.cursor()
> > >         if do_weird_new_import_check(new_import):
> > >             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
> > >     except DeadlineExceededError:
> > >         i += 1
> > >         deferred.defer(mainTask, campaign_id, new_import, cursor, i,
> > >                        _name="mainTask-" + str(campaign_id) + "-run-" + str(i))
> > >
> > > def deleteByKeyList(keylist):
> > >     from google.appengine.api import datastore_types
> > >     keylist = eval(keylist)
> > >     db.delete(keylist)
> > >
> > > You could call that first function from a remote_shell.. and it would
> > > then begin firing off deferred deleteByKeyList batches until it got a
> > > DeadlineExceededError.. then it would just defer itself with all of the
> > > appropriate variables.
> > >
> > > It's a little sloppy.. and there's a chance that the deferred delete
> > > would get called.. and then the DeadlineExceededError would throw.. and
> > > you'd then have the old cursor value get passed along... but that
> > > shouldn't really change the ultimate behaviour.
> > >
> > > When the while loop completes (which will only happen when all of the
> > > entities have had a deferred deleteByKeyList() fired off), it will do
> > > the new_import check.
> > >
> > > If you are worried about the DeadlineExceededError being raised and then
> > > the deferred.defer() call not succeeding.. it wouldn't really matter, as
> > > long as it doesn't happen too often (so that your cursor value gets
> > > passed along) and the error occurs inside one of the deferred versions
> > > of the mainTask() function.. it will just run the task again with the
> > > previous cursor value.
> > >
> > > And, when it runs the query.. it will only get keys for entities that
> > > are still in the datastore. (The only negative is that it will be
> > > looking for key values greater than the key value that started an
> > > already-deleted batch.)
> > >
> > > I'll admit, this method is a little nutty.. it's effectively trying to
> > > do the db.delete()s as fast as possible.. and it results in a lot of
> > > deferred errors (the usual "10 second appengine could not service the
> > > request in time" errors). But, it's faster than doing it serially.
> > >
> > > Again.. I am sad to say that the Mapper API pretty much makes this
> > > obsolete.. since once you know how to use it, it makes a lot of stuff
> > > like this easy.. and it seems to provide really neat stats and abilities
> > > to monitor the processes.
> > >
> > > http://googleappengine.blogspot.com/2010/07/introducing-mapper-api.html
> > >
> > > On Fri, Oct 1, 2010 at 3:54 PM, Jimmy Bourassa <[email protected]> wrote:
> > > > I created a task that handles batch deletes for my application. The
> > > > purpose of the task is to delete records recursively. Once all records
> > > > are deleted, a new process must be started.
> > > >
> > > > I tried to implement this through the code posted below.
> > > > I really don't understand why this is not working as expected, but for
> > > > some reason I end up having many tasks running concurrently. Is this
> > > > the expected behavior of the TaskQueue?
> > > >
> > > > Here's the actual code:
> > > >
> > > > def post(self):
> > > >     task_unique_name = self.request.headers['X-AppEngine-TaskName']
> > > >     # Checks if task has been executed already
> > > >     if task.ExecutedTask.all().filter('task_name =', task_unique_name).count():
> > > >         return
> > > >
> > > >     campaign_id = db.Key(self.request.POST['campaign_id'])
> > > >
> > > >     def delete_transaction(camp):
> > > >         query = db.Query(Recipient,
> > > >                          keys_only=True).ancestor(campaign_id).filter('campaign', camp)
> > > >         results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> > > >         # If results are found, delete them and start a new delete task
> > > >         if results:
> > > >             db.delete(results)
> > > >             t = taskqueue.Task(url='/tasks/delete_recipients',
> > > >                                params=self.request.POST)
> > > >             t.add(queue_name='recipients-import')
> > > >         # No results found to delete, start import if appropriate.
> > > >         elif self.request.POST.get('new_import', False):
> > > >             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
> > > >
> > > >         return True
> > > >
> > > >     def mark_task_as_done():
> > > >         save_task = task.ExecutedTask(task_name=task_unique_name)
> > > >         save_task.put()
> > > >
> > > >     if db.run_in_transaction(delete_transaction, campaign_id):
> > > >         db.run_in_transaction(mark_task_as_done)
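P.S. For anyone else who missed the Mapper API: going by the introductory post linked above, the whole-kind delete seems to boil down to a mapper like the one below. This is my reading of the docs, untested; the function name and entity kind are just from my app.

    from mapreduce import operation as op

    def delete_recipient(entity):
        # Yield a delete mutation for every entity the mapper visits; the
        # framework batches and applies these, and the mapreduce status page
        # shows progress while the deletes run.
        yield op.db.Delete(entity)

The mapper gets registered in mapreduce.yaml with mapreduce.input_readers.DatastoreInputReader as the input reader and the Recipient kind as the entity_kind parameter.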
