#!/usr/bin/perl
use lib qw(/usr/local/cassandra/interface/gen-perl /usr/local/cassandra/interface/gen-perl/Cassandra);
use strict;

use Carp;
# Route every die through Carp::confess so fatal errors carry a full stack
# backtrace. Perl also invokes the __DIE__ hook for dies raised inside eval
# blocks, so exceptions that are later caught by an eval pass through here too.
$SIG{ __DIE__ } = sub { Carp::confess( @_ ) };

use Cassandra;

use Thrift::Socket;
use Thrift::BinaryProtocol;
use Thrift::BufferedTransport;

use Data::Dumper;
use Time::HiRes qw( gettimeofday tv_interval );
use Getopt::Std;
# Command-line options: -c N is the number of concurrent worker processes.
my %opt;
getopts('c:', \%opt);

# Fall back to a single worker when -c is absent (or given as 0 / empty).
$opt{c} ||= 1;

# Cassandra nodes; workers are assigned to them round-robin.
my @servers = (
	'10.3.2.38',
	'10.3.2.39',
	'10.3.2.40',
	'10.3.2.41',
);

my $t0 = [gettimeofday];
if ( $opt{c} <= 1 ) {
	# Single worker: run the test in this process against the first server.
	ProcessTest($servers[0], 0);
} else {
	my $status = 0;

	for (my $i = 0; $i < $opt{c}; $i++) {
		my $server = $servers[ $i % @servers ];

		my $child = fork();
		if (!defined $child) {
			# fork() returns undef on failure. Comparing that to 0 would be
			# true, so the parent would wrongly take the child branch and
			# never reap the workers it already started.
			warn "fork failed for worker $i: $!\n";
			$status ||= 1;
			last;
		}

		if ($child == 0) {
			sleep(1); # Give the parent time to fork the remaining children
			ProcessTest($server, $i);
			exit(0);
		}
	}

	# Reap every child and remember the first non-zero outcome. A child that
	# was killed by a signal has a zero exit code in the high byte of $?, so
	# report it shell-style as 128 + signal number instead of as success.
	while ( wait() > 0 ) {
		my $exit_code = $? >> 8;
		my $signal    = $? & 127;
		$status ||= $exit_code || ($signal ? 128 + $signal : 0);
	}
	print "All flow finish; status: $status\n";
}
my $elapsed = tv_interval( $t0, [gettimeofday] );
printf("ELAPSED: %.05f sec;", $elapsed);

# Build the Thrift column list for one row.
#
# Argument:
#   $row - hash ref mapping column name => column value
#
# Returns an array ref with one Cassandra::ColumnOrSuperColumn (wrapping a
# Cassandra::Column) per entry of $row, ordered by column name. All columns
# share one timestamp taken once per call.
#
# Dies when $row yields no columns.
sub _makeColumnList($) {
	my ($row) = @_;

	# Take the timestamp once: calling time() per column could stamp columns
	# of the same row differently when the call straddles a second boundary.
	my $timestamp = time();

	# Sorted keys make the resulting list order reproducible between runs.
	my @cfmap = map {
		Cassandra::ColumnOrSuperColumn->new({
			column => Cassandra::Column->new({
				name      => $_,
				value     => $row->{$_},
				timestamp => $timestamp,
			}),
		})
	} sort keys %$row;

	die "_makeColumnList: row has no columns" unless @cfmap;
	return \@cfmap;
}

# Run one load-generating worker: insert random columns into Cassandra forever.
#
# Arguments:
#   $server - host of the Cassandra node to connect to (Thrift port 9160)
#   $ith    - worker index, used only to label the progress output
#
# Never returns. Each pass of the loop inserts one column into super column
# 'x' of column family 'Super1' in 'Keyspace1' and prints a progress line
# every 1000 successful inserts. Any exception is dumped to STDOUT, the
# connection is dropped, and the next pass reconnects.
sub ProcessTest($$) {
	my ($server, $ith) = @_;

	my $transport;
	my $client;
	my $inserted = 0;

	for (;;) {
		# Remember whether this pass has to (re)connect; used below to avoid
		# a tight retry loop when the server cannot be reached.
		my $connecting = !defined $client;

		my $ok = eval {
			if ($connecting) {
				my $socket = Thrift::Socket->new($server, 9160);
				# NOTE(review): timeouts are presumably in milliseconds as
				# for Thrift::Socket - confirm.
				$socket->setSendTimeout(100000);
				$socket->setRecvTimeout(500000);
				$transport = Thrift::BufferedTransport->new($socket, 1024, 1024);
				my $protocol = Thrift::BinaryProtocol->new($transport);
				$client = Cassandra::CassandraClient->new($protocol);

				$transport->open();
			}

			# Random row key: an integer below 1,000,000, zero-padded to 40
			# digits. Plain %d is used instead of %lld because the "ll" size
			# flag is only available on perls built with 64-bit integer
			# support, and these values never need it.
			my $row_key = sprintf("%040d", int(1000000 * rand()));

			# Alternative workload (disabled): write a whole random row
			# into 'Standard1' in one batch.
			#$client->batch_insert(
			#	'Keyspace1',
			#	$row_key,
			#	{
			#		'Standard1' => _makeColumnList ({
			#			map {
			#				$_=>'0'x(int(1 + 100 * rand()))
			#			} (0..int(1+10*rand()))
			#		})
			#	},
			#	Cassandra::ConsistencyLevel::ONE
			#);

			# Random column name, formatted the same way as the row key.
			my $column_name = sprintf("%040d", int(1000000 * rand()));
			$client->insert(
				'Keyspace1',
				$row_key,
				Cassandra::ColumnPath->new({
					column_family => 'Super1',
					super_column  => 'x',
					column        => $column_name,
				}),
				rand(),
				time(),
				Cassandra::ConsistencyLevel::ONE
			);

			$inserted++;
			print "$inserted records by $ith child\n" if ($inserted % 1000 == 0);

			1;    # Explicit true value so success does not depend on $@.
		};
		next if $ok;

		my $error = $@;
		print Dumper($error);

		# The transport may not exist yet (a constructor threw), and close()
		# itself may throw; neither must be allowed to kill the worker.
		if (defined $transport) {
			eval { $transport->close(); 1 }
				or warn "closing transport to $server failed: $@";
		}
		undef $transport;
		undef $client;

		# A refused connection fails immediately; pause before retrying so a
		# dead server does not turn this loop into a busy spin.
		sleep(1) if $connecting;
	}
}
