import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple3;
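/*
 * Reads rows from the "Words" column family in the "casDemo" keyspace
 * through Cassandra's Hadoop ColumnFamilyInputFormat and prints each row
 * key together with its column names and values, using a local Spark
 * context.
 */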
public class CassandraSparkConnectionTest implements Serializable {

    public static void main(String[] args) throws IOException {
        new CassandraSparkConnectionTest().process();
    }
    @SuppressWarnings({ "unchecked", "serial" })
    public void process() throws IOException {
        String host = "localhost";
        String port = "9160";
        JavaSparkContext sparkContext = new JavaSparkContext("local",
                "cassandraSparkConnectionTest", System.getenv("SPARK_HOME"),
                JavaSparkContext.jarOfClass(CassandraSparkConnectionTest.class));
        Job job = new Job();
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(),
                "casDemo", "Words");
        // ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
        //         "casDemo", "WordCount");
        ConfigHelper.setInputPartitioner(job.getConfiguration(),
                "Murmur3Partitioner");
        // ConfigHelper.setOutputPartitioner(job.getConfiguration(),
        //         "Murmur3Partitioner");
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange(toByteBuffer(""),
                toByteBuffer(""), false, 20);
        predicate.setSlice_range(sliceRange);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
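        // Each record from ColumnFamilyInputFormat is a pair of
        // (row key, sorted map of column name -> Column).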
        // Instance used only to obtain the runtime value class for newAPIHadoopRDD.
        Map<ByteBuffer, Column> valueClass = new TreeMap<ByteBuffer, Column>();
        JavaPairRDD<ByteBuffer, TreeMap<ByteBuffer, Column>> rdd = sparkContext
                .newAPIHadoopRDD(job.getConfiguration(),
                        ColumnFamilyInputFormat.class
                                .asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
                        ByteBuffer.class,
                        valueClass.getClass());
        // On Spark 1.x and later this call is mapToPair(...); the older
        // map(PairFunction) form is used here.
        JavaPairRDD<ByteBuffer, Column> pair = rdd.map(
                new PairFunction<Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>>, ByteBuffer, Column>() {
                    @Override
                    public Tuple2<ByteBuffer, Column> call(
                            Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>> paramT)
                            throws Exception {
                        // Print the row key, then each column name and value.
                        System.out.println(ByteBufferUtil.string(paramT._1()));
                        Set<ByteBuffer> keys = paramT._2().keySet();
                        Column first = null;
                        for (ByteBuffer key : keys) {
                            System.out.println("\t" + ByteBufferUtil.string(key));
                            Column col = paramT._2().get(key);
                            if (first == null) {
                                first = col;
                            }
                            System.out.println("\t" + ByteBufferUtil.string(col.value()));
                        }
                        // The original returned null here ("Add code"); pairing the row key
                        // with its first column is an arbitrary placeholder choice.
                        return new Tuple2<ByteBuffer, Column>(paramT._1(), first);
                    }
                });
        pair.collect();
        System.out.println("Done.");
    }
    // Left over from a log-parsing example; the Pattern that should produce the
    // Matcher is not shown in the original post, so the match is guarded here
    // instead of dereferencing a null Matcher.
    public static Tuple3<String, String, String> extractKey(String s) {
        Matcher m = null; // TODO: obtain from a compiled Pattern, e.g. pattern.matcher(s)
        if (m != null && m.find()) {
            String ip = m.group(1);
            String user = m.group(3);
            String query = m.group(5);
            if (!user.equalsIgnoreCase("-")) {
                return new Tuple3<String, String, String>(ip, user, query);
            }
        }
        return new Tuple3<String, String, String>(null, null, null);
    }
    public static ByteBuffer toByteBuffer(String value)
            throws UnsupportedEncodingException {
        if (value == null) {
            value = "";
        }
        return ByteBuffer.wrap(value.getBytes("UTF-8"));
    }
    public static String toString(ByteBuffer buffer)
            throws UnsupportedEncodingException {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, "UTF-8");
    }
}
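If the goal is to go beyond printing, a minimal sketch of a follow-up aggregation is below. It is not part of the original example; it assumes the same rdd variable and the same map(PairFunction) style used above, and simply counts how many columns each row has.

// Sketch only: count columns per row key, reusing the rdd defined above.
JavaPairRDD<String, Integer> columnCounts = rdd.map(
        new PairFunction<Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(
                    Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>> row) throws Exception {
                return new Tuple2<String, Integer>(
                        ByteBufferUtil.string(row._1()), row._2().size());
            }
        });
System.out.println(columnCounts.collect());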