Committed by Dagfinn Ilmari Mannsåker <[email protected]>
Subject: [DBD::Pg 2/2] Rewrite foreign_key_info to be just one query
Do all the data munging in SQL, and just spit the result straight out,
instead of three separate queries with lots of munging in perl-space.
---
Pg.pm | 294 ++++++++++++++++++---------------------------------------
t/03dbmethod.t | 8 +-
2 files changed, 97 insertions(+), 205 deletions(-)
diff --git a/Pg.pm b/Pg.pm
index 856958a..7bad619 100644
--- a/Pg.pm
+++ b/Pg.pm
@@ -867,11 +867,6 @@ use 5.008001;
my $dbh = shift;
## PK: catalog, schema, table, FK: catalog, schema, table, attr
-
- my $oldname = $dbh->{FetchHashKeyName};
-
- local $dbh->{FetchHashKeyName} = 'NAME_lc';
-
## Each of these may be undef or empty
my $pschema = $_[1] || '';
my $ptable = $_[2] || '';
@@ -880,213 +875,112 @@ use 5.008001;
my $args = $_[6];
## Must have at least one named table
- return undef if !$ptable and !$ftable;
+ return undef if !length($ptable) and !length($ftable);
## If only the primary table is given, we return only those
columns
## that are used as foreign keys, even if that means that we
return
## unique keys but not primary one. We also return all the
foreign
## tables/columns that are referencing them, of course.
-
- ## The first step is to find the oid of each specific table in
the args:
- ## Return undef if no matching relation found
- my %oid;
- for ([$ptable, $pschema, 'P'], [$ftable, $fschema, 'F']) {
- if (length $_->[0]) {
- my $SQL = "SELECT c.oid AS schema FROM
pg_catalog.pg_class c, pg_catalog.pg_namespace n\n".
- 'WHERE c.relnamespace = n.oid AND
c.relname = ' . $dbh->quote($_->[0]);
- if (length $_->[1]) {
- $SQL .= ' AND n.nspname = ' .
$dbh->quote($_->[1]);
+ ## If no schema is given, respect search_path by using
pg_table_is_visible()
+ my @where;
+ for ([$ptable, $pschema, 'uk'], [$ftable, $fschema, 'fk']) {
+ my ($table, $schema, $type) = @$_;
+ if (length $table) {
+ push @where, "${type}_class.relname = " .
$dbh->quote($table);
+ if (length $schema) {
+ push @where, "${type}_ns.nspname = " .
$dbh->quote($schema);
}
else {
- $SQL .= ' AND
pg_catalog.pg_table_is_visible(c.oid)'
+ push @where,
"pg_catalog.pg_table_is_visible(${type}_class.oid)"
}
- my $info = $dbh->selectall_arrayref($SQL);
- return undef if ! @$info;
- $oid{$_->[2]} = $info->[0][0];
- }
- }
-
- ## We now need information about each constraint we care about.
- ## Foreign table: only 'f' / Primary table: only 'p' or 'u'
- my $WHERE = q{((contype IN ('p','u')};
- if (length $ptable) {
- $WHERE .= " AND conrelid=$oid{'P'}::oid";
- }
- else {
- $WHERE .= " AND conrelid IN (SELECT DISTINCT confrelid
FROM pg_catalog.pg_constraint WHERE conrelid=$oid{'F'}::oid)";
- if (length $pschema) {
- $WHERE .= ' AND n2.nspname = ' .
$dbh->quote($pschema);
- }
- }
-
- $WHERE .= ")\n \t\t\t\tOR \n \t\t\t\t(contype = 'f'";
- if (length $ftable) {
- $WHERE .= " AND conrelid=$oid{'F'}::oid";
- if (length $ptable) {
- $WHERE .= " AND confrelid=$oid{'P'}::oid";
- }
- }
- else {
- $WHERE .= " AND confrelid = $oid{'P'}::oid";
- if (length $fschema) {
- $WHERE .= ' AND n2.nspname = ' .
$dbh->quote($fschema);
}
}
- $WHERE .= '))';
-
- ## Grab everything except specific column names:
- my $fk_sql = qq{
- SELECT conrelid, confrelid, contype, conkey, confkey,
- pg_catalog.quote_ident(c.relname) AS t_name,
pg_catalog.quote_ident(n2.nspname) AS t_schema,
- pg_catalog.quote_ident(n.nspname) AS c_schema,
pg_catalog.quote_ident(conname) AS c_name,
- CASE
- WHEN confupdtype = 'c' THEN 0
- WHEN confupdtype = 'r' THEN 1
- WHEN confupdtype = 'n' THEN 2
- WHEN confupdtype = 'a' THEN 3
- WHEN confupdtype = 'd' THEN 4
- ELSE -1
- END AS update,
- CASE
- WHEN confdeltype = 'c' THEN 0
- WHEN confdeltype = 'r' THEN 1
- WHEN confdeltype = 'n' THEN 2
- WHEN confdeltype = 'a' THEN 3
- WHEN confdeltype = 'd' THEN 4
- ELSE -1
- END AS delete,
- CASE
- WHEN condeferrable = 'f' THEN 7
- WHEN condeferred = 't' THEN 6
- WHEN condeferred = 'f' THEN 5
- ELSE -1
- END AS defer
- FROM pg_catalog.pg_constraint k, pg_catalog.pg_class c,
pg_catalog.pg_namespace n, pg_catalog.pg_namespace n2
- WHERE $WHERE
- AND k.connamespace = n.oid
- AND k.conrelid = c.oid
- AND c.relnamespace = n2.oid
- ORDER BY conrelid ASC
- };
-
- my $sth = $dbh->prepare($fk_sql);
- $sth->execute();
-
- ## We have to make sure expand_array is on for the items below
to work
- my $oldexpand = $dbh->FETCH('pg_expand_array');
- $oldexpand or $dbh->STORE('pg_expand_array', 1);
-
- my $info = $sth->fetchall_arrayref({});
- $oldexpand or $dbh->STORE('pg_expand_array', 0);
- return undef if ! defined $info or ! @$info;
- ## Return undef if just ptable given but no fk found
- return undef if ! length $ftable and ! grep { $_->{'contype'}
eq 'f'} @$info;
-
- ## Figure out which columns we need information about
- my %colnum;
- for my $row (@$info) {
- for (@{$row->{'conkey'}}) {
- $colnum{$row->{'conrelid'}}{$_}++;
- }
- if ($row->{'contype'} eq 'f') {
- for (@{$row->{'confkey'}}) {
- $colnum{$row->{'confrelid'}}{$_}++;
- }
- }
- }
- ## Get the information about the columns computed above
+ my $WHERE = join ' AND ', @where;
my $SQL = qq{
- SELECT a.attrelid, a.attnum, pg_catalog.quote_ident(a.attname) AS
colname,
- pg_catalog.quote_ident(t.typname) AS typename
- FROM pg_catalog.pg_attribute a, pg_catalog.pg_type t
- WHERE a.atttypid = t.oid
- AND (\n};
-
- $SQL .= join "\n\t\t\t\tOR\n" => map {
- my $cols = join ',' => keys %{$colnum{$_}};
- "\t\t\t\t( a.attrelid = '$_' AND a.attnum IN ($cols) )"
- } sort keys %colnum;
-
- $sth = $dbh->prepare(qq{$SQL )});
- $sth->execute();
- my $attribs = $sth->fetchall_arrayref({});
-
- ## Make a lookup hash
- my %attinfo;
- for (@$attribs) {
- $attinfo{"$_->{'attrelid'}"}{"$_->{'attnum'}"} = $_;
- }
-
- ## This is an array in case we have identical oid/column
combos. Lowest oid wins
- my %ukey;
- for my $c (grep { $_->{'contype'} ne 'f' } @$info) {
- ## Munge multi-column keys into sequential order
- my $multi = join ' ' => sort @{$c->{'conkey'}};
- push @{$ukey{$c->{'conrelid'}}{$multi}}, $c;
- }
-
- ## Finally, return as a SQL/CLI structure:
- my $fkinfo = [];
- my $x=0;
- for my $t (sort { $a->{'c_name'} cmp $b->{'c_name'} } grep {
$_->{'contype'} eq 'f' } @$info) {
- ## We need to find which constraint row (if any)
matches our confrelid-confkey combo
- ## by checking out ukey hash. We sort for proper
matching of { 1 2 } vs. { 2 1 }
- ## No match means we have a pure index constraint
- my $u;
- my $multi = join ' ' => sort @{$t->{'confkey'}};
- if (exists $ukey{$t->{'confrelid'}}{$multi}) {
- $u = $ukey{$t->{'confrelid'}}{$multi}->[0];
- }
- else {
- ## Mark this as an index so we can fudge things
later on
- $multi = 'index';
- ## Grab the first one found, modify later on as
needed
- $u = ((values
%{$ukey{$t->{'confrelid'}}})[0]||[])->[0];
- ## Bail in case there was no match
- next if ! ref $u;
- }
+ SELECT
+ NULL, pg_catalog.quote_ident(uk_ns.nspname),
pg_catalog.quote_ident(uk_class.relname),
pg_catalog.quote_ident(uk_col.attname),
+ NULL, pg_catalog.quote_ident(fk_ns.nspname),
pg_catalog.quote_ident(fk_class.relname),
pg_catalog.quote_ident(fk_col.attname),
+ colnum.i,
+ CASE constr.confupdtype
+ WHEN 'c' THEN 0 WHEN 'r' THEN 1 WHEN
'n' THEN 2 WHEN 'a' THEN 3 WHEN 'd' THEN 4 ELSE -1
+ END,
+ CASE constr.confdeltype
+ WHEN 'c' THEN 0 WHEN 'r' THEN 1 WHEN
'n' THEN 2 WHEN 'a' THEN 3 WHEN 'd' THEN 4 ELSE -1
+ END,
+ pg_catalog.quote_ident(constr.conname),
pg_catalog.quote_ident(uk_constr.conname),
+ CASE
+ WHEN constr.condeferrable = 'f' THEN 7
+ WHEN constr.condeferred = 't' THEN 6
+ WHEN constr.condeferred = 'f' THEN 5
+ ELSE -1
+ END,
+ CASE coalesce(uk_constr.contype, 'u')
+ WHEN 'u' THEN 'UNIQUE' WHEN 'p' THEN
'PRIMARY'
+ END,
+ pg_catalog.quote_ident(uk_type.typname),
pg_catalog.quote_ident(fk_type.typname)
+ FROM pg_catalog.pg_constraint constr
+ JOIN pg_catalog.pg_class uk_class ON
constr.confrelid = uk_class.oid
+ JOIN pg_catalog.pg_namespace uk_ns ON
uk_class.relnamespace = uk_ns.oid
+ JOIN pg_catalog.pg_class fk_class ON
constr.conrelid = fk_class.oid
+ JOIN pg_catalog.pg_namespace fk_ns ON
fk_class.relnamespace = fk_ns.oid
+ -- can't do unnest() until 8.4, and would need
WITH ORDINALITY to get the array indices,
+ -- which isn't available until 9.4 at the
earliest, so we join against a series table instead
+ JOIN pg_catalog.generate_series(1,
pg_catalog.current_setting('max_index_keys')::integer) colnum(i)
+ ON colnum.i <=
pg_catalog.array_upper(constr.conkey,1)
+ JOIN pg_catalog.pg_attribute uk_col ON
uk_col.attrelid = constr.confrelid AND uk_col.attnum = constr.confkey[colnum.i]
+ JOIN pg_catalog.pg_type uk_type ON
uk_col.atttypid = uk_type.oid
+ JOIN pg_catalog.pg_attribute fk_col ON
fk_col.attrelid = constr.conrelid AND fk_col.attnum = constr.conkey[colnum.i]
+ JOIN pg_catalog.pg_type fk_type ON
fk_col.atttypid = fk_type.oid
+
+ -- We can't match confkey from the fk
constraint to conkey of the unique constraint,
+ -- because the unique constraint might not
exist or there might be more than one
+ -- matching one. However, there must be at
least a unique _index_ on the key
+ -- columns, so we look for that; but we can't
find it via pg_index, since there may
+ -- again be more than one matching index.
+
+ -- So instead, we look at pg_depend for the
dependency that was created by the fk
+ -- constraint. This dependency is of type 'n'
(normal) and ties the pg_constraint
+ -- row oid to the pg_class oid for the index
relation (a single arbitrary one if
+ -- more than one matching unique index existed
at the time the constraint was
+ -- created). Fortunately, the constraint does
not create dependencies on the
+ -- referenced table itself, but on the
_columns_ of the referenced table, so the
+ -- index can be distinguished easily. Then we
look for another pg_depend entry,
+ -- this time an 'i' (implementation) dependency
from a pg_constraint oid (the unique
+ -- constraint if one exists) to the index oid;
but we have to allow for the
+ -- possibility that this one doesn't exist. - Andrew Gierth (RhodiumToad)
+
+ JOIN pg_catalog.pg_depend dep ON (
+ dep.classid =
'pg_catalog.pg_constraint'::regclass
+ AND dep.objid = constr.oid
+ AND dep.objsubid = 0
+ AND dep.deptype = 'n'
+ AND dep.refclassid =
'pg_catalog.pg_class'::regclass
+ AND dep.refobjsubid=0
+ )
+ JOIN pg_catalog.pg_class idx ON (
+ idx.oid = dep.refobjid AND
idx.relkind='i'
+ )
+ LEFT JOIN pg_catalog.pg_depend dep2 ON (
+ dep2.classid =
'pg_catalog.pg_class'::regclass
+ AND dep2.objid = idx.oid
+ AND dep2.objsubid = 0
+ AND dep2.deptype = 'i'
+ AND dep2.refclassid =
'pg_catalog.pg_constraint'::regclass
+ AND dep2.refobjsubid = 0
+ )
+ LEFT JOIN pg_catalog.pg_constraint uk_constr ON
(
+ uk_constr.oid = dep2.refobjid AND
uk_constr.contype IN ('p','u')
+ )
+ WHERE $WHERE
+ AND uk_class.relkind = 'r'
+ AND fk_class.relkind = 'r'
+ AND constr.contype = 'f'
+ ORDER BY constr.conname, colnum.i
+ };
+ my $fkinfo = $dbh->selectall_arrayref($SQL);
- my $conkey = $t->{'conkey'};
- my $confkey = $t->{'confkey'};
- for (my $y=0; $conkey->[$y]; $y++) {
- # UK_TABLE_CAT
- $fkinfo->[$x][0] = undef;
- # UK_TABLE_SCHEM
- $fkinfo->[$x][1] = $u->{'t_schema'};
- # UK_TABLE_NAME
- $fkinfo->[$x][2] = $u->{'t_name'};
- # UK_COLUMN_NAME
- $fkinfo->[$x][3] =
$attinfo{$t->{'confrelid'}}{$confkey->[$y]}{'colname'};
- # FK_TABLE_CAT
- $fkinfo->[$x][4] = undef;
- # FK_TABLE_SCHEM
- $fkinfo->[$x][5] = $t->{'t_schema'};
- # FK_TABLE_NAME
- $fkinfo->[$x][6] = $t->{'t_name'};
- # FK_COLUMN_NAME
- $fkinfo->[$x][7] =
$attinfo{$t->{'conrelid'}}{$conkey->[$y]}{'colname'};
- # ORDINAL_POSITION
- $fkinfo->[$x][8] = $y+1;
- # UPDATE_RULE
- $fkinfo->[$x][9] = "$t->{'update'}";
- # DELETE_RULE
- $fkinfo->[$x][10] = "$t->{'delete'}";
- # FK_NAME
- $fkinfo->[$x][11] = $t->{'c_name'};
- # UK_NAME (may be undef if an index with no
named constraint)
- $fkinfo->[$x][12] = $multi eq 'index' ? undef :
$u->{'c_name'};
- # DEFERRABILITY
- $fkinfo->[$x][13] = "$t->{'defer'}";
- # UNIQUE_OR_PRIMARY
- $fkinfo->[$x][14] = ($u->{'contype'} eq 'p' and
$multi ne 'index') ? 'PRIMARY' : 'UNIQUE';
- # UK_DATA_TYPE
- $fkinfo->[$x][15] =
$attinfo{$t->{'confrelid'}}{$confkey->[$y]}{'typename'};
- # FK_DATA_TYPE
- $fkinfo->[$x][16] =
$attinfo{$t->{'conrelid'}}{$conkey->[$y]}{'typename'};
- $x++;
- } ## End each column in this foreign key
- } ## End each foreign key
+ return undef unless $fkinfo && @{$fkinfo};
my @cols = (qw(
UK_TABLE_CAT UK_TABLE_SCHEM UK_TABLE_NAME UK_COLUMN_NAME
@@ -1095,7 +989,7 @@ use 5.008001;
DEFERABILITY UNIQUE_OR_PRIMARY UK_DATA_TYPE FK_DATA_TYPE
));
- if ($oldname eq 'NAME_lc') {
+ if ($dbh->{FetchHashKeyName} eq 'NAME_lc') {
for my $col (@cols) {
$col = lc $col;
}
diff --git a/t/03dbmethod.t b/t/03dbmethod.t
index ff4c4eb..041e11c 100644
--- a/t/03dbmethod.t
+++ b/t/03dbmethod.t
@@ -832,11 +832,9 @@ is ($sth, undef, $t);
## Create a pk table
-# The order of the tables returned by the OID query in foreign_key_info
-# seems to be influenced by schema creation order, so create the schemas
-# in the opposite order of the search_path, so we have at least a vague
-# chance of testing that we respect the search_path order. Also create
-# the tables in the opposite order, for good measure
+# Create identical tables and relations in multiple schemas, and in the
+# opposite order of the search_path, so we have at least a vague chance
+# of testing that we respect the search_path order.
$dbh->do("CREATE SCHEMA $schema3");
$dbh->do("CREATE SCHEMA $schema2");
$dbh->do("SET search_path = $schema2,$schema3");
--
1.8.4