Author: rohini
Date: Thu Aug 11 05:32:00 2022
New Revision: 1903330

URL: http://svn.apache.org/viewvc?rev=1903330&view=rev
Log:
PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to 
StoreFuncInterface (kpriceyahoo via rohini)

Added:
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/src/org/apache/pig/StoreFunc.java
    pig/trunk/src/org/apache/pig/StoreFuncInterface.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
    pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
    pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 11 05:32:00 2022
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to 
StoreFuncInterfce (kpriceyahoo via rohini)
+
 PIG-5398: SparkLauncher does not read SPARK_CONF_DIR/spark-defaults.conf 
(knoguchi)
 
 PIG-5397: Update spark2.version to 2.4.8 (knoguchi)

Modified: 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
 (original)
+++ 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
 Thu Aug 11 05:32:00 2022
@@ -822,4 +822,9 @@ public class AvroStorage extends FileInp
         classList.add(JSONParser.class);
         return FuncUtils.getShipFiles(classList);
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Thu Aug 11 05:32:00 2022
@@ -197,47 +197,6 @@ public abstract class StoreFunc implemen
         }
     }
 
-    // TODO When dropping support for JDK 7 move this as a default method to 
StoreFuncInterface
-    /**
-     * DAG execution engines like Tez support optimizing union by writing to
-     * output location in parallel from tasks of different vertices. Commit is
-     * called once all the vertices in the union are complete. This eliminates
-     * need to have a separate phase to read data output from previous phases,
-     * union them and write out again.
-     *
-     * Enabling the union optimization requires the OutputFormat to
-     *
-     *      1) Support creation of different part file names for tasks of 
different
-     * vertices. Conflicting filenames can create data corruption and loss.
-     * For eg: If task 0 of vertex1 and vertex2 both create filename as
-     * part-r-00000, then one of the files will be overwritten when promoting
-     * from temporary to final location leading to data loss.
-     *      FileOutputFormat has mapreduce.output.basename config which enables
-     *  naming files differently in different vertices. Classes extending
-     *  FileOutputFormat and those prefixing file names with 
mapreduce.output.basename
-     *  value will not encounter conflict. Cases like HBaseStorage which write 
to key
-     *  value store and do not produce files also should not face any conflict.
-     *
-     *      2) Support calling of commit once at the end takes care of 
promoting
-     * temporary files of the different vertices into the final location.
-     * For eg: FileOutputFormat commit algorithm handles promoting of files 
produced
-     * by tasks of different vertices into final output location without issues
-     * if there is no file name conflict. In cases like HBaseStorage, the
-     * TableOutputCommitter does nothing on commit.
-     *
-     * If custom OutputFormat used by the StoreFunc does not support the above
-     * two criteria, then false should be returned. Union optimization will be
-     * disabled for the StoreFunc.
-     *
-     * Default implementation returns null and in that case planner falls back
-     * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
-     * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
-     * settings to determine if the StoreFunc supports it.
-     */
-    public Boolean supportsParallelWriteToStoreLocation() {
-        return null;
-    }
-
     /**
      * Issue a warning.  Warning messages are aggregated and reported to
      * the user.

Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Thu Aug 11 05:32:00 
2022
@@ -166,4 +166,44 @@ public interface StoreFuncInterface {
      */
     default void addCredentials(Credentials credentials, Configuration conf) {
     }
+
+    /**
+     * DAG execution engines like Tez support optimizing union by writing to
+     * output location in parallel from tasks of different vertices. Commit is
+     * called once all the vertices in the union are complete. This eliminates
+     * need to have a separate phase to read data output from previous phases,
+     * union them and write out again.
+     *
+     * Enabling the union optimization requires the OutputFormat to
+     *
+     *      1) Support creation of different part file names for tasks of 
different
+     * vertices. Conflicting filenames can create data corruption and loss.
+     * For eg: If task 0 of vertex1 and vertex2 both create filename as
+     * part-r-00000, then one of the files will be overwritten when promoting
+     * from temporary to final location leading to data loss.
+     *      FileOutputFormat has mapreduce.output.basename config which enables
+     *  naming files differently in different vertices. Classes extending
+     *  FileOutputFormat and those prefixing file names with 
mapreduce.output.basename
+     *  value will not encounter conflict. Cases like HBaseStorage which write 
to key
+     *  value store and do not produce files also should not face any conflict.
+     *
+     *      2) Support calling of commit once at the end takes care of 
promoting
+     * temporary files of the different vertices into the final location.
+     * For eg: FileOutputFormat commit algorithm handles promoting of files 
produced
+     * by tasks of different vertices into final output location without issues
+     * if there is no file name conflict. In cases like HBaseStorage, the
+     * TableOutputCommitter does nothing on commit.
+     *
+     * If custom OutputFormat used by the StoreFunc does not support the above
+     * two criteria, then false should be returned. Union optimization will be
+     * disabled for the StoreFunc.
+     *
+     * Default implementation returns null and in that case planner falls back
+     * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
+     * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
+     * settings to determine if the StoreFunc supports it.
+     */
+    default Boolean supportsParallelWriteToStoreLocation() {
+        return null;
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Thu Aug 11 05:32:00 2022
@@ -83,21 +83,9 @@ public class UnionOptimizer extends TezO
 
     private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);
     private TezOperPlan tezPlan;
-    private static Set<String> builtinSupportedStoreFuncs = new 
HashSet<String>();
     private List<String> supportedStoreFuncs;
     private List<String> unsupportedStoreFuncs;
 
-    static {
-        builtinSupportedStoreFuncs.add(PigStorage.class.getName());
-        builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
-        builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
-        builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
-        builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
-        
builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
-        
builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
-        builtinSupportedStoreFuncs.add(Storage.class.getName());
-    }
-
     public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, 
List<String> unsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, 
TezOperPlan>(plan));
         tezPlan = plan;
@@ -129,42 +117,51 @@ public class UnionOptimizer extends TezO
             throws VisitorException {
         List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, 
POStoreTez.class);
 
+        // If any store function does not support parallel writes, then we 
cannot use this optimization
         for (POStoreTez store : stores) {
             String name = store.getStoreFunc().getClass().getName();
-            if (store.getStoreFunc() instanceof StoreFunc) {
-                StoreFunc func = (StoreFunc) store.getStoreFunc();
-                if (func.supportsParallelWriteToStoreLocation() != null) {
-                    if (func.supportsParallelWriteToStoreLocation()) {
-                        continue;
-                    } else {
-                        LOG.warn(name + " does not support union optimization."
-                                + " Disabling it. There will be some 
performance degradation.");
-                        return false;
-                    }
-                }
+            Boolean supportsParallelWriteToStoreLocation = 
store.getStoreFunc().supportsParallelWriteToStoreLocation();
+
+            // We process exclusions first, then inclusions. This way, a user 
can explicitly disable parallel stores
+            // for a UDF that claims to support it, but cannot enable parallel 
stores for a UDF that claims not to.
+            //
+            // Logical flow:
+            // 1) If the store function is explicitly listed as unsupported, 
then return false
+            // 2) If the store function specifies itself as unsupported, then 
return false
+            // 3) If the store function specifies itself as supported, then 
continue (true case)
+            // 4) If the store function is explicitly listed as support, then 
continue (true case)
+            // 5) Otherwise, return false
+
+            if (unsupportedStoreFuncs != null && 
unsupportedStoreFuncs.contains(name)) {
+                LOG.warn(name + " does not support union optimization."
+                         + " Disabling it. There will be some performance 
degradation.");
+                return false;
             }
-            // If StoreFunc does not explicitly state support, then check 
supported and
-            // unsupported config settings.
-            if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
-                if (unsupportedStoreFuncs != null
-                        && unsupportedStoreFuncs.contains(name)) {
+
+            if (supportsParallelWriteToStoreLocation != null) {
+                if (supportsParallelWriteToStoreLocation) {
+                    continue;
+                } else {
+                    LOG.warn(name + " does not support union optimization."
+                             + " Disabling it. There will be some performance 
degradation.");
                     return false;
                 }
-                if (supportedStoreFuncs != null
-                        && !supportedStoreFuncs.contains(name)) {
-                    if (!builtinSupportedStoreFuncs.contains(name)) {
-                        
LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
-                                + " does not contain " + name
-                                + " and so disabling union optimization. There 
will be some performance degradation. "
-                                + "If your storefunc does not hardcode part 
file names and can work with multiple vertices writing to the output location,"
-                                + " run pig with -D"
-                                + 
PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
-                                + "=<Comma separated list of fully qualified 
StoreFunc class names> to enable the optimization. Refer PIG-4691");
-                        return false;
-                    }
-                }
             }
+
+            if (supportedStoreFuncs != null && 
supportedStoreFuncs.contains(name)) {
+                continue;
+            }
+
+            LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                     + " does not contain " + name
+                     + " and so disabling union optimization. There will be 
some performance degradation. "
+                     + "If your storefunc does not hardcode part file names 
and can work with multiple vertices writing to the output location,"
+                     + " run pig with -D"
+                     + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                     + "=<Comma separated list of fully qualified StoreFunc 
class names> to enable the optimization. Refer PIG-4691");
+            return false;
         }
+
         return true;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java 
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Aug 
11 05:32:00 2022
@@ -1337,4 +1337,9 @@ public class HBaseStorage extends LoadFu
         }
         return incremented;
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Thu Aug 11 05:32:00 
2022
@@ -717,4 +717,9 @@ public class AvroStorage extends LoadFun
       Class[] classList = new Class[] {Schema.class, AvroInputFormat.class};
       return FuncUtils.getShipFiles(classList);
   }
+
+  @Override
+  public Boolean supportsParallelWriteToStoreLocation() {
+    return true;
+  }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Thu Aug 11 05:32:00 
2022
@@ -319,4 +319,9 @@ public class JsonStorage extends StoreFu
     public List<String> getCacheFiles() {
         return null;
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Thu Aug 11 05:32:00 
2022
@@ -713,4 +713,9 @@ public class OrcStorage extends LoadFunc
         }
         return values;
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Aug 11 05:32:00 
2022
@@ -619,4 +619,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
             mLog.warn("Could not delete output " + output);
         }
     }
+
+    @Override
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return true;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Thu Aug 11 05:32:00 
2022
@@ -693,4 +693,8 @@ private MockRecordWriter mockRecordWrite
 
   }
 
+  @Override
+  public Boolean supportsParallelWriteToStoreLocation() {
+    return true;
+  }
 }

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
 Thu Aug 11 05:32:00 2022
@@ -40,6 +40,6 @@ POValueOutputTez - scope-23   ->       [scope-2
     |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
 Tez vertex scope-20
 # Plan on vertex
-c: 
Store(file:///tmp/pigoutput:org.apache.pig.test.TestMultiQueryBasic$DummyStoreWithOutputFormat)
 - scope-17
+c: 
Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc)
 - scope-17
 |
 |---POShuffledValueInputTez - scope-21 <-       [scope-18, scope-19]

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld
 Thu Aug 11 05:32:00 2022
@@ -40,6 +40,6 @@ POValueOutputTez - scope-23   ->       [scope-2
     |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
 Tez vertex scope-20
 # Plan on vertex
-c: 
Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc)
 - scope-17
+c: 
Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteDisabled)
 - scope-17
 |
 |---POShuffledValueInputTez - scope-21 <-       [scope-18, scope-19]

Added: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld?rev=1903330&view=auto
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
 (added)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld
 Thu Aug 11 05:32:00 2022
@@ -0,0 +1,42 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-18    ->      Tez vertex group scope-24,
+Tez vertex scope-19    ->      Tez vertex group scope-24,
+Tez vertex group scope-24
+
+Tez vertex scope-18
+# Plan on vertex
+c: 
Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled)
 - scope-25   ->       scope-17
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+c: 
Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled)
 - scope-26   ->       scope-17
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][1] - scope-9
+    |   |
+    |   Cast[chararray] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex group scope-24      <-       [scope-18, scope-19]   ->       null
+# No plan on vertex group

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1903330&r1=1903329&r2=1903330&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Thu Aug 11 05:32:00 
2022
@@ -47,7 +47,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader;
 import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader;
@@ -942,10 +941,11 @@ public class TestTezCompiler {
                 "store c into 'file:///tmp/pigoutput';";
         // Union optimization should be turned off if PARALLEL clause is 
specified
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+
     }
 
     @Test
-    public void testUnionUnSupportedStore() throws Exception {
+    public void testUnionIncludeExcludeStoreFunc() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -966,7 +966,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput' using " + 
DummyStoreWithOutputFormat.class.getName() + "();";
+                "store c into 'file:///tmp/pigoutput' using " + 
TestDummyStoreFunc.class.getName() + "();";
         // Plan should not have union optimization applied as only ORC is 
supported
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
 
@@ -976,11 +976,24 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/pigoutput' using " + 
TestDummyStoreFunc.class.getName() + "();";
+                "store c into 'file:///tmp/pigoutput' using " +
+                TestDummyStoreFuncParallelWriteDisabled.class.getName() + 
"();";
         // Plan should not have union optimization applied as 
supportsParallelWriteToStoreLocation returns false
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld");
 
         resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, 
null);
+        query =
+            "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+            "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+            "c = union onschema a, b;" +
+            "store c into 'file:///tmp/pigoutput' using " +
+            TestDummyStoreFuncParallelWriteEnabled.class.getName() + "();";
+
+        // Plan should have union optimization applied as 
supportsParallelWriteToStoreLocation returns true
+        run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld");
+
+        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, 
PigStorage.class.getName());
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, 
null);
         query =
@@ -1461,7 +1474,6 @@ public class TestTezCompiler {
     }
 
     public static class TestDummyStoreFunc extends StoreFunc {
-
         @Override
         public OutputFormat getOutputFormat() throws IOException {
             return null;
@@ -1479,12 +1491,22 @@ public class TestTezCompiler {
         @Override
         public void putNext(Tuple t) throws IOException {
         }
+    }
 
+    public static class TestDummyStoreFuncParallelWriteEnabled extends 
TestDummyStoreFunc {
         @Override
         public Boolean supportsParallelWriteToStoreLocation() {
-            return false;
+            return true;
         }
+    }
 
+
+    public static class TestDummyStoreFuncParallelWriteDisabled extends 
TestDummyStoreFunc {
+        @Override
+        public Boolean supportsParallelWriteToStoreLocation() {
+            return false;
+        }
     }
+
 }
 


Reply via email to