#!/usr/bin/perl
use lib qw(/usr/local/cassandra/interface/gen-perl /usr/local/cassandra/interface/gen-perl/Cassandra);
use strict;

use Carp;
$SIG{ __DIE__ } = sub { Carp::confess( @_ ) };

use Cassandra;

use Thrift::Socket;
use Thrift::BinaryProtocol;
use Thrift::BufferedTransport;

use Data::Dumper;
use Time::HiRes qw( gettimeofday tv_interval );
use Getopt::Std;
# Command line: -c N sets the number of concurrent worker processes.
# A missing or zero value falls back to a single worker.
my %opt;
getopts('c:', \%opt);

$opt{c} ||= 1;

# Cassandra nodes to hit; forked workers are spread over them round-robin.
my @servers = (
	'10.3.2.38',
	'10.3.2.39',
	'10.3.2.40',
	'10.3.2.41',
);

my $t0 = [gettimeofday];
if ( $opt{c} <= 1 ) {
	# Single worker: run the test in this process, no fork needed.
	ProcessTest($servers[0], 0);
} else {
	my $status = 0;

	for (my $i = 0; $i < $opt{c}; $i++) {
		my $server = $servers[ $i % scalar @servers ];

		my $child = fork();
		if ( !defined $child ) {
			# fork() returns undef on failure. Comparing that to 0 would make
			# the parent believe it is a child, so handle it explicitly: report
			# it, stop spawning, and still reap the workers already started.
			warn "fork failed for child $i: $!\n";
			$status ||= 1;
			last;
		}

		if ( $child == 0 ) {
			sleep(1); # Give the parent time to fork all children
			ProcessTest($server, $i);
			exit(0);
		}
	}

	# Reap every child; remember the first non-zero outcome.
	while ( (my $pid = wait()) > 0 ) {
		my $exit_code = $? >> 8;
		my $signal    = $? & 127;
		# A child killed by a signal has exit code 0 in the high byte, so
		# fold the signal number in (shell-style 128+N) instead of losing it.
		$status ||= $exit_code || ( $signal ? 128 + $signal : 0 );
	}
	print "All flow finish; status: $status\n";
}
my $elapsed = tv_interval( $t0, [gettimeofday] );
printf("ELAPSED: %.05f sec;", $elapsed);

# ProcessTest($server, $ith)
#
# Endless read-load loop against one Cassandra node over Thrift.
#
#   $server - host to connect to (port 9160)
#   $ith    - worker index, used only in the progress output
#
# Every iteration issues one get_slice for a random, zero-padded 40-digit
# key. Any exception is printed together with the key, the connection is
# dropped, and a fresh one is opened on the next iteration.
# The loop has no exit condition, so this sub does not return.
sub ProcessTest($$) {
	my ($server, $ith) = @_;

	my $transport;
	my $client;
	my $i = 0;
	my $PID;
	for (;;) {
		$PID = '';
		eval {
			if ( !defined $client ) {
				# (Re)connect lazily: first iteration, or after a failure.
				my $socket = Thrift::Socket->new($server, 9160);
				# Timeouts are presumably in milliseconds -- TODO confirm
				# against the Thrift::Socket version in use.
				$socket->setSendTimeout(1000);
				$socket->setRecvTimeout(5000);
				$transport = Thrift::BufferedTransport->new($socket, 1024, 1024);
				my $protocol = Thrift::BinaryProtocol->new($transport);

				$transport->open();
				# Publish the client only once the transport is really open.
				$client = Cassandra::CassandraClient->new($protocol);
			}

			# %d instead of %lld: the value is below 1e6, and the 'll' size
			# flag is only accepted by perls built with 64-bit integers.
			$PID = sprintf("%040d", int(1000000 * rand()));
			my $cols = $client->get_slice(
							'Keyspace1',
							$PID,
							Cassandra::ColumnParent->new({
								column_family=>'Super1',
								super_column=>'x'
							}),
							Cassandra::SlicePredicate->new({
								#column_names=>['0000000000000000000001'],
								# Reversed range from '1' to '0', at most 2 columns.
								slice_range=>Cassandra::SliceRange->new({
									start=>  '1',
									finish=> '0',
									reversed=>1,
									count=>2
								})
							}),
							Cassandra::ConsistencyLevel::ONE
						);

			#print "KEY $PID ; ".Dumper($cols) if (ref($cols) && $#$cols >= 0);
			$i++;
			print "$i records by $ith child\n" if ($i % 1000 == 0);
		};

		# Copy $@ right away: the cleanup eval below would overwrite it.
		if ( my $err = $@ ) {
			print "KEY $PID ; ".Dumper($err);
			# Best-effort close. The transport may not exist yet, and a
			# failing close() must not kill the worker through the global
			# __DIE__ hook.
			eval { $transport->close() } if defined $transport;
			undef $transport;
			undef $client;
		}
	}

}
