Author: spadkins
Date: Thu Jan 17 05:39:09 2008
New Revision: 10577
Modified:
p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
Log:
a lot of refactoring, mostly surrounding the release processes and the related
_maintain_queue_buffers call
Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue.pm Thu Jan 17 05:39:09 2008
@@ -460,7 +460,6 @@
my ($self) = @_;
my $context = $self->{context};
- my $db = $self->_db();
my ($entry);
my $STATUS_UNACQUIRED = $self->{STATUS_UNACQUIRED};
my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
@@ -496,7 +495,10 @@
my ($acquired);
foreach my $e (@$entries) {
next if ($e->{$status_attrib} ne $STATUS_UNACQUIRED);
- if ($self->_acquire_resources($e)) {
+ if (!$e || !%$e) {
+ $context->log({level=>1}, "WorkQueue : $self->{name} :
EMPTY ENTRY [$e]\n");
+ }
+ elsif ($self->_acquire_resources($e)) {
$entry = $e;
$acquired = $self->_acquire_entry($entry);
if ($acquired) {
@@ -504,8 +506,13 @@
last;
}
else {
+ ### THIS SHOULD NEVER HAPPEN, SOMEDAY FIGURE OUT WHY
$self->_release_resources($entry);
- $context->log({level=>1}, "WorkQueue : $self->{name} :
ACQUISITION FAILED : [$entry->{shop_request_id}|$entry->{subrequest_id}] : last
sql stmt[$db->{sql}]\n");
+ if ($self->can("_db")) {
+ ### TODO: this is a debugging hack that shouldn't
stay in it
+ my $db = $self->_db();
+ $context->log({level=>1}, "WorkQueue :
$self->{name} : ACQUISITION FAILED :
[$entry->{shop_request_id}|$entry->{subrequest_id}] : last sql
stmt[$db->{sql}]\n");
+ }
$entry = undef;
# the following line appears to have been a bug
#$self->_maintain_queue_buffers("release",$entry);
@@ -599,6 +606,7 @@
my $STATUS_RELEASED = $self->{STATUS_RELEASED};
my $status_attrib = $self->{status_attrib};
my $data = $self->{data};
+ my $context = $self->{context};
my ($e, $ent, $entry_key);
my @columns = ( $self->{status_attrib} );
@@ -638,9 +646,14 @@
$self->_release_resources($ent);
}
else {
- $self->_release_resources($ent);
+ ### We only expect to get here via cancels, so we don't
want to release_resources
+ #$self->_release_resources($ent);
+ $context->log("WorkQueue: _release_in_mem: release with
$ent->{$status_attrib} not equal $STATUS_ACQUIRED\n");
+ }
+ my $released = $self->update($ent,[EMAIL PROTECTED],[EMAIL
PROTECTED]);
+ if (!$released) {
+ $context->log("WorkQueue: _release_in_mem: update of db
failed : " . join("|", %$ent) . "\n");
}
- $self->update($ent,[EMAIL PROTECTED],[EMAIL PROTECTED]);
splice(@$data, $e, 1);
print "RELEASED[M]: {", join("|",%$entry), "}\n" if ($verbose);
$released = 1;
@@ -1557,12 +1570,8 @@
if ($complies) {
foreach my $c (@$constraints) {
$key = $entry->[$c->[$CONSTR_KEY_IDX]];
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined
$limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
- $c->[$CONSTR_COUNTS]{$key} += $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
+ $c->[$CONSTR_COUNTS]{$key} += $count_incr;
}
}
}
@@ -1583,12 +1592,8 @@
if ($complies) {
foreach my $c (@$constraints) {
$key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined
$limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
- $c->[$CONSTR_COUNTS]{$key} += $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
+ $c->[$CONSTR_COUNTS]{$key} += $count_incr;
}
}
}
@@ -1653,27 +1658,20 @@
if ($self->{type} eq "ARRAY") {
foreach my $c (@$constraints) {
$key = $entry->[$c->[$CONSTR_KEY_IDX]];
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
- $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_IDX]) ?
$entry->[$c->[$CONSTR_COUNT_IDX]] : 1;
+ $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
}
}
else {
foreach my $c (@$constraints) {
$key = $entry->{$c->[$CONSTR_KEY_ATTRIB]};
- $limit = $c->[$CONSTR_LIMITS]{$key};
- $limit = $c->[$CONSTR_LIMITS]{_DEFAULT} if (!defined $limit);
- if (defined $limit) {
- $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
- $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
- }
+ $count_incr = (defined $c->[$CONSTR_COUNT_ATTRIB]) ?
$entry->{$c->[$CONSTR_COUNT_ATTRIB]} : 1;
+ $c->[$CONSTR_COUNTS]{$key} -= $count_incr;
}
}
}
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit(1) if ($App::trace);
+ return(1);
}
sub _resource_counts {
@@ -1745,7 +1743,7 @@
my $resource_counts = $self->_resource_counts();
if ($op eq "push") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
push begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
push begin\n");
$resource_counts->{total}{$resource_key}++;
if ($resource_counts->{buffer}{$resource_key} < $BUFFER_SIZE) {
$resource_counts->{buffer}{$resource_key}++;
@@ -1754,48 +1752,88 @@
else {
$self->_push_in_mem($entry,1); # release lowest
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
push end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
push end\n");
}
elsif ($op eq "acquire") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
acquire begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
acquire begin\n");
$resource_counts->{total}{$resource_key}--;
$resource_counts->{buffer}{$resource_key}--;
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
acquire end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
acquire end\n");
}
elsif ($op eq "release") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
release begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
release begin\n");
$self->_release_in_mem($entry, $columns, $values);
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
release end\n");
+
+ my $status_attrib = $self->{status_attrib};
+ my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
+ my $release_without_acquire = 0;
+ if ($self->{type} eq "ARRAY") {
+ my $colidx = $self->_colidx();
+ my $status_idx = $colidx->{$status_attrib};
+ if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
+ $release_without_acquire = 1;
+ }
+ }
+ else {
+ if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
+ $release_without_acquire = 1;
+ }
+ }
+ ### TODO: figure out how to maintain numbers when $released is
false, causing constraint issues
+ my $released = $self->_release_in_mem($entry, $columns, $values);
+ if ($released) {
+ if ($release_without_acquire) {
+ $resource_counts->{total}{$resource_key}--;
+ $resource_counts->{buffer}{$resource_key}--;
+ }
+ else {
+ # do nothing
+ }
+ }
+ else {
+ my $nrows = $self->_release_in_db($entry,$columns,$values);
+ if ($release_without_acquire) {
+ $resource_counts->{total}{$resource_key}--;
+ }
+ else {
+ # do nothing
+ #$self->_release_resources($entry) if ($nrows &&
!$release_without_acquire);
+ }
+ my $context = $self->{context};
+ $context->log("WorkQueue: release: released something not in
memory entry=[$entry] nrows=[$nrows]
release_without_acquire=[$release_without_acquire]\n");
+ }
+
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
release end\n");
}
elsif ($op eq "unacquire") {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
unacquire begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
unacquire begin\n");
$resource_counts->{total}{$resource_key}++;
$resource_counts->{buffer}{$resource_key}++;
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
unacquire end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry
unacquire end\n");
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call
_refill_buffer begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call
_refill_buffer begin\n");
my $num_total = $resource_counts->{total}{$resource_key};
my $num_in_buffer = $resource_counts->{buffer}{$resource_key};
if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
my $num_added = $self->_refill_buffer($resource_key);
$resource_counts->{buffer}{$resource_key} += $num_added;
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call
_refill_buffer end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : entry call
_refill_buffer end\n");
}
else {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key begin\n");
my $resource_counts = $self->_resource_counts();
foreach my $resource_key (keys %{$resource_counts->{total}}) {
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
_refill_buffer begin\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
_refill_buffer begin\n");
my $num_total = $resource_counts->{total}{$resource_key};
my $num_in_buffer = $resource_counts->{buffer}{$resource_key};
if ($num_total > $num_in_buffer && $num_in_buffer < $BUFFER_SIZE) {
my $num_added = $self->_refill_buffer($resource_key);
$resource_counts->{buffer}{$resource_key} += $num_added;
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
_refill_buffer end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
_refill_buffer end\n");
}
- $context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key end\n");
+ #$context->log({level=>3}, "WQ: _maintain_queue_buffers : no entry
foreach resource_count key end\n");
}
&App::sub_exit() if ($App::trace);
Modified: p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm
==============================================================================
--- p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm (original)
+++ p5ee/trunk/App-WorkQueue/lib/App/WorkQueue/Repository.pm Thu Jan 17
05:39:09 2008
@@ -305,41 +305,12 @@
&App::sub_entry if ($App::trace);
my ($self, $entry, $columns, $values) = @_;
- my $status_attrib = $self->{status_attrib};
- my $STATUS_ACQUIRED = $self->{STATUS_ACQUIRED};
- my ($resource_counts, $resource_key, $release_without_acquire);
- $resource_counts = $self->_resource_counts();
- $resource_key = $self->_resource_key($entry);
- if ($self->{type} eq "ARRAY") {
- my $colidx = $self->_colidx();
- my $status_idx = $colidx->{$status_attrib};
- if ($entry->[$status_idx] ne $STATUS_ACQUIRED) {
- $release_without_acquire = 1;
- }
- }
- else {
- if ($entry->{$status_attrib} ne $STATUS_ACQUIRED) {
- $release_without_acquire = 1;
- }
- }
- if ($release_without_acquire) {
- $resource_counts->{total}{$resource_key}--;
- }
- ### TODO: figure out how to maintain numbers when $released is false,
causing constraint issues
- my $released = $self->_release_in_mem($entry, $columns, $values);
- if ($released) {
- $resource_counts->{buffer}{$resource_key}-- if
($release_without_acquire);
- $self->_maintain_queue_buffers(undef,$entry,$columns,$values);
- }
- else {
- $released = $self->_release_in_db($entry,$columns,$values);
- $self->_release_resources($entry) if ($released &&
!$release_without_acquire);
- #$resource_counts->{total}{$resource_key}-- if
(!$release_without_acquire);
- }
+ $self->_maintain_queue_buffers("release",$entry,$columns,$values);
$self->print() if ($self->{verbose});
$self->{context}->trigger_event_loop_extension();
- &App::sub_exit($released) if ($App::trace);
- return($released);
+
+ &App::sub_exit(1) if ($App::trace);
+ return(1);
}
sub _release_in_db {
@@ -507,6 +478,9 @@
$self->_update_ref($entry, $columns, $values) if ($acquired);
}
else {
+ ### THIS SHOULD NEVER HAPPEN
+ my $context = $self->{context};
+ $context->log("WorkQueue: _acquire_entry: tried to acquire an entry
whose $self->{status_attrib} == $self->{STATUS_ACQUIRED}\n");
$acquired = 0;
}
&App::sub_exit($acquired) if ($App::trace);