Author: daijy
Date: Fri Apr 24 21:09:37 2015
New Revision: 1675948

URL: http://svn.apache.org/r1675948
Log:
PIG-4511: Add columns to prune from PluckTuple

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/PluckTuple.java
    pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1675948&r1=1675947&r2=1675948&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 24 21:09:37 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4511: Add columns to prune from PluckTuple (jbabcock via daijy)
+
 PIG-4434: Improve auto-parallelism for tez (daijy)
 
 PIG-4495: Better multi-query planning in case of multiple edges (rohini)

Modified: pig/trunk/src/org/apache/pig/builtin/PluckTuple.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PluckTuple.java?rev=1675948&r1=1675947&r2=1675948&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PluckTuple.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PluckTuple.java Fri Apr 24 21:09:37 2015
@@ -34,10 +34,10 @@ import com.google.common.collect.Lists;
 /**
  * This is a UDF which allows the user to specify a string prefix, and then
  * filter for the columns in a relation that begin with that prefix.
+ * 
+ * Additional arguments to this udf are columns to exclude from the relation matching this prefix (assuming this column is the end of the alias: e.g., if choose to exclude column y then exclude a::b::y using PluckTuple('a::','y'))
  *
  * Example:
- *
- * 1) Prefix
  * a = load 'a' as (x, y);
  * b = load 'b' as (x, y);
  * c = join a by x, b by x;
@@ -47,29 +47,28 @@ import com.google.common.collect.Lists;
  * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
  * describe d;
  * d: {plucked::a::x: bytearray,plucked::a::y: bytearray}
- *
- * 2) Regex
- * a = load 'a' as (x, y);
- * b = load 'b' as (x, y);
- * c = join a by x, b by x;
- * DEFINE pluck PluckTuple('.*::y');
- * d = foreach c generate FLATTEN(pluck(*));
- * describe c;
- * c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
- * describe d;
- * d: {plucked::a::y: bytearray,plucked::a::y: bytearray}
  */
 public class PluckTuple extends EvalFunc<Tuple> {
     private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
-    private static Pattern pattern;
+    private static Pattern prefixPattern;
 
     private boolean isInitialized = false;
     private int[] indicesToInclude;
     private String prefix;
+    private List<String> columnsToExclude = Lists.newArrayList();
 
-    public PluckTuple(String prefix) {
-        this.prefix = prefix;
-        pattern = Pattern.compile(prefix);
+    public PluckTuple(String...args) {
+        int i = 0;
+        for (String arg : args) {
+            if (i == 0) {
+                this.prefix = arg;
+            }
+            else {
+                this.columnsToExclude.add(arg);
+            }
+            i++;
+        }
+        prefixPattern = Pattern.compile(prefix);
     }
 
     @Override
@@ -79,7 +78,9 @@ public class PluckTuple extends EvalFunc
             Schema inputSchema = getInputSchema();
             for (int i = 0; i < inputSchema.size(); i++) {
                 String alias = inputSchema.getField(i).alias;
-                if (alias.startsWith(prefix) || pattern.matcher(alias).matches()) {
+                String[] splitAlias = alias.split("::");
+                String lastAlias = splitAlias[splitAlias.length-1];
+                if ((alias.startsWith(prefix) || prefixPattern.matcher(alias).matches()) && ! this.columnsToExclude.contains(lastAlias)) {
                     indicesToInclude.add(i);
                 }
             }
@@ -103,12 +104,16 @@ public class PluckTuple extends EvalFunc
             List<Integer> indicesToInclude = Lists.newArrayList();
             for (int i = 0; i < inputSchema.size(); i++) {
                 String alias;
+                String[] splitAlias;
+                String lastAlias;
                 try {
                     alias = inputSchema.getField(i).alias;
+                    splitAlias = alias.split("::");
+                    lastAlias = splitAlias[splitAlias.length-1];
                 } catch (FrontendException e) {
                     throw new RuntimeException(e); // Should never happen
                 }
-                if (alias.startsWith(prefix) || pattern.matcher(alias).matches()) {
+                if ((alias.startsWith(prefix) || prefixPattern.matcher(alias).matches()) && ! this.columnsToExclude.contains(lastAlias)) {
                     indicesToInclude.add(i);
                 }
             }

Modified: pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java?rev=1675948&r1=1675947&r2=1675948&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestPluckTuple.java Fri Apr 24 21:09:37 2015
@@ -26,12 +26,12 @@ import static org.junit.Assert.assertTru
 
 import java.util.Iterator;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Utils;
-import org.apache.pig.test.Util;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,7 +40,7 @@ public class TestPluckTuple {
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(Util.getLocalTestMode());
+        pigServer = new PigServer(ExecType.LOCAL);
     }
 
     @Test
@@ -98,4 +98,53 @@ public class TestPluckTuple {
         assertEquals(exp2, it.next());
         assertFalse(it.hasNext());
     }
+
+    @Test
+    public void testExcludeColumns() throws Exception {
+        String query = "a1 = load 'a1' as (x:int,y:chararray,z:long);" +
+                "a2 = load 'a2' as (x:int,y:chararray,z:long);" +
+                "a3 = foreach a2 generate x, y;" +
+                "b = join a1 by x, a2 by x;" +
+                "define pluck PluckTuple('a[2|3]::.*','z');" +
+                "c = foreach b generate flatten(pluck(*));";
+        pigServer.registerQuery(query);
+        assertTrue(Schema.equals(pigServer.dumpSchema("a3"), pigServer.dumpSchema("c"), false, true));
+    }
+
+    @Test
+    public void testExcludeColumnsOutput() throws Exception {
+        Data data = resetData(pigServer);
+
+        Tuple exp1 = tuple(1, "hey", 2L);
+        Tuple exp2 = tuple(2, "woah", 3L);
+
+        Tuple exp1_excl = tuple(1, 2L);
+        Tuple exp2_excl = tuple(2, 3L);
+
+        data.set("a",
+            Utils.getSchemaFromString("x:int,y:chararray,z:long"),
+            exp1,
+            exp2,
+            tuple(3, "c", 4L)
+            );
+        data.set("b",
+            Utils.getSchemaFromString("x:int,y:chararray,z:long"),
+            tuple(1, "sasf", 5L),
+            tuple(2, "woah", 6L),
+            tuple(4, "c", 7L)
+            );
+
+        String query = "a = load 'a' using mock.Storage();" +
+            "b = load 'b' using mock.Storage();" +
+            "c = join a by x, b by x;" +
+            "define pluck PluckTuple('a::','y');" +
+            "d = foreach c generate flatten(pluck(*));";
+        pigServer.registerQuery(query);
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(exp1_excl, it.next());
+        assertTrue(it.hasNext());
+        assertEquals(exp2_excl, it.next());
+        assertFalse(it.hasNext());
+    }
 }
\ No newline at end of file


Reply via email to