Author: daijy
Date: Fri Apr 24 21:09:37 2015
New Revision: 1675948
URL: http://svn.apache.org/r1675948
Log:
PIG-4511: Add columns to prune from PluckTuple
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/PluckTuple.java
pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1675948&r1=1675947&r2=1675948&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 24 21:09:37 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4511: Add columns to prune from PluckTuple (jbabcock via daijy)
+
PIG-4434: Improve auto-parallelism for tez (daijy)
PIG-4495: Better multi-query planning in case of multiple edges (rohini)
Modified: pig/trunk/src/org/apache/pig/builtin/PluckTuple.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PluckTuple.java?rev=1675948&r1=1675947&r2=1675948&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PluckTuple.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PluckTuple.java Fri Apr 24 21:09:37 2015
@@ -34,10 +34,10 @@ import com.google.common.collect.Lists;
/**
* This is a UDF which allows the user to specify a string prefix, and then
* filter for the columns in a relation that begin with that prefix.
+ *
+ * Additional arguments to this udf are columns to exclude from the relation matching this prefix (assuming this column is the end of the alias: e.g., if choose to exclude column y then exclude a::b::y using PluckTuple('a::','y'))
*
* Example:
- *
- * 1) Prefix
* a = load 'a' as (x, y);
* b = load 'b' as (x, y);
* c = join a by x, b by x;
@@ -47,29 +47,28 @@ import com.google.common.collect.Lists;
* c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
* describe d;
* d: {plucked::a::x: bytearray,plucked::a::y: bytearray}
- *
- * 2) Regex
- * a = load 'a' as (x, y);
- * b = load 'b' as (x, y);
- * c = join a by x, b by x;
- * DEFINE pluck PluckTuple('.*::y');
- * d = foreach c generate FLATTEN(pluck(*));
- * describe c;
- * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
- * describe d;
- * d: {plucked::a::y: bytearray,plucked::a::y: bytearray}
*/
public class PluckTuple extends EvalFunc<Tuple> {
private static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
- private static Pattern pattern;
+ private static Pattern prefixPattern;
private boolean isInitialized = false;
private int[] indicesToInclude;
private String prefix;
+ private List<String> columnsToExclude = Lists.newArrayList();
- public PluckTuple(String prefix) {
- this.prefix = prefix;
- pattern = Pattern.compile(prefix);
+ public PluckTuple(String...args) {
+ int i = 0;
+ for (String arg : args) {
+ if (i == 0) {
+ this.prefix = arg;
+ }
+ else {
+ this.columnsToExclude.add(arg);
+ }
+ i++;
+ }
+ prefixPattern = Pattern.compile(prefix);
}
@Override
@@ -79,7 +78,9 @@ public class PluckTuple extends EvalFunc
Schema inputSchema = getInputSchema();
for (int i = 0; i < inputSchema.size(); i++) {
String alias = inputSchema.getField(i).alias;
- if (alias.startsWith(prefix) || pattern.matcher(alias).matches()) {
+ String[] splitAlias = alias.split("::");
+ String lastAlias = splitAlias[splitAlias.length-1];
+ if ((alias.startsWith(prefix) || prefixPattern.matcher(alias).matches()) && ! this.columnsToExclude.contains(lastAlias)) {
indicesToInclude.add(i);
}
}
@@ -103,12 +104,16 @@ public class PluckTuple extends EvalFunc
List<Integer> indicesToInclude = Lists.newArrayList();
for (int i = 0; i < inputSchema.size(); i++) {
String alias;
+ String[] splitAlias;
+ String lastAlias;
try {
alias = inputSchema.getField(i).alias;
+ splitAlias = alias.split("::");
+ lastAlias = splitAlias[splitAlias.length-1];
} catch (FrontendException e) {
throw new RuntimeException(e); // Should never happen
}
- if (alias.startsWith(prefix) || pattern.matcher(alias).matches()) {
+ if ((alias.startsWith(prefix) || prefixPattern.matcher(alias).matches()) && ! this.columnsToExclude.contains(lastAlias)) {
indicesToInclude.add(i);
}
}
Modified: pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java?rev=1675948&r1=1675947&r2=1675948&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java Fri Apr 24 21:09:37 2015
@@ -26,12 +26,12 @@ import static org.junit.Assert.assertTru
import java.util.Iterator;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
-import org.apache.pig.test.Util;
import org.junit.Before;
import org.junit.Test;
@@ -40,7 +40,7 @@ public class TestPluckTuple {
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(Util.getLocalTestMode());
+ pigServer = new PigServer(ExecType.LOCAL);
}
@Test
@@ -98,4 +98,53 @@ public class TestPluckTuple {
assertEquals(exp2, it.next());
assertFalse(it.hasNext());
}
+
+ @Test
+ public void testExcludeColumns() throws Exception {
+ String query = "a1 = load 'a1' as (x:int,y:chararray,z:long);" +
+ "a2 = load 'a2' as (x:int,y:chararray,z:long);" +
+ "a3 = foreach a2 generate x, y;" +
+ "b = join a1 by x, a2 by x;" +
+ "define pluck PluckTuple('a[2|3]::.*','z');" +
+ "c = foreach b generate flatten(pluck(*));";
+ pigServer.registerQuery(query);
+ assertTrue(Schema.equals(pigServer.dumpSchema("a3"), pigServer.dumpSchema("c"), false, true));
+ }
+
+ @Test
+ public void testExcludeColumnsOutput() throws Exception {
+ Data data = resetData(pigServer);
+
+ Tuple exp1 = tuple(1, "hey", 2L);
+ Tuple exp2 = tuple(2, "woah", 3L);
+
+ Tuple exp1_excl = tuple(1, 2L);
+ Tuple exp2_excl = tuple(2, 3L);
+
+ data.set("a",
+ Utils.getSchemaFromString("x:int,y:chararray,z:long"),
+ exp1,
+ exp2,
+ tuple(3, "c", 4L)
+ );
+ data.set("b",
+ Utils.getSchemaFromString("x:int,y:chararray,z:long"),
+ tuple(1, "sasf", 5L),
+ tuple(2, "woah", 6L),
+ tuple(4, "c", 7L)
+ );
+
+ String query = "a = load 'a' using mock.Storage();" +
+ "b = load 'b' using mock.Storage();" +
+ "c = join a by x, b by x;" +
+ "define pluck PluckTuple('a::','y');" +
+ "d = foreach c generate flatten(pluck(*));";
+ pigServer.registerQuery(query);
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ assertTrue(it.hasNext());
+ assertEquals(exp1_excl, it.next());
+ assertTrue(it.hasNext());
+ assertEquals(exp2_excl, it.next());
+ assertFalse(it.hasNext());
+ }
}
\ No newline at end of file