Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 922f5af10 -> 3b708f998
Add CqlRecordReader to take advantage of native CQL pagination

patch by alexliu68; reviewed by pkolaczk for CASSANDRA-6311

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b708f99
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b708f99
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b708f99

Branch: refs/heads/cassandra-2.0
Commit: 3b708f9989274cbe9e0e2a5fda6f1d0a3d96ebee
Parents: 922f5af
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Mar 21 15:25:09 2014 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Fri Mar 21 15:26:41 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   4 +-
 examples/hadoop_cql3_word_count/bin/word_count  |   3 +-
 .../bin/word_count_counters                     |   4 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |  77 ++-
 .../src/WordCountCounters.java                  |  54 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  | 541 ++++++++++++++++++-
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  80 +++
 .../hadoop/cql3/CqlPagingRecordReader.java      |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 260 +++++++++
 10 files changed, 996 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f3f16e8..31fd319 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@
  * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
  * Extend triggers to support CAS updates (CASSANDRA-6882)
  * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
+ * Add CqlRecordReader to take advantage of native CQL pagination (CASSANDRA-6311)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 464dece..a15415b 100644
--- a/build.xml
+++ b/build.xml
@@ -380,6 +380,7 @@
           <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
           <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
       </dependencyManagement>
       <developer id="alakshman" name="Avinash Lakshman"/>
@@ -410,7 +411,7 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
         <dependency groupId="org.apache.pig" artifactId="pig"/>
-
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
       </artifact:pom>
@@ -473,6 +474,7 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
         <!-- don't need jna to run, but nice to have -->
<dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/examples/hadoop_cql3_word_count/bin/word_count ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count index a0c5aa0..974a39a 100644 --- a/examples/hadoop_cql3_word_count/bin/word_count +++ b/examples/hadoop_cql3_word_count/bin/word_count @@ -56,6 +56,7 @@ if [ "x$JAVA" = "x" ]; then fi OUTPUT_REDUCER=cassandra +INPUT_MAPPER=native #echo $CLASSPATH -$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER +$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER input_mapper=$INPUT_MAPPER http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/examples/hadoop_cql3_word_count/bin/word_count_counters ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters index 7793477..0b69b40 100644 --- a/examples/hadoop_cql3_word_count/bin/word_count_counters +++ b/examples/hadoop_cql3_word_count/bin/word_count_counters @@ -54,5 +54,7 @@ if [ "x$JAVA" = "x" ]; then exit 1 fi +INPUT_MAPPER=native + #echo $CLASSPATH -$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters +$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters input_mapper=$INPUT_MAPPER http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/examples/hadoop_cql3_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java index bc81a53..519a98f 100644 --- a/examples/hadoop_cql3_word_count/src/WordCount.java +++ b/examples/hadoop_cql3_word_count/src/WordCount.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat; +import org.apache.cassandra.hadoop.cql3.CqlInputFormat; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; @@ -37,10 +38,11 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; - +import com.datastax.driver.core.Row; import java.nio.charset.CharacterCodingException; /** @@ -60,7 +62,7 @@ import java.nio.charset.CharacterCodingException; public class WordCount extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(WordCount.class); - + static final String INPUT_MAPPER_VAR = "input_mapper"; static final String KEYSPACE = "cql3_worldcount"; static final String COLUMN_FAMILY = "inputs"; @@ -68,7 +70,6 @@ public class WordCount extends Configured implements Tool static final String OUTPUT_COLUMN_FAMILY = "output_words"; private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count"; - private static final String PRIMARY_KEY = "row_key"; public static void main(String[] args) throws Exception @@ -108,6 +109,30 @@ public class WordCount extends Configured implements Tool } } + public static class NativeTokenizerMapper extends 
Mapper<Long, Row, Text, IntWritable> + { + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + private ByteBuffer sourceColumn; + + protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) + throws IOException, InterruptedException + { + } + + public void map(Long key, Row row, Context context) throws IOException, InterruptedException + { + String value = row.getString("line"); + logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()}); + StringTokenizer itr = new StringTokenizer(value); + while (itr.hasMoreTokens()) + { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } + public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException @@ -149,17 +174,41 @@ public class WordCount extends Configured implements Tool public int run(String[] args) throws Exception { String outputReducerType = "filesystem"; - if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR)) + String inputMapperType = "native"; + String outputReducer = null; + String inputMapper = null; + + if (args != null) { - String[] s = args[0].split("="); + if(args[0].startsWith(OUTPUT_REDUCER_VAR)) + outputReducer = args[0]; + if(args[0].startsWith(INPUT_MAPPER_VAR)) + inputMapper = args[0]; + + if (args.length == 2) + { + if(args[1].startsWith(OUTPUT_REDUCER_VAR)) + outputReducer = args[1]; + if(args[1].startsWith(INPUT_MAPPER_VAR)) + inputMapper = args[1]; + } + } + + if (outputReducer != null) + { + String[] s = outputReducer.split("="); if (s != null && s.length == 2) outputReducerType = s[1]; } logger.info("output reducer type: " + outputReducerType); - + if (inputMapper != null) + { + String[] s = inputMapper.split("="); + if (s != null && s.length == 2) + inputMapperType = s[1]; + } Job job = new Job(getConf(), "wordcount"); job.setJarByClass(WordCount.class); - job.setMapperClass(TokenizerMapper.class); if (outputReducerType.equalsIgnoreCase("filesystem")) { @@ -189,9 +238,19 @@ public class WordCount extends Configured implements Tool ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); } - job.setInputFormatClass(CqlPagingInputFormat.class); + if (inputMapperType.equalsIgnoreCase("native")) + { + job.setMapperClass(NativeTokenizerMapper.class); + job.setInputFormatClass(CqlInputFormat.class); + CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? 
allow filtering"); + } + else + { + job.setMapperClass(TokenizerMapper.class); + job.setInputFormatClass(CqlPagingInputFormat.class); + ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); + } - ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/examples/hadoop_cql3_word_count/src/WordCountCounters.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java index 542a473..74de9ab 100644 --- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java +++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat; +import org.apache.cassandra.hadoop.cql3.CqlInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -37,7 +38,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; - +import com.datastax.driver.core.Row; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.utils.ByteBufferUtil; @@ -51,6 +52,7 @@ public class WordCountCounters extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class); + static final String INPUT_MAPPER_VAR = "input_mapper"; static final String COUNTER_COLUMN_FAMILY = "input_words_count"; private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters"; @@ -61,6 +63,24 @@ public class WordCountCounters extends Configured implements Tool System.exit(0); } + public static class SumNativeMapper extends Mapper<Long, Row, Text, LongWritable> + { + long sum = -1; + public void map(Long key, Row row, Context context) throws IOException, InterruptedException + { + if (sum < 0) + sum = 0; + + logger.debug("read " + key + ":count_num from " + context.getInputSplit()); + sum += Long.valueOf(row.getString("count_num")); + } + + protected void cleanup(Context context) throws IOException, InterruptedException { + if (sum > 0) + context.write(new Text("total_count"), new LongWritable(sum)); + } + } + public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable> { long sum = -1; @@ -95,7 +115,6 @@ public class WordCountCounters extends Configured implements Tool } } - public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable> { long sum = 0; @@ -110,25 +129,40 @@ public class WordCountCounters extends Configured implements Tool public int run(String[] args) throws Exception { + String inputMapperType = "native"; + if (args != null && args[0].startsWith(INPUT_MAPPER_VAR)) + { + String[] arg0 = args[0].split("="); + if (arg0 != null && arg0.length == 2) + inputMapperType = arg0[1]; + } Job job = new Job(getConf(), "wordcountcounters"); - job.setJarByClass(WordCountCounters.class); - job.setMapperClass(SumMapper.class); 
job.setCombinerClass(ReducerToFilesystem.class); job.setReducerClass(ReducerToFilesystem.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); - - job.setInputFormatClass(CqlPagingInputFormat.class); + job.setJarByClass(WordCountCounters.class); - ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY); CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3"); + if ("native".equals(inputMapperType)) + { + job.setMapperClass(SumNativeMapper.class); + job.setInputFormatClass(CqlInputFormat.class); + CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering"); + } + else + { + job.setMapperClass(SumMapper.class); + job.setInputFormatClass(CqlPagingInputFormat.class); + ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); + } + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); job.waitForCompletion(true); return 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index cb61d05..3672c84 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -19,13 +19,69 @@ package org.apache.cassandra.hadoop.cql3; * under the License. 
* */ +import java.io.FileInputStream; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.hadoop.conf.Configuration; +import com.datastax.driver.core.AuthProvider; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.google.common.base.Optional; +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; + public class CqlConfigHelper { private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon , private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size"; private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause"; + private static final String INPUT_CQL = "cassandra.input.cql"; + + private static final String INPUT_NATIVE_PORT = "cassandra.input.native.port"; + private static final String INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST = "cassandra.input.native.core.connections.per.host"; + private static final String INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST = "cassandra.input.native.max.connections.per.host"; + private static final String INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.min.simult.reqs.per.connection"; + private static final String INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.max.simult.reqs.per.connection"; + private static final String INPUT_NATIVE_CONNECTION_TIMEOUT = "cassandra.input.native.connection.timeout"; + private static final String INPUT_NATIVE_READ_CONNECTION_TIMEOUT = "cassandra.input.native.read.connection.timeout"; + private static final String INPUT_NATIVE_RECEIVE_BUFFER_SIZE = "cassandra.input.native.receive.buffer.size"; + private static final String INPUT_NATIVE_SEND_BUFFER_SIZE = "cassandra.input.native.send.buffer.size"; + private static final String INPUT_NATIVE_SOLINGER = "cassandra.input.native.solinger"; + private static final String INPUT_NATIVE_TCP_NODELAY = "cassandra.input.native.tcp.nodelay"; + private static final String INPUT_NATIVE_REUSE_ADDRESS = "cassandra.input.native.reuse.address"; + private static final String INPUT_NATIVE_KEEP_ALIVE = "cassandra.input.native.keep.alive"; + private static final String INPUT_NATIVE_AUTH_PROVIDER = "cassandra.input.native.auth.provider"; + private static final String INPUT_NATIVE_SSL_TRUST_STORE_PATH = "cassandra.input.native.ssl.trust.store.path"; + private static final String INPUT_NATIVE_SSL_KEY_STORE_PATH = "cassandra.input.native.ssl.key.store.path"; + private static final String INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD = 
"cassandra.input.native.ssl.trust.store.password"; + private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password"; + private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites"; + private static final String OUTPUT_CQL = "cassandra.output.cql"; /** @@ -85,25 +141,496 @@ public class CqlConfigHelper conf.set(OUTPUT_CQL, cql); } - - + + public static void setInputCql(Configuration conf, String cql) + { + if (cql == null || cql.isEmpty()) + return; + + conf.set(INPUT_CQL, cql); + } + + public static Optional<Integer> getInputCoreConnections(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, conf); + } + + public static Optional<Integer> getInputMaxConnections(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, conf); + } + + public static int getInputNativePort(Configuration conf) + { + return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042")); + } + + public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf); + } + + public static Optional<Integer> getInputMaxSimultReqPerConnections(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, conf); + } + + public static Optional<Integer> getInputNativeConnectionTimeout(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_CONNECTION_TIMEOUT, conf); + } + + public static Optional<Integer> getInputNativeReadConnectionTimeout(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, conf); + } + + public static Optional<Integer> getInputNativeReceiveBufferSize(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, conf); + } + + public static Optional<Integer> getInputNativeSendBufferSize(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_SEND_BUFFER_SIZE, conf); + } + + public static Optional<Integer> getInputNativeSolinger(Configuration conf) + { + return getIntSetting(INPUT_NATIVE_SOLINGER, conf); + } + + public static Optional<Boolean> getInputNativeTcpNodelay(Configuration conf) + { + return getBooleanSetting(INPUT_NATIVE_TCP_NODELAY, conf); + } + + public static Optional<Boolean> getInputNativeReuseAddress(Configuration conf) + { + return getBooleanSetting(INPUT_NATIVE_REUSE_ADDRESS, conf); + } + + public static Optional<String> getInputNativeAuthProvider(Configuration conf) + { + return getStringSetting(INPUT_NATIVE_AUTH_PROVIDER, conf); + } + + public static Optional<String> getInputNativeSSLTruststorePath(Configuration conf) + { + return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PATH, conf); + } + + public static Optional<String> getInputNativeSSLKeystorePath(Configuration conf) + { + return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PATH, conf); + } + + public static Optional<String> getInputNativeSSLKeystorePassword(Configuration conf) + { + return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, conf); + } + + public static Optional<String> getInputNativeSSLTruststorePassword(Configuration conf) + { + return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, conf); + } + + public static Optional<String> getInputNativeSSLCipherSuites(Configuration conf) + { + return getStringSetting(INPUT_NATIVE_SSL_CIPHER_SUITES, conf); + } + + public static Optional<Boolean> getInputNativeKeepAlive(Configuration conf) + { + return 
getBooleanSetting(INPUT_NATIVE_KEEP_ALIVE, conf); + } + public static String getInputcolumns(Configuration conf) { return conf.get(INPUT_CQL_COLUMNS_CONFIG); } - - public static String getInputPageRowSize(Configuration conf) + + public static Optional<Integer> getInputPageRowSize(Configuration conf) { - return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG); + return getIntSetting(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, conf); } - + public static String getInputWhereClauses(Configuration conf) { return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG); } - + + public static String getInputCql(Configuration conf) + { + return conf.get(INPUT_CQL); + } + public static String getOutputCql(Configuration conf) { return conf.get(OUTPUT_CQL); } + + public static Cluster getInputCluster(String host, Configuration conf) + { + int port = getInputNativePort(conf); + Optional<AuthProvider> authProvider = getAuthProvider(conf); + Optional<SSLOptions> sslOptions = getSSLOptions(conf); + LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host); + SocketOptions socketOptions = getReadSocketOptions(conf); + QueryOptions queryOptions = getReadQueryOptions(conf); + PoolingOptions poolingOptions = getReadPoolingOptions(conf); + + Cluster.Builder builder = Cluster.builder() + .addContactPoint(host) + .withPort(port) + .withCompression(ProtocolOptions.Compression.NONE); + + if (authProvider.isPresent()) + builder.withAuthProvider(authProvider.get()); + if (sslOptions.isPresent()) + builder.withSSL(sslOptions.get()); + + builder.withLoadBalancingPolicy(loadBalancingPolicy) + .withSocketOptions(socketOptions) + .withQueryOptions(queryOptions) + .withPoolingOptions(poolingOptions); + + return builder.build(); + } + + public static void setInputCoreConnections(Configuration conf, String connections) + { + conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections); + } + + public static void setInputMaxConnections(Configuration conf, String connections) + { + conf.set(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, connections); + } + + public static void setInputMinSimultReqPerConnections(Configuration conf, String reqs) + { + conf.set(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, reqs); + } + + public static void setInputMaxSimultReqPerConnections(Configuration conf, String reqs) + { + conf.set(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, reqs); + } + + public static void setInputNativeConnectionTimeout(Configuration conf, String timeout) + { + conf.set(INPUT_NATIVE_CONNECTION_TIMEOUT, timeout); + } + + public static void setInputNativeReadConnectionTimeout(Configuration conf, String timeout) + { + conf.set(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, timeout); + } + + public static void setInputNativeReceiveBufferSize(Configuration conf, String size) + { + conf.set(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, size); + } + + public static void setInputNativeSendBufferSize(Configuration conf, String size) + { + conf.set(INPUT_NATIVE_SEND_BUFFER_SIZE, size); + } + + public static void setInputNativeSolinger(Configuration conf, String solinger) + { + conf.set(INPUT_NATIVE_SOLINGER, solinger); + } + + public static void setInputNativeTcpNodelay(Configuration conf, String tcpNodelay) + { + conf.set(INPUT_NATIVE_TCP_NODELAY, tcpNodelay); + } + + public static void setInputNativeAuthProvider(Configuration conf, String authProvider) + { + conf.set(INPUT_NATIVE_AUTH_PROVIDER, authProvider); + } + + public static void setInputNativeSSLTruststorePath(Configuration conf, String authProvider) + { + conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PATH, 
authProvider); + } + + public static void setInputNativeSSLKeystorePath(Configuration conf, String authProvider) + { + conf.set(INPUT_NATIVE_SSL_KEY_STORE_PATH, authProvider); + } + + public static void setInputNativeSSLKeystorePassword(Configuration conf, String authProvider) + { + conf.set(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, authProvider); + } + + public static void setInputNativeSSLTruststorePassword(Configuration conf, String authProvider) + { + conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, authProvider); + } + + public static void setInputNativeSSLCipherSuites(Configuration conf, String authProvider) + { + conf.set(INPUT_NATIVE_SSL_CIPHER_SUITES, authProvider); + } + + public static void setInputNativeReuseAddress(Configuration conf, String reuseAddress) + { + conf.set(INPUT_NATIVE_REUSE_ADDRESS, reuseAddress); + } + + public static void setInputNativeKeepAlive(Configuration conf, String keepAlive) + { + conf.set(INPUT_NATIVE_KEEP_ALIVE, keepAlive); + } + + public static void setInputNativePort(Configuration conf, String port) + { + conf.set(INPUT_NATIVE_PORT, port); + } + + private static PoolingOptions getReadPoolingOptions(Configuration conf) + { + Optional<Integer> coreConnections = getInputCoreConnections(conf); + Optional<Integer> maxConnections = getInputMaxConnections(conf); + Optional<Integer> maxSimultaneousRequests = getInputMaxSimultReqPerConnections(conf); + Optional<Integer> minSimultaneousRequests = getInputMinSimultReqPerConnections(conf); + + PoolingOptions poolingOptions = new PoolingOptions(); + + if (coreConnections.isPresent()) + poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnections.get()); + if (maxConnections.isPresent()) + poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnections.get()); + if (maxSimultaneousRequests.isPresent()) + poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, maxSimultaneousRequests.get()); + if (minSimultaneousRequests.isPresent()) + poolingOptions.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, minSimultaneousRequests.get()); + + poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 0) + .setMaxConnectionsPerHost(HostDistance.REMOTE, 0) + .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0) + .setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0); + + return poolingOptions; + } + + private static QueryOptions getReadQueryOptions(Configuration conf) + { + String CL = ConfigHelper.getReadConsistencyLevel(conf); + Optional<Integer> fetchSize = getInputPageRowSize(conf); + QueryOptions queryOptions = new QueryOptions(); + if (CL != null && !CL.isEmpty()) + queryOptions.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(CL)); + + if (fetchSize.isPresent()) + queryOptions.setFetchSize(fetchSize.get()); + return queryOptions; + } + + private static SocketOptions getReadSocketOptions(Configuration conf) + { + SocketOptions socketOptions = new SocketOptions(); + Optional<Integer> connectTimeoutMillis = getInputNativeConnectionTimeout(conf); + Optional<Integer> readTimeoutMillis = getInputNativeReadConnectionTimeout(conf); + Optional<Integer> receiveBufferSize = getInputNativeReceiveBufferSize(conf); + Optional<Integer> sendBufferSize = getInputNativeSendBufferSize(conf); + Optional<Integer> soLinger = getInputNativeSolinger(conf); + Optional<Boolean> tcpNoDelay = getInputNativeTcpNodelay(conf); + Optional<Boolean> reuseAddress = getInputNativeReuseAddress(conf); + Optional<Boolean> 
keepAlive = getInputNativeKeepAlive(conf); + + if (connectTimeoutMillis.isPresent()) + socketOptions.setConnectTimeoutMillis(connectTimeoutMillis.get()); + if (readTimeoutMillis.isPresent()) + socketOptions.setReadTimeoutMillis(readTimeoutMillis.get()); + if (receiveBufferSize.isPresent()) + socketOptions.setReceiveBufferSize(receiveBufferSize.get()); + if (sendBufferSize.isPresent()) + socketOptions.setSendBufferSize(sendBufferSize.get()); + if (soLinger.isPresent()) + socketOptions.setSoLinger(soLinger.get()); + if (tcpNoDelay.isPresent()) + socketOptions.setTcpNoDelay(tcpNoDelay.get()); + if (reuseAddress.isPresent()) + socketOptions.setReuseAddress(reuseAddress.get()); + if (keepAlive.isPresent()) + socketOptions.setKeepAlive(keepAlive.get()); + + return socketOptions; + } + + private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost) + { + return new LoadBalancingPolicy() + { + private Host origHost; + private Set<Host> liveRemoteHosts = Sets.newHashSet(); + + @Override + public void onAdd(Host host) + { + if (host.getAddress().getHostName().equals(stickHost)) + origHost = host; + } + + @Override + public void onDown(Host host) + { + if (host.getAddress().getHostName().equals(stickHost)) + origHost = null; + liveRemoteHosts.remove(host); + } + + @Override + public void onRemove(Host host) + { + if (host.getAddress().getHostName().equals(stickHost)) + origHost = null; + liveRemoteHosts.remove(host); + } + + @Override + public void onUp(Host host) + { + if (host.getAddress().getHostName().equals(stickHost)) + origHost = host; + liveRemoteHosts.add(host); + } + + @Override + public HostDistance distance(Host host) + { + if (host.getAddress().getHostName().equals(stickHost)) + return HostDistance.LOCAL; + else + return HostDistance.REMOTE; + } + + @Override + public void init(Cluster cluster, Collection<Host> hosts) + { + for (Host host : hosts) + { + if (host.getAddress().getHostName().equals(stickHost)) + { + origHost = host; + break; + } + } + } + + @Override + public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) + { + if (origHost != null) + { + return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator()); + } + else + { + return liveRemoteHosts.iterator(); + } + } + }; + } + + private static Optional<AuthProvider> getAuthProvider(Configuration conf) + { + Optional<String> authProvider = getInputNativeAuthProvider(conf); + if (!authProvider.isPresent()) + return Optional.absent(); + + return Optional.of(getClientAuthProvider(authProvider.get())); + } + + private static Optional<SSLOptions> getSSLOptions(Configuration conf) + { + Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf); + Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf); + Optional<String> truststorePassword = getInputNativeSSLTruststorePassword(conf); + Optional<String> keystorePassword = getInputNativeSSLKeystorePassword(conf); + Optional<String> cipherSuites = getInputNativeSSLCipherSuites(conf); + + if (truststorePath.isPresent() && keystorePath.isPresent() && truststorePassword.isPresent() && keystorePassword.isPresent()) + { + SSLContext context; + try + { + context = getSSLContext(truststorePath.get(), truststorePassword.get(), keystorePath.get(), keystorePassword.get()); + } + catch (UnrecoverableKeyException | KeyManagementException | + NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e) + { + throw new RuntimeException(e); + } + 
String[] css = SSLOptions.DEFAULT_SSL_CIPHER_SUITES; + if (cipherSuites.isPresent()) + css = cipherSuites.get().split(","); + return Optional.of(new SSLOptions(context,css)); + } + return Optional.absent(); + } + + private static Optional<Integer> getIntSetting(String parameter, Configuration conf) + { + String setting = conf.get(parameter); + if (setting == null) + return Optional.absent(); + return Optional.of(Integer.parseInt(setting)); + } + + private static Optional<Boolean> getBooleanSetting(String parameter, Configuration conf) + { + String setting = conf.get(parameter); + if (setting == null) + return Optional.absent(); + return Optional.of(Boolean.parseBoolean(setting)); + } + + private static Optional<String> getStringSetting(String parameter, Configuration conf) + { + String setting = conf.get(parameter); + if (setting == null) + return Optional.absent(); + return Optional.of(setting); + } + + private static AuthProvider getClientAuthProvider(String factoryClassName) + { + try + { + return (AuthProvider) Class.forName(factoryClassName).newInstance(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to instantiate auth provider:" + factoryClassName, e); + } + } + + private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword) + throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException + { + + FileInputStream tsf = new FileInputStream(truststorePath); + FileInputStream ksf = new FileInputStream(keystorePath); + SSLContext ctx = SSLContext.getInstance("SSL"); + + KeyStore ts = KeyStore.getInstance("JKS"); + ts.load(tsf, truststorePassword.toCharArray()); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(ksf, keystorePassword.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, keystorePassword.toCharArray()); + + ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + return ctx; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java new file mode 100644 index 0000000..e1cdf32 --- /dev/null +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.hadoop.cql3; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +import com.datastax.driver.core.Row; + +/** + * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. + * + * At minimum, you need to set the KS and CF in your Hadoop job Configuration. + * The ConfigHelper class is provided to make this + * simple: + * ConfigHelper.setInputColumnFamily + * + * You can also configure the number of rows per InputSplit with + * ConfigHelper.setInputSplitSize. The default split size is 64k rows. + * + * the number of CQL rows per page + * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You + * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL + * query, so you need set it big enough to minimize the network overhead, and also + * not too big to avoid out of memory issue. + * + * other native protocol connection parameters in CqlConfigHelper + */ +public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row> +{ + public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) + throws IOException + { + TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID))) + { + @Override + public void progress() + { + reporter.progress(); + } + }; + + CqlRecordReader recordReader = new CqlRecordReader(); + recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); + return recordReader; + } + + @Override + public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader( + org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException, + InterruptedException + { + return new CqlRecordReader(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java index cee4b4b..b692280 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java @@ -117,7 +117,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, try { - pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf)); + pageRowSize = CqlConfigHelper.getInputPageRowSize(conf).get(); } catch (NumberFormatException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b708f99/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java new file mode 100644 index 0000000..a19cf70 --- /dev/null +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hadoop.cql3; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.utils.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +/** + * CqlRecordReader reads the rows return from the CQL query + * It uses CQL auto-paging. + * <p/> + * Return a Long as a local CQL row key starts from 0; + * <p/> + * Row as C* java driver CQL result set row + * 1) select clause must include partition key columns (to calculate the progress based on the actual CF row processed) + * 2) where clause must include token(partition_key1, ... , partition_keyn) > ? and + * token(partition_key1, ... , partition_keyn) <= ? (in the right order) + */ +public class CqlRecordReader extends RecordReader<Long, Row> + implements org.apache.hadoop.mapred.RecordReader<Long, Row> +{ + private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class); + + private ColumnFamilySplit split; + private RowIterator rowIterator; + + private Pair<Long, Row> currentRow; + private int totalRowCount; // total number of rows to fetch + private String keyspace; + private String cfName; + private String cqlQuery; + private Cluster cluster; + private Session session; + private IPartitioner partitioner; + + // partition keys -- key aliases + private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap(); + + public CqlRecordReader() + { + super(); + } + + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException + { + this.split = (ColumnFamilySplit) split; + Configuration conf = context.getConfiguration(); + totalRowCount = (this.split.getLength() < Long.MAX_VALUE) + ? 
(int) this.split.getLength() + : ConfigHelper.getInputSplitSize(conf); + cfName = ConfigHelper.getInputColumnFamily(conf); + keyspace = ConfigHelper.getInputKeyspace(conf); + cqlQuery = CqlConfigHelper.getInputCql(conf); + partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); + try + { + if (cluster != null) + return; + + // create connection using thrift + String[] locations = split.getLocations(); + Exception lastException = null; + for (String location : locations) + { + try + { + cluster = CqlConfigHelper.getInputCluster(location, conf); + break; + } + catch (Exception e) + { + lastException = e; + logger.warn("Failed to create authenticated client to {}", location); + } + } + if (cluster == null && lastException != null) + throw lastException; + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + session = cluster.connect(keyspace); + rowIterator = new RowIterator(); + logger.debug("created {}", rowIterator); + } + + public void close() + { + if (session != null) + session.close(); + } + + public Long getCurrentKey() + { + return currentRow.left; + } + + public Row getCurrentValue() + { + return currentRow.right; + } + + public float getProgress() + { + if (!rowIterator.hasNext()) + return 1.0F; + + // the progress is likely to be reported slightly off the actual but close enough + float progress = ((float) rowIterator.totalRead / totalRowCount); + return progress > 1.0F ? 1.0F : progress; + } + + public boolean nextKeyValue() throws IOException + { + if (!rowIterator.hasNext()) + { + logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount); + return false; + } + + try + { + currentRow = rowIterator.next(); + } + catch (Exception e) + { + // throw it as IOException, so client can catch it and handle it at client side + IOException ioe = new IOException(e.getMessage()); + ioe.initCause(ioe.getCause()); + throw ioe; + } + return true; + } + + // Because the old Hadoop API wants us to write to the key and value + // and the new asks for them, we need to copy the output of the new API + // to the old. Thus, expect a small performance hit. + // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat + // and ColumnFamilyRecordReader don't support them, it should be fine for now. + public boolean next(Long key, Row value) throws IOException + { + if (nextKeyValue()) + { + key = getCurrentKey(); + value = getCurrentValue(); + return true; + } + return false; + } + + public long getPos() throws IOException + { + return (long) rowIterator.totalRead; + } + + public Long createKey() + { + return null; + } + + public Row createValue() + { + return null; + } + + /** CQL row iterator + * Input cql query + * 1) select clause must include key columns (if we use partition key based row count) + * 2) where clause must include token(partition_key1 ... partition_keyn) > ? and + * token(partition_key1 ... partition_keyn) <= ? 
+ */ + private class RowIterator extends AbstractIterator<Pair<Long, Row>> + { + private long keyId = 0L; + protected int totalRead = 0; // total number of cf rows read + protected Iterator<Row> rows; + private Map<String, ByteBuffer> previousRowKey = new HashMap<String, ByteBuffer>(); // previous CF row key + + public RowIterator() + { + AbstractType type = partitioner.getTokenValidator(); + ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) ); + for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey()) + partitionBoundColumns.put(meta.getName(), Boolean.TRUE); + rows = rs.iterator(); + } + + protected Pair<Long, Row> computeNext() + { + if (rows == null || !rows.hasNext()) + return endOfData(); + + Row row = rows.next(); + Map<String, ByteBuffer> keyColumns = new HashMap<String, ByteBuffer>(); + for (String column : partitionBoundColumns.keySet()) + keyColumns.put(column, row.getBytesUnsafe(column)); + + // increase total CF row read + if (previousRowKey.isEmpty() && !keyColumns.isEmpty()) + { + previousRowKey = keyColumns; + totalRead++; + } + else + { + for (String column : partitionBoundColumns.keySet()) + { + if (BytesType.bytesCompare(keyColumns.get(column), previousRowKey.get(column)) != 0) + { + previousRowKey = keyColumns; + totalRead++; + break; + } + } + } + keyId ++; + return Pair.create(keyId, row); + } + } +}
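----------------------------------------------------------------------

[Editor's note] For reference, below is a minimal sketch (not part of the commit) of how a Hadoop job can opt into the new native reader, following the pattern of the WordCount example changed in this patch. The keyspace, table, and column names (my_keyspace, my_table, id, line) and the class names NativeReaderJob/LineMapper are placeholders; only the CqlInputFormat, CqlConfigHelper, and ConfigHelper calls come from this patch.

import java.io.IOException;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import com.datastax.driver.core.Row;

public class NativeReaderJob
{
    // With CqlInputFormat the mapper receives a per-split row counter (Long)
    // as the key and a java-driver Row as the value.
    public static class LineMapper extends Mapper<Long, Row, Text, IntWritable>
    {
        private static final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
        {
            // "line" is a placeholder column; read whichever columns the input query selects.
            for (String token : row.getString("line").split("\\s+"))
            {
                word.set(token);
                context.write(word, one);
            }
        }
    }

    public static Job configure(Configuration conf) throws IOException
    {
        Job job = new Job(conf, "native-cql-reader-example");
        job.setJarByClass(NativeReaderJob.class);
        job.setMapperClass(LineMapper.class);
        job.setInputFormatClass(CqlInputFormat.class);

        // Standard input settings, as in the WordCount example above.
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "my_keyspace", "my_table");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

        // The query must keep the token(...) range placeholders so each split scans only its own token range.
        CqlConfigHelper.setInputCql(job.getConfiguration(),
                "select * from my_table where token(id) > ? and token(id) <= ? allow filtering");

        // Fetch size for the driver's automatic paging (CQL rows per page).
        CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "1000");

        // Native protocol port; 9042 is the default returned by getInputNativePort.
        CqlConfigHelper.setInputNativePort(job.getConfiguration(), "9042");

        return job;
    }
}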
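[Editor's note] The patch also adds per-job tuning of the native connection through CqlConfigHelper; the sketch below shows the new setters feeding getReadPoolingOptions, getReadSocketOptions, and getInputCluster. All numeric values, file paths, and the auth provider class name are illustrative assumptions, not recommendations from the patch, and actually connecting assumes a reachable Cassandra node on the native port.

import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.hadoop.conf.Configuration;

import com.datastax.driver.core.Cluster;

public class NativeConnectionTuning
{
    public static Configuration tune(Configuration conf)
    {
        // Pooling: applied to LOCAL hosts by getReadPoolingOptions(); REMOTE pools are forced to 0.
        CqlConfigHelper.setInputCoreConnections(conf, "2");
        CqlConfigHelper.setInputMaxConnections(conf, "8");
        CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, "128");

        // Socket options picked up by getReadSocketOptions().
        CqlConfigHelper.setInputNativeConnectionTimeout(conf, "5000");
        CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, "12000");
        CqlConfigHelper.setInputNativeKeepAlive(conf, "true");

        // Optional auth: the class is instantiated reflectively via a no-arg constructor,
        // so it must be an AuthProvider with one (com.example.MyAuthProvider is hypothetical).
        CqlConfigHelper.setInputNativeAuthProvider(conf, "com.example.MyAuthProvider");

        // Optional SSL: all four store settings must be present for getSSLOptions to build an SSLContext.
        CqlConfigHelper.setInputNativeSSLTruststorePath(conf, "/path/to/truststore.jks");
        CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, "truststore-password");
        CqlConfigHelper.setInputNativeSSLKeystorePath(conf, "/path/to/keystore.jks");
        CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, "keystore-password");

        return conf;
    }

    public static void main(String[] args)
    {
        Configuration conf = tune(new Configuration());
        // getInputCluster builds a Cluster whose load-balancing policy sticks to the given host,
        // which is how CqlRecordReader pins each split to one of its replica locations.
        Cluster cluster = CqlConfigHelper.getInputCluster("localhost", conf);
        System.out.println("native port: " + CqlConfigHelper.getInputNativePort(conf));
        cluster.close();
    }
}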