Author: spadkins
Date: Fri Jan 18 10:27:49 2008
New Revision: 10612
Modified:
p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
Log:
a bunch of logging changes and cleanup to make running in production cleaner,
and one small consistency change to push_in_mem (so that ARRAY scenario matches
HASH ref scenario
Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm Fri Jan 18 10:27:49 2008
@@ -249,9 +249,13 @@
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $STATUS_UNBUFFERED = $self->{STATUS_UNBUFFERED};
+ my $colidx;
+ my $status_idx;
if ($ref eq "ARRAY") {
$type = "ARRAY";
print "PUSHED[M]: [", join("|",@$entry), "]\n" if ($verbose);
+ $colidx = $self->_colidx();
+ $status_idx = $colidx->{$status_attrib};
}
elsif ($ref) {
$type = "HASH";
@@ -294,7 +298,14 @@
$inserted = 1;
}
}
- if (!$removed && ($ent->{$status_attrib} ne
$STATUS_ACQUIRED)) {
+ my $acquired;
+ if ($ref eq "ARRAY") {
+ $acquired = $ent->[$status_idx] eq $STATUS_ACQUIRED;
+ }
+ elsif ($ref) {
+ $acquired = $ent->{$status_attrib} eq $STATUS_ACQUIRED;
+ }
+ if (!$removed && !$acquired) {
$removed_entry = $entries->[$i];
$self->update($removed_entry,[$status_attrib],[$STATUS_UNBUFFERED]);
splice(@$entries, $i, 1);
@@ -483,7 +494,7 @@
$entry = $e;
$acquired = $self->_acquire_entry($entry);
if ($acquired) {
- $context->log({level=>3}, "WorkQueue : $self->{name} :
ACQUIRED[M]: [", join("|",@$e), "]\n");
+ $context->log({level=>3}, "$self->{name} :
Acquired[M]: [", join("|",@$e), "]\n");
last;
}
else {
@@ -500,13 +511,13 @@
foreach my $e (@$entries) {
next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
if (!$e || !%$e) {
- $context->log({level=>1}, "WorkQueue : $self->{name} :
EMPTY ENTRY [$e]\n");
+ $context->log({level=>1}, "$self->{name} : Empty entry
[$e]\n");
}
elsif ($self->_acquire_resources($e)) {
$entry = $e;
$acquired = $self->_acquire_entry($entry);
if ($acquired) {
- $context->log({level=>3}, "WorkQueue : $self->{name} :
ACQUIRED[M]: [" . join("|", $e->{shop_request_id}, $e->{subrequest_id}). "]\n");
+ $context->log({level=>3}, "$self->{name} :
Acquired[M]: [" . join("|", $e->{shop_request_id}, $e->{subrequest_id}). "]\n");
last;
}
else {
@@ -515,7 +526,7 @@
if ($self->can("_db")) {
### TODO: this is a debugging hack that shouldn't
stay in it
my $db = $self->_db();
- $context->log({level=>1}, "WorkQueue :
$self->{name} : ACQUISITION FAILED :
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql
stmt[$db->{sql}]\n");
+ $context->log({level=>1}, "$self->{name} :
Acquisition failed : [$entry->{shop_request_id}|$entry->{subrequest_id}] : last
sql stmt[$db->{sql}]\n");
}
$entry = undef;
# the following line appears to have been a bug
@@ -527,7 +538,7 @@
}
}
- $context->log({level=>3}, "WorkQueue : $self->{name} : ACQUIRED[M]:
undef\n") if (!$entry);
+ $context->log({level=>3}, "$self->{name} : Acquired[M]: undef\n") if
(!$entry);
&App::sub_exit($entry) if ($App::trace);
return($entry);
}
@@ -649,13 +660,11 @@
$self->_release_resources($ent);
}
else {
- ### We only expect to get here via cancels, so we don't
want to release_resources
- #$self->_release_resources($ent);
- $context->log("WQ: _release_in_mem: release with
$ent->{$status_attrib} not equal $STATUS_ACQUIRED : actual/group
data_source[$entry->{actual_data_source}/$entry->{group_data_source}]\n");
+ $context->log({level=>1},"Release with entry
status[$ent->{$status_attrib}] not equal STATUS_ACQUIRED[$STATUS_ACQUIRED] :
actual/group
data_source[$entry->{actual_data_source}/$entry->{group_data_source}]\n");
}
my $nrows = $self->update($ent,[EMAIL PROTECTED],[EMAIL
PROTECTED]);
if (!$nrows) {
- $context->log("WorkQueue: _release_in_mem: update of db
failed : " . join("|", %$ent) . "\n");
+ $context->log({level=>1},"Update of db failed : " .
join("|", %$ent) . "\n");
}
splice(@$data, $e, 1);
print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
@@ -665,9 +674,9 @@
}
}
- $context->log("WorkQueue: _release_in_mem: release[$released]
key[$entry_key] entry[$entry->{subrequest_id} $entry->{actual_data_source}]\n");
+ $context->log({level=>3},"Release[$released] key[$entry_key]
entry[$entry->{subrequest_id} $entry->{actual_data_source}]\n");
if (!$released) {
- $context->log("WorkQueue: _release_in_mem: entry_keys[" . join(" ",
sort(map {$self->_hash_to_key($_)} @$data)) . "]\n");
+ $context->log({level=>1},"Not released: entry_keys[" . join(" ",
sort(map {$self->_hash_to_key($_)} @$data)) . "]\n");
}
&App::sub_exit($released) if ($App::trace);
@@ -1751,7 +1760,6 @@
my $resource_counts = $self->_resource_counts();
if ($op eq "push") {
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
push begin\n");
$resource_counts->{total}{$resource_key}++;
if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
$resource_counts->{buffer}{$resource_key}++;
@@ -1760,16 +1768,12 @@
else {
$self->_push_in_mem($entry,1); # release lowest
}
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
push end\n");
}
elsif ($op eq "acquire") {
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
acquire begin\n");
$resource_counts->{total}{$resource_key}--;
$resource_counts->{buffer}{$resource_key}--;
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
acquire end\n");
}
elsif ($op eq "release") {
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
release begin\n");
my $status_attrib = $self->{status_attrib};
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
my $release_without_acquire = 0;
@@ -1803,42 +1807,32 @@
}
else {
# do nothing
- #$self->_release_resources($entry) if ($nrows &&
!$release_without_acquire);
}
my $context = $self->{context};
- $context->log("WorkQueue: release: released something not in
memory entry=[$entry] nrows=[$nrows]
release_without_acquire=[$release_without_acquire]\n");
+ $context->log({level=>1},"Released something not in memory
entry=[$entry] nrows=[$nrows]
release_without_acquire=[$release_without_acquire]\n");
}
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
release end\n");
}
elsif ($op eq "unacquire") {
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
unacquire begin\n");
$resource_counts->{total}{$resource_key}++;
$resource_counts->{buffer}{$resource_key}++;
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
unacquire end\n");
}
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call
_refill_buffer begin\n");
my $num_total = $resource_counts->{total}{$resource_key};
my $num_in_buffer = $resource_counts->{buffer}{$resource_key};
if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
my $num_added = $self->_refill_buffer($resource_key);
$resource_counts->{buffer}{$resource_key} += $num_added;
}
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call
_refill_buffer end\n");
}
else {
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key begin\n");
my $resource_counts = $self->_resource_counts();
foreach my $resource_key (keys %{$resource_counts->{total}}) {
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
_refill_buffer begin\n");
my $num_total = $resource_counts->{total}{$resource_key};
my $num_in_buffer = $resource_counts->{buffer}{$resource_key};
if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
my $num_added = $self->_refill_buffer($resource_key);
$resource_counts->{buffer}{$resource_key} += $num_added;
}
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
_refill_buffer end\n");
}
- #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key end\n");
}
&App::sub_exit() if ($App::trace);
@@ -1957,3 +1951,4 @@
+
Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm Fri Jan 18
10:27:49 2008
@@ -92,7 +92,7 @@
&App::sub_entry if ($App::trace);
my ($self) = @_;
my $context = $self->{context};
- $context->log({level=>4}, "[$self->{name}] _heartbeat\n");
+ $context->log({level=>3}, "$self->{name} _heartbeat\n");
$self->_refresh_queue();
&App::sub_exit() if ($App::trace);
}
@@ -101,7 +101,7 @@
&App::sub_entry if ($App::trace);
my ($self) = @_;
my $context = $self->{context};
- $context->log({level=>3},"[$self->{name}] _refresh_queue\n");
+ $context->log({level=>3},"$self->{name} _refresh_queue\n");
$self->_refresh_status_counts();
$self->_refresh_resource_counts();
#$self->_maintain_queue_buffers();
@@ -327,7 +327,7 @@
if (!$released) {
my $context = $self->{context};
my $db = $self->_db();
- $context->log({level=>2}, "WorkQueue::Repository : $self->{name} :
_release_in_db RELEASE FAILED :
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql
stmt[$db->{sql}]\n");
+ $context->log({level=>2}, "$self->{name} : _release_in_db RELEASE
FAILED : [$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql
stmt[$db->{sql}]\n");
}
&App::sub_exit($released) if ($App::trace);
@@ -478,13 +478,13 @@
$self->_update_ref($entry, $columns, $values) if ($acquired);
if (!$acquired) {
my $context = $self->{context};
- $context->log("WQR: _acquire_entry: ACQUIRE FAIL: last
SQL[$db->{sql}]\n");
+ $context->log({level=>1},"$self->{name} : Acquire fail: last
SQL[$db->{sql}]\n");
}
}
else {
### THIS SHOULD NEVER HAPPEN
my $context = $self->{context};
- $context->log("WorkQueue: _acquire_entry: tried to acquire an entry
whose $self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
+ $context->log({level=>1},"Tried to acquire an entry whose
$self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
$acquired = 0;
}
&App::sub_exit($acquired) if ($App::trace);