Author: knoguchi
Date: Tue May 14 20:53:05 2024
New Revision: 1917729

URL: http://svn.apache.org/viewvc?rev=1917729&view=rev
Log:
PIG-5453: FLATTEN shifting fields incorrectly (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/test/org/apache/pig/test/TestFlatten.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1917729&r1=1917728&r2=1917729&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 14 20:53:05 2024
@@ -42,6 +42,8 @@ PIG-5450: Pig-on-Spark3 E2E ORC test fai
 
 PIG-5452: Null handling of FLATTEN with user defined schema (as clause) (knoguchi)
 
+PIG-5453: FLATTEN shifting fields incorrectly (knoguchi)
+
 Release 0.18.0 - Unreleased
  
 INCOMPATIBLE CHANGES

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1917729&r1=1917728&r2=1917729&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue May 14 20:53:05 2024
@@ -549,7 +549,8 @@ public class POForEach extends PhysicalO
                 }
             } else if(in instanceof Tuple) {
                 Tuple t = (Tuple)in;
-                int size = t.size();
+                int size = flattenNumFields == null || flattenNumFields[i] == 0 ?
+                           t.size() : Math.min(flattenNumFields[i], t.size());
                 for(int j = 0; j < size; ++j) {
                     if (knownSize) {
                         out.set(idx++, t.get(j));
@@ -557,6 +558,14 @@ public class POForEach extends PhysicalO
                         out.append(t.get(j));
                     }
                 }
+                // if size >= flattenNumFields[i], then fill the rest with null
+                for(int j = size; j < flattenNumFields[i]; ++j) {
+                    if (knownSize) {
+                        out.set(idx++, null);
+                    } else {
+                        out.append(null);
+                    }
+                }
             } else if (in instanceof Map.Entry) {
                 Map.Entry entry = (Map.Entry)in;
                 if (knownSize) {
@@ -726,7 +735,12 @@ public class POForEach extends PhysicalO
         clone.addOriginalLocation(alias, getOriginalLocations());
         clone.endOfAllInputProcessing = endOfAllInputProcessing;
         clone.mapSideOnly = mapSideOnly;
-        clone.flattenNumFields = flattenNumFields;
+        if( flattenNumFields != null )  {
+            clone.flattenNumFields = new int[flattenNumFields.length];
+            for(int i=0; i < flattenNumFields.length; i++) {
+                clone.flattenNumFields[i] = flattenNumFields[i];
+            }
+        }
         return clone;
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestFlatten.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFlatten.java?rev=1917729&r1=1917728&r2=1917729&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFlatten.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFlatten.java Tue May 14 20:53:05 2024
@@ -386,4 +386,43 @@ public class TestFlatten {
 
         Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
     }
+
+    @Test
+    public void testFlattenWithInconsistentNumberOfFields() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                1,
+                Storage.tuple("a","b","c")
+            ),
+            Storage.tuple(
+                2,
+                Storage.tuple("a","b","c")
+            ),
+            Storage.tuple(
+                3,
+                Storage.tuple("a","b")
+            ),
+            Storage.tuple(
+                4,
+                Storage.tuple("a","b","c","d")
+            ),
+        );
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (a0:int, a1:tuple());");
+        pig.registerQuery("B = foreach A GENERATE a0, " + 
+                              "FLATTEN(a1) as b1:chararray, b2:chararray, b3:chararray, a0 as a4");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        List<ExecJob> execJobs = pig.executeBatch();
+        for( ExecJob execJob : execJobs ) {
+          assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+        }
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "(1, 'a', 'b', 'c', 1)", "(2, 'a', 'b', 'c', 2)", 
+                "(3, 'a', 'b', null, 3)", "(4, 'a', 'b', 'c', 4)" });
+
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
 }


Reply via email to