Author: spadkins
Date: Thu Jan 17 05:39:09 2008
New Revision: 10577

Modified:
   p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
   p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm

Log:
a lot of refactoring, mostly surrounding the release processes and the related 
_maintain_queue_buffers call

Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm       (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm       Thu Jan 17 05:39:09 2008
@@ -460,7 +460,6 @@
     my ($self) = @_;
 
     my $context = $self->{context};
-    my $db = $self->_db();
     my ($entry);
     my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
     my $STATUS_ACQUIRED   = $self->{STATUS_ACQUIRED};
@@ -496,7 +495,10 @@
             my ($acquired);
             foreach my $e (@$entries) {
                 next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
-                if ($self->_acquire_resources($e)) {
+                if (!$e || !%$e) {
+                    $context->log({level=>1}, "WorkQueue : $self->{name} : 
EMPTY ENTRY [$e]\n");
+                }
+                elsif ($self->_acquire_resources($e)) {
                     $entry = $e;
                     $acquired = $self->_acquire_entry($entry);
                     if ($acquired) {
@@ -504,8 +506,13 @@
                         last;
                     }
                     else {
+                        ### THIS SHOULD NEVER HAPPEN, SOMEDAY FIGURE OUT WHY
                         $self->_release_resources($entry);
-                        $context->log({level=>1}, "WorkQueue : $self->{name} : 
ACQUISITION FAILED : [$entry->{shop_request_id}|$entry->{subrequest_id}] : last 
sql stmt[$db->{sql}]\n");
+                        if ($self->can("_db")) {
+                            ### TODO: this is a debugging hack that shouldn't 
stay in it
+                            my $db = $self->_db();
+                            $context->log({level=>1}, "WorkQueue : 
$self->{name} : ACQUISITION FAILED : 
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql 
stmt[$db->{sql}]\n");
+                        }
                         $entry = undef;
                         # the following line appears to have been a bug
                         #$self->_maintain_queue_buffers("release",$entry);
@@ -599,6 +606,7 @@
     my $STATUS_RELEASED   = $self->{STATUS_RELEASED};
     my $status_attrib     = $self->{status_attrib};
     my $data              = $self->{data};
+    my $context           = $self->{context};
     my ($e, $ent, $entry_key);
 
     my @columns = ( $self->{status_attrib} );
@@ -638,9 +646,14 @@
                     $self->_release_resources($ent);
                 }
                 else {
-                    $self->_release_resources($ent);
+                    ### We only expect to get here via cancels, so we don't 
want to release_resources
+                    #$self->_release_resources($ent);
+                    $context->log("WorkQueue: _release_in_mem: release with 
$ent->{$status_attrib} not equal $STATUS_ACQUIRED\n");
+                }
+                my $released = $self->update($ent,[EMAIL PROTECTED],[EMAIL 
PROTECTED]);
+                if (!$released) {
+                    $context->log("WorkQueue: _release_in_mem: update of db 
failed : " . join("|", %$ent) . "\n");
                 }
-                $self->update($ent,[EMAIL PROTECTED],[EMAIL PROTECTED]);
                 splice(@$data, $e, 1);
                 print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
                 $released = 1;
@@ -1557,12 +1570,8 @@
             if ($complies) {
                 foreach my $c (@$constraints) {
                     $key = $entry->[$c->[$CONSTR_KEY_IDX]];
-                    $limit = $c->[$CONSTR_LIMITS]{$key};
-                    $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined 
$limit);
-                    if (defined $limit) {
-                        $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? 
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
-                        $c->[$CONSTR_COUNTS]{$key} += $count_incr;
-                    }
+                    $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? 
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
+                    $c->[$CONSTR_COUNTS]{$key} += $count_incr;
                 }
             }
         }
@@ -1583,12 +1592,8 @@
             if ($complies) {
                 foreach my $c (@$constraints) {
                     $key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
-                    $limit = $c->[$CONSTR_LIMITS]{$key};
-                    $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined 
$limit);
-                    if (defined $limit) {
-                        $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? 
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
-                        $c->[$CONSTR_COUNTS]{$key} += $count_incr;
-                    }
+                    $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? 
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
+                    $c->[$CONSTR_COUNTS]{$key} += $count_incr;
                 }
             }
         }
@@ -1653,27 +1658,20 @@
         if ($self->{type} eq "ARRAY") {
             foreach my $c (@$constraints) {
                 $key = $entry->[$c->[$CONSTR_KEY_IDX]];
-                $limit = $c->[$CONSTR_LIMITS]{$key};
-                $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
-                if (defined $limit) {
-                    $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? 
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
-                    $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
-                }
+                $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ? 
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
+                $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
             }
         }
         else {
             foreach my $c (@$constraints) {
                 $key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
-                $limit = $c->[$CONSTR_LIMITS]{$key};
-                $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
-                if (defined $limit) {
-                    $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? 
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
-                    $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
-                }
+                $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ? 
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
+                $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
             }
         }
     }
-    &App::sub_exit() if ($App::trace);
+    &App::sub_exit(1) if ($App::trace);
+    return(1);
 }
 
 sub _resource_counts {
@@ -1745,7 +1743,7 @@
         my $resource_counts = $self->_resource_counts();
 
         if ($op eq "push") {
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
push begin\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
push begin\n");
             $resource_counts->{total}{$resource_key}++;
             if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
                 $resource_counts->{buffer}{$resource_key}++;
@@ -1754,48 +1752,88 @@
             else {
                 $self->_push_in_mem($entry,1); # release lowest
             }
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
push end\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
push end\n");
         }
         elsif ($op eq "acquire") {
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
acquire begin\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
acquire begin\n");
             $resource_counts->{total}{$resource_key}--;
             $resource_counts->{buffer}{$resource_key}--;
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
acquire end\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
acquire end\n");
         }
         elsif ($op eq "release") {
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
release begin\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
release begin\n");
             $self->_release_in_mem($entry, $columns, $values);
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
release end\n");
+
+            my $status_attrib = $self->{status_attrib};
+            my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
+            my $release_without_acquire = 0;
+            if ($self->{type} eq "ARRAY") {
+                my $colidx = $self->_colidx();
+                my $status_idx = $colidx->{$status_attrib};
+                if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
+                    $release_without_acquire = 1;
+                }
+            }
+            else {
+                if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
+                    $release_without_acquire = 1;
+                }
+            }
+            ### TODO: figure out how to maintain numbers when $released is 
false, causing constraint issues
+            my $released = $self->_release_in_mem($entry, $columns, $values);
+            if ($released) {
+                if ($release_without_acquire) {
+                    $resource_counts->{total}{$resource_key}--;
+                    $resource_counts->{buffer}{$resource_key}--;
+                }
+                else {
+                    # do nothing
+                }
+            }
+            else {
+                my $nrows = $self->_release_in_db($entry,$columns,$values);
+                if ($release_without_acquire) {
+                    $resource_counts->{total}{$resource_key}--;
+                }
+                else {
+                    # do nothing
+                    #$self->_release_resources($entry) if ($nrows && 
!$release_without_acquire);
+                }
+                my $context = $self->{context};
+                $context->log("WorkQueue: release: released something not in 
memory entry=[$entry] nrows=[$nrows] 
release_without_acquire=[$release_without_acquire]\n");
+            }
+
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
release end\n");
         }
         elsif ($op eq "unacquire") {
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
unacquire begin\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
unacquire begin\n");
             $resource_counts->{total}{$resource_key}++;
             $resource_counts->{buffer}{$resource_key}++;
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
unacquire end\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
unacquire end\n");
         }
-        $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call 
_refill_buffer begin\n");
+        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call 
_refill_buffer begin\n");
         my $num_total       = $resource_counts->{total}{$resource_key};
         my $num_in_buffer   = $resource_counts->{buffer}{$resource_key};
         if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
             my $num_added = $self->_refill_buffer($resource_key);
             $resource_counts->{buffer}{$resource_key} += $num_added;
         }
-        $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call 
_refill_buffer end\n");
+        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call 
_refill_buffer end\n");
     }
     else {
-        $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
foreach resource_count key begin\n");
+        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
foreach resource_count key begin\n");
         my $resource_counts = $self->_resource_counts();
         foreach my $resource_key (keys %{$resource_counts->{total}}) {
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
_refill_buffer begin\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
_refill_buffer begin\n");
             my $num_total       = $resource_counts->{total}{$resource_key};
             my $num_in_buffer   = $resource_counts->{buffer}{$resource_key};
             if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
                 my $num_added = $self->_refill_buffer($resource_key);
                 $resource_counts->{buffer}{$resource_key} += $num_added;
             }
-            $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
_refill_buffer end\n");
+            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
_refill_buffer end\n");
         }
-        $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
foreach resource_count key end\n");
+        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
foreach resource_count key end\n");
     }
 
     &App::sub_exit() if ($App::trace);

Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm    (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm    Thu Jan 17 
05:39:09 2008
@@ -305,41 +305,12 @@
     &App::sub_entry if ($App::trace);
     my ($self, $entry, $columns, $values) = @_;
 
-    my $status_attrib = $self->{status_attrib};
-    my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
-    my ($resource_counts, $resource_key, $release_without_acquire);
-    $resource_counts = $self->_resource_counts();
-    $resource_key = $self->_resource_key($entry);
-    if ($self->{type} eq "ARRAY") {
-        my $colidx = $self->_colidx();
-        my $status_idx = $colidx->{$status_attrib};
-        if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
-            $release_without_acquire = 1;
-        }
-    }
-    else {
-        if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
-            $release_without_acquire = 1;
-        }
-    }
-    if ($release_without_acquire) {
-        $resource_counts->{total}{$resource_key}--;
-    }
-    ### TODO: figure out how to maintain numbers when $released is false, 
causing constraint issues
-    my $released = $self->_release_in_mem($entry, $columns, $values);
-    if ($released) {
-        $resource_counts->{buffer}{$resource_key}-- if 
($release_without_acquire);
-        $self->_maintain_queue_buffers(undef,$entry,$columns,$values);
-    }
-    else {
-        $released = $self->_release_in_db($entry,$columns,$values);
-        $self->_release_resources($entry) if ($released && 
!$release_without_acquire);
-        #$resource_counts->{total}{$resource_key}-- if 
(!$release_without_acquire);
-    }
+    $self->_maintain_queue_buffers("release",$entry,$columns,$values);
     $self->print() if ($self->{verbose});
     $self->{context}->trigger_event_loop_extension();
-    &App::sub_exit($released) if ($App::trace);
-    return($released);
+
+    &App::sub_exit(1) if ($App::trace);
+    return(1);
 }
 
 sub _release_in_db {
@@ -507,6 +478,9 @@
         $self->_update_ref($entry, $columns, $values) if ($acquired);
     }
     else {
+        ### THIS SHOULD NEVER HAPPEN 
+        my $context = $self->{context};
+        $context->log("WorkQueue: _acquire_entry: tried to acquire an entry 
whose $self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
         $acquired = 0;
     }
     &App::sub_exit($acquired) if ($App::trace);

Reply via email to