Author: jbellis
Date: Tue Mar 8 04:26:06 2011
New Revision: 1079093
URL: http://svn.apache.org/viewvc?rev=1079093&view=rev
Log:
add configurable row limit to Pig loadfunc
patch by Matt Kennedy; reviewed by jbellis for CASSANDRA-2276
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1079093&r1=1079092&r2=1079093&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Mar 8 04:26:06 2011
@@ -9,6 +9,7 @@
a key that does not exist (CASSANDRA-2168)
* track and migrate cached pages during compaction (CASSANDRA-1902)
* add incremental_backups option (CASSANDRA-1872)
+ * add configurable row limit to Pig loadfunc (CASSANDRA-2276)
0.7.3
Modified:
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1079093&r1=1079092&r2=1079093&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Tue Mar 8 04:26:06 2011
@@ -58,14 +58,33 @@ public class CassandraStorage extends Lo
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
- private final static int LIMIT = 1024;
private static final Log logger =
LogFactory.getLog(CassandraStorage.class);
private Configuration conf;
private RecordReader reader;
private RecordWriter writer;
+ private final int limit;
- @Override
+ public CassandraStorage()
+ {
+ this(1024);
+ }
+
+ /**
+ * @param limit maximum number of columns to fetch per row slice (used as the SliceRange count)
+ */
+ public CassandraStorage(int limit)
+ {
+ super();
+ this.limit = limit;
+ }
+
+ public int getLimit()
+ {
+ return limit;
+ }
+
+ @Override
public Tuple getNext() throws IOException
{
try
@@ -167,7 +186,7 @@ public class CassandraStorage extends Lo
@Override
public void setLocation(String location, Job job) throws IOException
{
- SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
+ SliceRange range = new SliceRange(BOUND, BOUND, false, limit);
SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
conf = job.getConfiguration();
ConfigHelper.setInputSlicePredicate(conf, predicate);