Author: spadkins
Date: Thu Dec 4 15:16:56 2008
New Revision: 12137
Modified:
p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm
p5ee/trunk/App-Repository/t/DBI-import-ora.t
Log:
added support for sqlldr in import_rows() for Oracle
Modified: p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm
==============================================================================
--- p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm (original)
+++ p5ee/trunk/App-Repository/lib/App/Repository/Oracle.pm Thu Dec 4
15:16:56 2008
@@ -12,6 +12,8 @@
use strict;
use Data::Dumper;
+use Digest::SHA qw(sha1_hex);
+use Date::Format;
=head1 NAME
@@ -236,153 +238,371 @@
&App::sub_exit() if ($App::trace);
}
-#############################################################################
-# METHODS
-#############################################################################
+# $insert_sql = $rep->_mk_insert_rows_sql ($table, \@cols, \@rows, \%options);
+# i.e. $options->{replace}
+sub TBD_mk_insert_rows_sql {
+ &App::sub_entry if ($App::trace);
+ my ($self, $table, $cols, $rows, $options) = @_;
-=head1 Methods: Import/Export Data From File
+ $self->_load_table_metadata($table) if (!defined
$self->{table}{$table}{loaded});
+ my $dbh = $self->{dbh};
-=cut
+ if ($#$cols == -1) {
+ if ($#$rows > -1 && ref($rows->[0]) eq "ARRAY") {
+ die "_mk_insert_rows_sql(): no columns specified";
+ }
+ else {
+ $cols = [ keys %{$rows->[0]} ];
+ }
+ }
+ my $column_defs = $self->{table}{$table}{column};
-#############################################################################
-# import_rows()
-#############################################################################
-
-=head2 import_rows()
-
- * Signature: $rep->import_rows($table, $file);
- * Signature: $rep->import_rows($table, $file, $options);
- * Param: $table string
- * Param: $file string
- * Param: $options named
- * Param: columns ARRAY names of columns of the fields in the
file
- * Param: import_method string [basic=invokes generic superclass to
do work,
- insert=loads with multiple-row
inserts,
- <otherwise>=use "load data infile"]
- * Param: local boolean file is on client machine rather than
database server
- * Param: replace boolean rows should replace existing rows
based on unique indexes
- * Param: field_sep char character which separates the fields
in the file (can by "\t")
- * Param: field_quote char character which optionally encloses
the fields in the file (i.e. '"')
- * Param: field_escape char character which escapes the quote
chars within quotes (i.e. "\")
- * Return: void
- * Throws: App::Exception::Repository
- * Since: 0.01
-
- Note: If you want to call this with $options->{local}, you will probably
- need to make sure that mysql_local_infile=1 is in your DSN. This might
- require a line like the following in your "app.conf" file.
-
- dbioptions = mysql_local_infile=1
-
- Sample Usage:
-
- $rep->import_rows("usr","usr.dat");
-
- # root:x:0:0:root:/root:/bin/bash
- $rep->import_rows("usr", "/etc/passwd" ,{
- field_sep => ":",
- columns => [ "username", "password", "uid", "gid", "comment",
"home_directory", "shell" ],
- });
+ my $insert = $options->{replace} ? "replace" : "insert";
+ my $sql = "$insert into $table\n (" . join(", ", @$cols) . ")\nvalues\n";
+ my ($value, $colnum, $quoted, $row, $col);
+ if ($rows && $#$rows > -1 && ref($rows->[0]) eq "ARRAY") {
+ for (my $rownum = 0; $rownum <= $#$rows; $rownum++) {
+ $row = $rows->[$rownum];
+ for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
+ $col = $cols->[$colnum];
+ $value = $row->[$colnum];
+ if (!defined $value) {
+ $value = "NULL";
+ }
+ else {
+ $quoted = (defined $column_defs->{$col}{quoted}) ?
($column_defs->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
+ if ($quoted) {
+ $value = $dbh->quote($value);
+ }
+ }
+ if ($column_defs->{$col}{dbexpr_update}) {
+ $value = sprintf($column_defs->{$col}{dbexpr_update},
$value);
+ }
+ $sql .= ($colnum == 0) ? " ($value" : ", $value";
+ }
+ $sql .= ($rownum < $#$rows) ? "),\n" : ")\n";
+ }
+ }
+ else { # if $row is a HASH or OBJECT ...
+ for (my $rownum = 0; $rownum <= $#$rows; $rownum++) {
+ $row = $rows->[$rownum];
+ for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
+ $col = $cols->[$colnum];
+ $value = $row->{$col};
+ if (!defined $value) {
+ $value = "NULL";
+ }
+ else {
+ $quoted = (defined $column_defs->{$col}{quoted}) ?
($column_defs->{$col}{quoted}) : ($value !~ /^-?[0-9.]+$/);
+ if ($quoted) {
+ $value = $dbh->quote($value);
+ }
+ }
+ if ($column_defs->{$col}{dbexpr_update}) {
+ $value = sprintf($column_defs->{$col}{dbexpr_update},
$value);
+ }
+ $sql .= ($colnum == 0) ? " ($value" : ", $value";
+ }
+ $sql .= ($rownum < $#$rows) ? "),\n" : ")\n";
+ }
+ }
+ if (!$options->{replace} && $options->{update}) {
+ my $update = $options->{update};
+ $sql .= "on duplicate key update";
+ my $first_update_column = 1;
+ for ($colnum = 0; $colnum <= $#$cols; $colnum++) {
+ $col = $cols->[$colnum];
+ if (!ref($update) || $update->{$col}) {
+ $sql .= "," if (!$first_update_column);
+ $first_update_column = 0;
+ $sql .= "\n $col = values($col)";
+ }
+ }
+ $sql .= "\n";
+ }
+ &App::sub_exit($sql) if ($App::trace);
+ $sql;
+}
-=cut
+# $nrows = $rep->_insert_rows ($table, \@cols, \@rows);
+sub TBD_insert_rows {
+ &App::sub_entry if ($App::trace);
+ my ($self, $table, $cols, $rows, $options) = @_;
+ $self->{error} = "";
+ my ($sql, $retval, $nrows_this_insert);
+
+ my $dbh = $self->{dbh};
+ return 0 if (!defined $dbh);
+ my $nrows = 0;
+ my $ok = 1;
+ my $context_options = $self->{context}{options};
+ my $debug_sql = $context_options->{debug_sql};
+ my $explain_sql = $context_options->{explain_sql};
+ my ($timer, $elapsed_time);
+ if ($debug_sql) {
+ $timer = $self->_get_timer();
+ }
+ my $rows_ref = ref($rows);
+ if ($rows_ref eq "ARRAY") {
+ my $maxrows = $options->{maxrows} || 100;
+ my $rownum = 0;
+ my (@current_rows, $rownum2);
+ while ($rownum <= $#$rows) {
+ $rownum2 = $rownum + $maxrows - 1;
+ $rownum2 = $#$rows if ($rownum2 > $#$rows);
+ @current_rows = @{$rows}[($rownum .. $rownum2)];
+ $nrows_this_insert = $#current_rows + 1;
+
+ $sql = $self->_mk_insert_rows_sql($table, $cols, \@current_rows, $options);
+ if ($debug_sql) {
+ print $App::DEBUG_FILE "DEBUG_SQL: _insert_rows()\n";
+ print $App::DEBUG_FILE $sql;
+ }
+ ### TODO: make this work with regex for retry
+ $retval = $dbh->do($sql);
+ if ($debug_sql) {
+ print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval]
$DBI::errstr\n";
+ print $App::DEBUG_FILE "\n";
+ }
-#############################################################################
-# export_rows()
-#############################################################################
-
-=head2 export_rows()
-
- * Signature: $rep->export_rows($table, $file);
- * Signature: $rep->export_rows($table, $file, $options);
- * Param: $table string
- * Param: $file string
- * Param: $options named
- * Param: columns ARRAY names of columns of the fields in the
file
- * Param: export_method string [basic=invokes generic superclass to
do work]
- * Param: field_sep char character which separates the fields
in the file (can by "\t")
- * Param: field_quote char character which optionally encloses
the fields in the file (i.e. '"')
- * Param: field_escape char character which escapes the quote
chars within quotes (i.e. "\")
- * Return: void
- * Throws: App::Exception::Repository
- * Since: 0.01
-
- Sample Usage:
-
- $rep->export_rows("usr","usr.dat");
-
- # root:x:0:0:root:/root:/bin/bash
- $rep->export_rows("usr", "passwd.dat" ,{
- field_sep => ":",
- columns => [ "username", "password", "uid", "gid", "comment",
"home_directory", "shell" ],
- });
+ # The MySQL "insert ... on duplicate key update ..." statement
returns 2 rows affected
+ # when the insert gets a collision and causes an update. So we
have to make this
+ # adjustment. I don't know if it affects the "replace ..."
statement in a similar way,
+ # but I figure this can't hurt.
+ if ($options->{update} || $options->{replace}) {
+ if ($retval > $nrows_this_insert) {
+ $retval = $nrows_this_insert;
+ }
+ }
-=cut
+ $nrows += $retval;
+ $rownum += $maxrows;
+ }
+ if ($nrows != $#$rows + 1) {
+ $ok = 0;
+ }
+ $self->{numrows} = $nrows;
+ }
+ else {
+ my ($fh);
+ if (!$rows_ref) {
+ my $file = $rows; # assume it is a file name
+ open(App::Repository::MySQL::FILE, $file) || die "Unable to open
$file for reading: $!";
+ $fh = \*App::Repository::MySQL::FILE;
+ }
+ else {
+ $fh = $rows; # assume it is a file handle
+ }
+ $rows = []; # we will be refilling this buffer
+ my %options = ( %$options ); # make a copy so it can be modified
+ $options{maxrows} = 100;
+ $nrows = 0;
+ while (1) {
+ $rows = $self->_read_rows_from_file($fh, $cols, \%options);
+ last if ($#$rows == -1);
+ $sql = $self->_mk_insert_rows_sql($table, $cols, $rows, $options);
+ if ($debug_sql) {
+ print $App::DEBUG_FILE "DEBUG_SQL: _insert_rows()\n";
+ print $App::DEBUG_FILE $sql;
+ }
+ ### TODO: make this work with regex for retry
+ $retval = $dbh->do($sql);
+ if ($debug_sql) {
+ print $App::DEBUG_FILE "DEBUG_SQL: retval [$retval]
$DBI::errstr\n";
+ print $App::DEBUG_FILE "\n";
+ }
-#SELECT ... INTO OUTFILE is the complement of LOAD DATA INFILE; the syntax for
the
-#export_options part of the statement consists of the same FIELDS and LINES
clauses
-#that are used with the LOAD DATA INFILE statement.
-#See Section 13.2.5, .LOAD DATA INFILE Syntax..
-
-#SELECT
-# [ALL | DISTINCT | DISTINCTROW ]
-# [HIGH_PRIORITY]
-# [STRAIGHT_JOIN]
-# [SQL_SMALL_RESULT] [SQL_BIG_RESULT] [SQL_BUFFER_RESULT]
-# [SQL_CACHE | SQL_NO_CACHE] [SQL_CALC_FOUND_ROWS]
-# select_expr, ...
-# [INTO OUTFILE 'file_name' export_options
-# | INTO DUMPFILE 'file_name']
-# [FROM table_references
-# [WHERE where_definition]
-# [GROUP BY {col_name | expr | position}
-# [ASC | DESC], ... [WITH ROLLUP]]
-# [HAVING where_definition]
-# [ORDER BY {col_name | expr | position}
-# [ASC | DESC] , ...]
-# [LIMIT {[offset,] row_count | row_count OFFSET offset}]
-# [PROCEDURE procedure_name(argument_list)]
-# [FOR UPDATE | LOCK IN SHARE MODE]]
+ $nrows += $retval;
+ if ($retval != $#$rows + 1) {
+ $ok = 0;
+ last;
+ }
+ }
+ $self->{numrows} = $nrows;
+ if (!$rows_ref) {
+ close(App::Repository::MySQL::FILE);
+ }
+ }
+ if ($debug_sql) {
+ $elapsed_time = $self->_read_timer($timer);
+ print $App::DEBUG_FILE "DEBUG_SQL: total rows [$nrows] ($elapsed_time
sec)\n";
+ }
+ $self->{sql} = $sql;
+ $self->{numrows} = $nrows;
+ &App::sub_exit($nrows) if ($App::trace);
+ return($nrows);
+}
-sub export_rows {
+sub import_rows {
&App::sub_entry if ($App::trace);
- my ($self, $table, $params, $file, $options) = @_;
+ my ($self, $table, $columns, $file, $options) = @_;
+ $columns = $self->_get_default_columns($table) if (!$columns);
- if ($options->{export_method} && $options->{export_method} eq "basic") {
- $self->SUPER::export_rows($table, $file, $options);
- }
+ my $nrows = 0;
+ my $import_method = $options->{import_method} || $self->{import_method} ||
"";
+ if ($import_method eq "basic") {
+ $nrows = $self->SUPER::import_rows($table, $columns, $file, $options);
+ }
+ # DOESN'T WORK YET
+ #elsif ($import_method eq "insert") {
+ # $nrows = $self->insert_rows($table, $columns, $file, $options);
+ #}
else {
- my $columns = $options->{columns} || $self->{table}{$table}{columns};
- my $where_clause = $self->_mk_where_clause($table, $params, $options);
- my $sql = "select\n " . join(",\n ", @$columns);
- $sql .= "\n$where_clause" if ($where_clause);
- $sql .= "\ninto outfile '$file'";
- if ($options->{field_sep} || $options->{field_quote} ||
$options->{field_escape}) {
- $sql .= "\nfields";
- $sql .= "\n terminated by '$options->{field_sep}'" if
($options->{field_sep});
- $sql .= "\n optionally enclosed by '$options->{field_quote}'" if
($options->{field_quote});
- $sql .= "\n escaped by '$options->{field_escape}'" if
($options->{field_escape});
- }
- $sql .= "\n";
my $context_options = $self->{context}{options};
+ my $prefix = $context_options->{prefix};
my $debug_sql = $context_options->{debug_sql};
my ($timer, $elapsed_time);
if ($debug_sql) {
$timer = $self->_get_timer();
- print $App::DEBUG_FILE "DEBUG_SQL: export_rows()\n";
- print $App::DEBUG_FILE $sql;
+ print $App::DEBUG_FILE "DEBUG_SQL: import_rows()\n";
+ print $App::DEBUG_FILE "$table (", join(",", @$columns), ")\n";
}
- my ($retval);
- eval {
- print STDERR "\n".("HERE"x12).Dumper($sql);
- $retval = $self->{dbh}->do($sql);
- };
+
+ my $datfile = $file;
+
+ my $filebase = $datfile;
+ $filebase =~ s!.*/!!;
+ $filebase =~ s/\.dat$//;
+
+ my $dbname = $self->{dbname};
+ my $column_hash = sha1_hex(join(",", @$columns));
+
+ my $ctlfile =
"$prefix/data/app/Repository/$dbname/$table.$column_hash.ctl";
+ if (! -f $ctlfile) {
+ mkdir("$prefix/data") if (! -d "$prefix/data");
+ mkdir("$prefix/data/app") if (! -d "$prefix/data/app");
+ mkdir("$prefix/data/app/Repository") if (! -d
"$prefix/data/app/Repository");
+ mkdir("$prefix/data/app/Repository/$dbname") if (! -d
"$prefix/data/app/Repository/$dbname");
+ $self->_write_import_control_file($ctlfile, $table, $columns,
$options);
+ }
+
+ my $datetime = time2str("%Y%m%d-%H%M%S", time());
+
+ mkdir("$prefix/log/import") if (! -d "$prefix/log/import");
+ my $badfile = "$prefix/log/import/$table-$datetime-$$.bad";
+ my $logfile = "$prefix/log/import/$table-$datetime-$$.log";
+ my $outfile = "$prefix/log/import/$table-$datetime-$$.out";
+
+ #my $sqlldr_options = " direct=TRUE parallel=TRUE silent";
+ my $sqlldr_options = "";
+ if ($context_options->{"app.Repository.$dbname.$table.import_rows"}) {
+ $sqlldr_options .= " " if ($sqlldr_options);
+ $sqlldr_options .= "rows=" .
$context_options->{"app.Repository.$dbname.$table.import_rows"};
+ }
+ if
($context_options->{"app.Repository.$dbname.$table.import_bindsize"}) {
+ $sqlldr_options .= " " if ($sqlldr_options);
+ $sqlldr_options .= "bindsize=" .
$context_options->{"app.Repository.$dbname.$table.import_bindsize"};
+ }
+
+ my $cmd = "sqlldr userid=$self->{dbuser}/$self->{dbpass}\@$self->{dbname} data=$datfile control=$ctlfile bad=$badfile log=$logfile errors=0 $sqlldr_options > $outfile 2>&1";
+ #print STDERR "sqlldr userid=$self->{dbuser}/$self->{dbpass}\@$self->{dbname} data=$datfile control=$ctlfile bad=$badfile log=$logfile errors=0 $sqlldr_options > $outfile 2>&1\n";
+ my $rc = system($cmd);
+ my $exit_value = $rc >> 8;
+
if ($debug_sql) {
$elapsed_time = $self->_read_timer($timer);
- print $App::DEBUG_FILE "DEBUG_SQL: export_rows=[$retval] ($elapsed_time sec) $DBI::errstr : $@\n";
+ print $App::DEBUG_FILE "DEBUG_SQL: import_rows=[$nrows]
($elapsed_time sec) $DBI::errstr\n";
+ }
+
+ my $badfile_size = (-s $badfile || 0);
+ if ($rc || $badfile_size) {
+ # failed
+ die "ERROR: sqlldr [$datfile] ($filebase) failed. rc=[$rc]
exit=[$exit_value] badsize=[$badfile_size] : $table-$datetime-$$\n";
+ }
+ else {
+ my $import_log_results = $self->_read_import_log_file($logfile);
+ $nrows = $import_log_results->{rows_loaded} || 0;
+ if (!$nrows ||
+ $import_log_results->{rows_error} ||
+ $import_log_results->{rows_rejected} ||
+ $import_log_results->{rows_discarded}) {
+ die "ERROR: sqlldr [$datfile] ($filebase) failed.
rows_loaded=[$nrows] rows_error=[$import_log_results->{rows_error}]
rows_rejected=[$import_log_results->{rows_rejected}]
rows_discarded=[$import_log_results->{rows_discarded}] : $table-$datetime-$$\n";
+ }
+ unlink($badfile);
+ unlink($logfile);
+ unlink($outfile);
+ }
+ }
+
+ &App::sub_exit($nrows) if ($App::trace);
+ return($nrows);
+}
+
+# 120 Rows successfully loaded.
+# 0 Rows not loaded due to data errors.
+# 0 Rows not loaded because all WHEN clauses were failed.
+# 0 Rows not loaded because all fields were null.
+#
+#
+#Space allocated for bind array: 255936 bytes(248 rows)
+#Read buffer bytes: 1048576
+#
+#Total logical records skipped: 0
+#Total logical records read: 120
+#Total logical records rejected: 0
+#Total logical records discarded: 0
+
+sub _read_import_log_file {
+ &App::sub_entry if ($App::trace);
+ my ($self, $logfile) = @_;
+ open(FILE, "< $logfile") || die "Unable to open $logfile: $!";
+ my $import_log_results = {};
+ while (<FILE>) {
+ chomp;
+ if (/^ *(\d+) Rows successfully loaded/) {
+ $import_log_results->{rows_loaded} = $1;
+ }
+ elsif (/^ *(\d+) Rows not loaded due to data errors/) {
+ $import_log_results->{rows_error} = $1;
+ }
+ elsif (/^Total logical records skipped: *(\d+)/) {
+ $import_log_results->{rows_skipped} = $1;
+ }
+ elsif (/^Total logical records read: *(\d+)/) {
+ $import_log_results->{rows_read} = $1;
+ }
+ elsif (/^Total logical records rejected: *(\d+)/) {
+ $import_log_results->{rows_rejected} = $1;
+ }
+ elsif (/^Total logical records discarded: *(\d+)/) {
+ $import_log_results->{rows_discarded} = $1;
+ }
+ }
+ close(FILE);
+ &App::sub_exit($import_log_results) if ($App::trace);
+ return($import_log_results);
+}
+
+sub _write_import_control_file {
+ &App::sub_entry if ($App::trace);
+ my ($self, $ctlfile, $table, $columns, $options) = @_;
+ my $table_def = $self->get_table_def($table);
+ my $field_sep = $options->{field_sep} || "|";
+ open(FILE, "> $ctlfile") || die "Unable to open $ctlfile: $!";
+ print FILE <<EOF;
+Load Data
+Append
+Into Table $table
+Fields Terminated By '$field_sep'
+(
+EOF
+ my ($column, $type);
+ for (my $i = 0; $i <= $#$columns; $i++) {
+ $column = $columns->[$i];
+ print FILE " $column";
+ $type = $table_def->{column}{$column}{type};
+ if ($type eq "date") {
+ print FILE ' date "YYYY-MM-DD"';
+ }
+ elsif ($type eq "datetime") {
+ print FILE ' date "YYYY-MM-DD HH24:MI:SS"';
}
+ print FILE "," if ($i < $#$columns);
+ print FILE "\n";
}
-
+ print FILE ")\n";
+ close(FILE);
&App::sub_exit() if ($App::trace);
}
@@ -391,10 +611,10 @@
#+----+-------------+-------+-------+-------------------------------------+-------------------+---------+-------------+------+-------+
#| 1 | SIMPLE | t1 | const | hotel_prop_ds_ak1,hotel_prop_ds_ie1 |
hotel_prop_ds_ak1 | 9 | const,const | 1 | |
#+----+-------------+-------+-------+-------------------------------------+-------------------+---------+-------------+------+-------+
-sub explain_sql {
+sub TBDexplain_sql {
my ($self, $sql) = @_;
my $dbh = $self->{dbh};
- # TODO: Make this work for Oracle
+ # NOTE: MySQL "explain" only works for "select".
# We convert "update" and "delete" to "select" to explain them.
if (defined $dbh) {
if ($sql =~ s/^delete/select */is) {
Modified: p5ee/trunk/App-Repository/t/DBI-import-ora.t
==============================================================================
--- p5ee/trunk/App-Repository/t/DBI-import-ora.t (original)
+++ p5ee/trunk/App-Repository/t/DBI-import-ora.t Thu Dec 4 15:16:56 2008
@@ -74,7 +74,7 @@
is($rep->get("test_person",2,"first_name"), "susan", "2nd row got in
[susan]");
is($rep->import_rows("test_person", ["age","first_name","gender","state"],
- "$t_dir/files/DBI-import.01.dat", {field_sep => "|", import_method =>
"insert"}),
+ "$t_dir/files/DBI-import.01.dat", {field_sep => "|"}),
120,
"import from file [files/DBI-import.01.dat]");
is($rep->get("test_person",3,"first_name"), "mike", "3rd row got in
[mike]");