Author: spadkins
Date: Fri Jan 18 10:27:49 2008
New Revision: 10612

Modified:
   p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
   p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm

Log:
a bunch of logging changes and cleanup to make running in production cleaner, 
and one small consistency change to push_in_mem (so that ARRAY scenario matches 
HASH ref scenario

Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm       (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm       Fri Jan 18 10:27:49 2008
@@ -249,9 +249,13 @@
     my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
     my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
     my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
+    my $colidx;
+    my $status_idx;
     if ($ref eq "ARRAY") {
         $type = "ARRAY";
         print "PUSHED[M]: [", join("|",@$entry), "]\n" if ($verbose);
+        $colidx = $self->_colidx();
+        $status_idx = $colidx->{$status_attrib};
     }
     elsif ($ref) {
         $type = "HASH";
@@ -294,7 +298,14 @@
                             $inserted = 1;
                         }
                     }
-                    if (!$removed && ($ent->{$status_attrib} ne 
$STATUS_ACQUIRED)) {
+                    my $acquired;
+                    if ($ref eq "ARRAY") {
+                        $acquired = $ent->[$status_idx] eq $STATUS_ACQUIRED;
+                    }
+                    elsif ($ref) {
+                        $acquired = $ent->{$status_attrib} eq $STATUS_ACQUIRED;
+                    }
+                    if (!$removed && !$acquired) {
                         $removed_entry = $entries->[$i];
                         
$self->update($removed_entry,[$status_attrib],[$STATUS_UNBUFFERED]);
                         splice(@$entries, $i, 1);
@@ -483,7 +494,7 @@
                     $entry = $e;
                     $acquired = $self->_acquire_entry($entry);
                     if ($acquired) {
-                        $context->log({level=>3}, "WorkQueue : $self->{name} : 
ACQUIRED[M]: [", join("|",@$e), "]\n");
+                        $context->log({level=>3}, "$self->{name} : 
Acquired[M]: [", join("|",@$e), "]\n");
                         last;
                     }
                     else {
@@ -500,13 +511,13 @@
             foreach my $e (@$entries) {
                 next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
                 if (!$e || !%$e) {
-                    $context->log({level=>1}, "WorkQueue : $self->{name} : 
EMPTY ENTRY [$e]\n");
+                    $context->log({level=>1}, "$self->{name} : Empty entry 
[$e]\n");
                 }
                 elsif ($self->_acquire_resources($e)) {
                     $entry = $e;
                     $acquired = $self->_acquire_entry($entry);
                     if ($acquired) {
-                        $context->log({level=>3}, "WorkQueue : $self->{name} : 
ACQUIRED[M]: [" . join("|", $e->{shop_request_id}, $e->{subrequest_id}). "]\n");
+                        $context->log({level=>3}, "$self->{name} : 
Acquired[M]: [" . join("|", $e->{shop_request_id}, $e->{subrequest_id}). "]\n");
                         last;
                     }
                     else {
@@ -515,7 +526,7 @@
                         if ($self->can("_db")) {
                             ### TODO: this is a debugging hack that shouldn't 
stay in it
                             my $db = $self->_db();
-                            $context->log({level=>1}, "WorkQueue : 
$self->{name} : ACQUISITION FAILED : 
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql 
stmt[$db->{sql}]\n");
+                            $context->log({level=>1}, "$self->{name} : 
Acquisition failed : [$entry->{shop_request_id}|$entry->{subrequest_id}] : last 
sql stmt[$db->{sql}]\n");
                         }
                         $entry = undef;
                         # the following line appears to have been a bug
@@ -527,7 +538,7 @@
         }
     }
 
-    $context->log({level=>3}, "WorkQueue : $self->{name} : ACQUIRED[M]: 
undef\n") if (!$entry);
+    $context->log({level=>3}, "$self->{name} : Acquired[M]: undef\n") if 
(!$entry);
     &App::sub_exit($entry) if ($App::trace);
     return($entry);
 }
@@ -649,13 +660,11 @@
                     $self->_release_resources($ent);
                 }
                 else {
-                    ### We only expect to get here via cancels, so we don't 
want to release_resources
-                    #$self->_release_resources($ent);
-                    $context->log("WQ: _release_in_mem: release with 
$ent->{$status_attrib} not equal $STATUS_ACQUIRED : actual/group 
data_source[$entry->{actual_data_source}/$entry->{group_data_source}]\n");
+                    $context->log({level=>1},"Release with entry 
status[$ent->{$status_attrib}] not equal STATUS_ACQUIRED[$STATUS_ACQUIRED] : 
actual/group 
data_source[$entry->{actual_data_source}/$entry->{group_data_source}]\n");
                 }
                 my $nrows = $self->update($ent,[EMAIL PROTECTED],[EMAIL 
PROTECTED]);
                 if (!$nrows) {
-                    $context->log("WorkQueue: _release_in_mem: update of db 
failed : " . join("|", %$ent) . "\n");
+                    $context->log({level=>1},"Update of db failed : " . 
join("|", %$ent) . "\n");
                 }
                 splice(@$data, $e, 1);
                 print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
@@ -665,9 +674,9 @@
         }
     }
 
-    $context->log("WorkQueue: _release_in_mem: release[$released] 
key[$entry_key] entry[$entry->{subrequest_id} $entry->{actual_data_source}]\n");
+    $context->log({level=>3},"Release[$released] key[$entry_key] 
entry[$entry->{subrequest_id} $entry->{actual_data_source}]\n");
     if (!$released) {
-        $context->log("WorkQueue: _release_in_mem: entry_keys[" . join(" ", 
sort(map {$self->_hash_to_key($_)} @$data)) . "]\n");
+        $context->log({level=>1},"Not released: entry_keys[" . join(" ", 
sort(map {$self->_hash_to_key($_)} @$data)) . "]\n");
     }
 
     &App::sub_exit($released) if ($App::trace);
@@ -1751,7 +1760,6 @@
         my $resource_counts = $self->_resource_counts();
 
         if ($op eq "push") {
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
push begin\n");
             $resource_counts->{total}{$resource_key}++;
             if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
                 $resource_counts->{buffer}{$resource_key}++;
@@ -1760,16 +1768,12 @@
             else {
                 $self->_push_in_mem($entry,1); # release lowest
             }
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
push end\n");
         }
         elsif ($op eq "acquire") {
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
acquire begin\n");
             $resource_counts->{total}{$resource_key}--;
             $resource_counts->{buffer}{$resource_key}--;
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
acquire end\n");
         }
         elsif ($op eq "release") {
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
release begin\n");
             my $status_attrib = $self->{status_attrib};
             my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
             my $release_without_acquire = 0;
@@ -1803,42 +1807,32 @@
                 }
                 else {
                     # do nothing
-                    #$self->_release_resources($entry) if ($nrows && 
!$release_without_acquire);
                 }
                 my $context = $self->{context};
-                $context->log("WorkQueue: release: released something not in 
memory entry=[$entry] nrows=[$nrows] 
release_without_acquire=[$release_without_acquire]\n");
+                $context->log({level=>1},"Released something not in memory 
entry=[$entry] nrows=[$nrows] 
release_without_acquire=[$release_without_acquire]\n");
             }
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
release end\n");
         }
         elsif ($op eq "unacquire") {
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
unacquire begin\n");
             $resource_counts->{total}{$resource_key}++;
             $resource_counts->{buffer}{$resource_key}++;
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry 
unacquire end\n");
         }
-        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call 
_refill_buffer begin\n");
         my $num_total       = $resource_counts->{total}{$resource_key};
         my $num_in_buffer   = $resource_counts->{buffer}{$resource_key};
         if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
             my $num_added = $self->_refill_buffer($resource_key);
             $resource_counts->{buffer}{$resource_key} += $num_added;
         }
-        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call 
_refill_buffer end\n");
     }
     else {
-        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
foreach resource_count key begin\n");
         my $resource_counts = $self->_resource_counts();
         foreach my $resource_key (keys %{$resource_counts->{total}}) {
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
_refill_buffer begin\n");
             my $num_total       = $resource_counts->{total}{$resource_key};
             my $num_in_buffer   = $resource_counts->{buffer}{$resource_key};
             if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
                 my $num_added = $self->_refill_buffer($resource_key);
                 $resource_counts->{buffer}{$resource_key} += $num_added;
             }
-            #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
_refill_buffer end\n");
         }
-        #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry 
foreach resource_count key end\n");
     }
 
     &App::sub_exit() if ($App::trace);
@@ -1957,3 +1951,4 @@
 
 
 
+

Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm    (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm    Fri Jan 18 
10:27:49 2008
@@ -92,7 +92,7 @@
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
     my $context = $self->{context};
-    $context->log({level=>4}, "[$self->{name}] _heartbeat\n");
+    $context->log({level=>3}, "$self->{name} _heartbeat\n");
     $self->_refresh_queue();
     &App::sub_exit() if ($App::trace);
 }
@@ -101,7 +101,7 @@
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
     my $context = $self->{context};
-    $context->log({level=>3},"[$self->{name}] _refresh_queue\n");
+    $context->log({level=>3},"$self->{name} _refresh_queue\n");
     $self->_refresh_status_counts();
     $self->_refresh_resource_counts();
     #$self->_maintain_queue_buffers();
@@ -327,7 +327,7 @@
     if (!$released) {
         my $context = $self->{context};
         my $db = $self->_db();
-        $context->log({level=>2}, "WorkQueue::Repository : $self->{name} : 
_release_in_db RELEASE FAILED : 
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql 
stmt[$db->{sql}]\n");
+        $context->log({level=>2}, "$self->{name} : _release_in_db RELEASE 
FAILED : [$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql 
stmt[$db->{sql}]\n");
     }
 
     &App::sub_exit($released) if ($App::trace);
@@ -478,13 +478,13 @@
         $self->_update_ref($entry, $columns, $values) if ($acquired);
         if (!$acquired) {
             my $context = $self->{context};
-            $context->log("WQR: _acquire_entry: ACQUIRE FAIL: last 
SQL[$db->{sql}]\n");
+            $context->log({level=>1},"$self->{name} : Acquire fail: last 
SQL[$db->{sql}]\n");
         }
     }
     else {
         ### THIS SHOULD NEVER HAPPEN 
         my $context = $self->{context};
-        $context->log("WorkQueue: _acquire_entry: tried to acquire an entry 
whose $self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
+        $context->log({level=>1},"Tried to acquire an entry whose 
$self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
         $acquired = 0;
     }
     &App::sub_exit($acquired) if ($App::trace);

Reply via email to