Madhu Reddy wrote:

> Hi,
>   I have following sorting program...
> basically it will split the large files into small
> file and creates thread..each thread will sort files
> after that merge back all sorted files...
>
> this program works fine on single CPU machine...
> same program giving problem on 8 CPU machine...
>

Hi Madhu,

Is this NT Server?  If not, your problem may lie in an OS incompatibility, since 
workstation versions of NT will not support more than four CPUs.

It is clear from the output that the problem occurs here on line 47 [marked in 
reformatted script]:
  push(@threads, new threads(\&sort_it,$_));

It could be that the primary thread goes on to:

$_->join for(@threads);
my_print("Sorting completed, merging started\n");

Before the first sort_it thread returns.  Or the first thread moves on before the 
others finish.  I don't know the fine points of Perl threads, but I don't see anything 
explicitly telling the application to wait for all threads to complete.  The problem 
might arise when the application goes on to this line [#113]
  my @files = sort {$ref->{$a} <=> $ref->{$b}} keys %{$ref};
Since the next lines are never executed:
  my $merged_to = $files[0];
  my_print("merging : $files[0]\n");

The more I think about it, the more I think you need some explicit wait command when 
you launch the threads or before moving on to the
$_->join for(@threads);
statement.

Joseph

Somewhat easier to read copy of your application follows [Thanks to smart-tabbing 
options of
Programmers File Editor]:


BTW, you do NOT need to put the actual path in on Win32.  It gets association 
information from the Registry.  Use the standard 'nix path for compatibility:
#!/usr/bin/perl -w

#!C:\perl_5.8\bin\perl -w    #Perl translates slashes automatically.  Avoid backslashes.
#use strict;

use threads;
use threads::shared;

# Driver: split the input file into chunk files, sort every chunk in its own
# thread, then merge the sorted chunks and print a start/end report.

# A chunk is flushed once the buffer holds MORE than this many lines, so a
# full chunk carries $chunk_limit + 1 lines; the remainder forms the last one.
my $chunk_limit = 1000000;
my $counter = 0;
my @tmp_files;
my @buffer;
# Maps chunk file name => sort key of its first sorted line; filled in by the
# sort_it threads and handed to merge_it afterwards.
my %bounds:shared  = ();

# The & form is kept on purpose: my_time is declared further down the file
# with an empty prototype, and a plain call placed before that declaration
# draws a "called too early to check prototype" warning under -w.
my $tm1 = &my_time();
my $file = 'D:\Madhu\Tmp\abi_feeder.dat';
open(my $in, '<', $file) or die "Cannot open $file: $!";
while (my $line = <$in>) {
  push(@buffer, $line);
  if (@buffer > $chunk_limit) {
    push(@tmp_files, write_chunk(++$counter, \@buffer));   #LINE 20
    @buffer = ();
  }
}
# Close the input before any thread is started; nothing reads it again.
close($in);

# Flush whatever is left over as the final (short) chunk.
if (@buffer) {
  push(@tmp_files, write_chunk(++$counter, \@buffer));     #LINE 40
  @buffer = ();
}


# One sorting thread per chunk file.
my @threads = ();
for my $chunk (@tmp_files) {
  push(@threads, threads->new(\&sort_it, $chunk));
}


# Join every worker before the merge phase starts.
$_->join for(@threads);
my_print("Sorting completed, merging started\n");

my @keys = keys %bounds;                 #LINE 60

my $n_keys = scalar(@keys);
my_print("no of keys : $n_keys\n");

merge_it(\%bounds);
my_print("merge completed\n");
my $tm2 = &my_time();
print "\n\n----------------Report ------------\n";
print "---------------- Sort Start : $tm1\n";
print "---------------- Sort End   : $tm2\n";
print "-----------------------------------\n";

# write_chunk($number, \@lines)
# Writes the given lines to "tmp_<number>.txt" in the current directory and
# returns that file name.  Dies if the file cannot be created or written.
sub write_chunk {
  my ($number, $lines) = @_;
  my $name = "tmp_$number.txt";
  open(my $out, '>', $name) or die "Cannot create $name: $!";
  print {$out} @{$lines} or die "Cannot write $name: $!";
  # Buffered write errors only surface at close, so check it.
  close($out) or die "Cannot close $name: $!";
  return $name;
}


# sort_it($chunk)
# Thread entry point.  Reads the whole chunk file into memory, sorts its lines
# numerically on the 10 characters starting at offset 10, and rewrites the
# file in sorted order.  The key of the first sorted line is recorded in the
# shared %bounds hash under the chunk's file name; an empty chunk records
# nothing.  Dies if the chunk cannot be read or rewritten.
sub sort_it{

  my $chunk = shift;                       #LINE 80
  my $tid = threads->self->tid();
  my_print("thread $tid Sorting chunk : $chunk\n");

  # Lexical handles keep this sub independent of any global handle names.
  open(my $in, '<', $chunk) or die "thread $tid: cannot read $chunk: $!";
  my @buf = <$in>;
  close($in);

  open(my $out, '>', $chunk) or die "thread $tid: cannot rewrite $chunk: $!";
  my $first = 1;
  for my $line (sort { substr($a,10,10) <=> substr($b,10,10) } @buf){
    if($first){
      {   #lock
          # %bounds is written by every sorting thread, so hold its lock
          # only for the duration of this single store.
          lock(%bounds);
          $bounds{$chunk} = substr($line,10,10);
      } # unlock
      $first = 0;
    }
    print {$out} $line;
  }                       #LINE 100
  # Buffered write errors only surface at close, so check it.
  close($out) or die "thread $tid: cannot finish writing $chunk: $!";
  my_print("thread $tid Sorting chunk : $chunk COMPLETED\n");
}


# merge_it(\%bounds)
# Merges already-sorted chunk files into one sorted file.
#
#   \%bounds  hash ref mapping chunk file name => numeric key of its first line
#
# The files are taken in ascending order of that key and folded together one
# at a time: the running result and the next file are merged into
# "merged_tmp<i>.txt", after which both inputs are deleted.  Lines are compared
# numerically on the 10 characters starting at offset 10.
#
# Returns the name of the file holding the final result (the single input file
# when only one was given), or nothing when the hash is empty.
# Dies if any file cannot be opened or the merged output cannot be written.
sub merge_it{
  my $ref = shift;
  my @files = sort {$ref->{$a} <=> $ref->{$b}} keys %{$ref};     # LINE 113
  # Nothing to merge: avoid reporting and opening an undefined file name.
  return unless @files;

  my $merged_to = $files[0];
  my_print("merging : $files[0]\n");
  for my $i (1 .. $#files){
    my $next_file = $files[$i];
    open(my $first, '<', $merged_to) or die "Cannot open $merged_to: $!";
    open(my $second, '<', $next_file) or die "Cannot open $next_file: $!";
    my_print ("merging : $next_file\n");
    my $merged_tmp = "merged_tmp$i.txt";
    open(my $merged, '>', $merged_tmp) or die "Cannot create $merged_tmp: $!";
    my $line1 = <$first>;
    my $line2 = <$second>;                       #LINE 120
    while(1){
      # One side exhausted: copy the remainder of the other side verbatim.
      if(!defined($line1) && defined($line2)){
        print {$merged} $line2;
        while(defined(my $rest = <$second>)){
          print {$merged} $rest;
        }
        last;
      }
      if(!defined($line2) && defined($line1)){
        print {$merged} $line1;
        while(defined(my $rest = <$first>)){
          print {$merged} $rest;
        }
        last;
      }
      last if(!defined($line1) && !defined($line2));
      my $value1 = substr($line1,10,10);
      my $value2 = substr($line2,10,10);
      if($value1 == $value2){
        # Equal keys: emit both lines and advance both inputs.
        print {$merged} $line1;
        print {$merged} $line2;
        $line1 = <$first>;
        $line2 = <$second>;
      }elsif($value1 > $value2){               #LINE 140
        # Drain the second file until it catches up with the first.
        while($value1 > $value2){
          print {$merged} $line2;
          $line2 = <$second>;
          last unless(defined $line2);
          $value2 = substr($line2,10,10);
        }
      }else{
        # Drain the first file until it catches up with the second.
        while($value1 < $value2){
          print {$merged} $line1;
          $line1 = <$first>;
          last unless(defined $line1);
          $value1 = substr($line1,10,10);
        }
      }
    }
    close($first);
    close($second);
    # Buffered write errors only surface at close, so check it.
    close($merged) or die "Cannot close $merged_tmp: $!";
    # Both inputs are now contained in $merged_tmp; a failed delete only
    # leaves a stale file behind, so warn instead of aborting the merge.
    unlink($merged_to) or warn "Cannot remove $merged_to: $!";
    unlink($next_file) or warn "Cannot remove $next_file: $!";     #LINE 160

    $merged_to = $merged_tmp;

  }

  return $merged_to;
}

# my_print(@parts)
# Prints the current timestamp from my_time(), a space, the arguments joined
# the way "@_" interpolates them, and a trailing space, all to the currently
# selected output handle.
sub my_print
{
  my $message = "@_";
  my $stamp   = my_time();
  print $stamp . ' ' . $message . ' ';

}

 sub my_time
{
  # Returns the current local time as the string produced by localtime() in
  # scalar context (for example "Thu Sep  9 12:00:00 2004").
  #
  # Declared without a prototype: every call in this file appears before this
  # definition, so an empty prototype could never be enforced and would only
  # make perl -w report "called too early to check prototype".
  my $now = time;
  my $daytime = localtime($now);
  return $daytime;
}                                            #LINE 180



-- 
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to