Author: rohini Date: Thu Aug 11 05:32:00 2022 New Revision: 1903330 URL: http://svn.apache.org/viewvc?rev=1903330&view=rev Log: PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to StoreFuncInterface (kpriceyahoo via rohini)
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld Modified: pig/trunk/CHANGES.txt pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java pig/trunk/src/org/apache/pig/StoreFunc.java pig/trunk/src/org/apache/pig/StoreFuncInterface.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java pig/trunk/src/org/apache/pig/builtin/AvroStorage.java pig/trunk/src/org/apache/pig/builtin/JsonStorage.java pig/trunk/src/org/apache/pig/builtin/OrcStorage.java pig/trunk/src/org/apache/pig/builtin/PigStorage.java pig/trunk/src/org/apache/pig/builtin/mock/Storage.java pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Thu Aug 11 05:32:00 2022 @@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley IMPROVEMENTS +PIG-5377: Move supportsParallelWriteToStoreLocation from StoreFunc to StoreFuncInterfce (kpriceyahoo via rohini) + PIG-5398: SparkLauncher does not read SPARK_CONF_DIR/spark-defaults.conf (knoguchi) PIG-5397: Update spark2.version to 2.4.8 (knoguchi) Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Aug 11 05:32:00 2022 @@ -822,4 +822,9 @@ public class AvroStorage extends FileInp classList.add(JSONParser.class); return FuncUtils.getShipFiles(classList); } + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/src/org/apache/pig/StoreFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/StoreFunc.java (original) +++ pig/trunk/src/org/apache/pig/StoreFunc.java Thu Aug 11 05:32:00 2022 @@ -197,47 +197,6 @@ public abstract class StoreFunc implemen } } - // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface - /** - * DAG execution engines like Tez support optimizing union by writing to - * output location in parallel from tasks of different vertices. Commit is - * called once all the vertices in the union are complete. This eliminates - * need to have a separate phase to read data output from previous phases, - * union them and write out again. - * - * Enabling the union optimization requires the OutputFormat to - * - * 1) Support creation of different part file names for tasks of different - * vertices. Conflicting filenames can create data corruption and loss. - * For eg: If task 0 of vertex1 and vertex2 both create filename as - * part-r-00000, then one of the files will be overwritten when promoting - * from temporary to final location leading to data loss. - * FileOutputFormat has mapreduce.output.basename config which enables - * naming files differently in different vertices. 
Classes extending - * FileOutputFormat and those prefixing file names with mapreduce.output.basename - * value will not encounter conflict. Cases like HBaseStorage which write to key - * value store and do not produce files also should not face any conflict. - * - * 2) Support calling of commit once at the end takes care of promoting - * temporary files of the different vertices into the final location. - * For eg: FileOutputFormat commit algorithm handles promoting of files produced - * by tasks of different vertices into final output location without issues - * if there is no file name conflict. In cases like HBaseStorage, the - * TableOutputCommitter does nothing on commit. - * - * If custom OutputFormat used by the StoreFunc does not support the above - * two criteria, then false should be returned. Union optimization will be - * disabled for the StoreFunc. - * - * Default implementation returns null and in that case planner falls back - * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and - * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS} - * settings to determine if the StoreFunc supports it. - */ - public Boolean supportsParallelWriteToStoreLocation() { - return null; - } - /** * Issue a warning. Warning messages are aggregated and reported to * the user. 
Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original) +++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Thu Aug 11 05:32:00 2022 @@ -166,4 +166,44 @@ public interface StoreFuncInterface { */ default void addCredentials(Credentials credentials, Configuration conf) { } + + /** + * DAG execution engines like Tez support optimizing union by writing to + * output location in parallel from tasks of different vertices. Commit is + * called once all the vertices in the union are complete. This eliminates + * need to have a separate phase to read data output from previous phases, + * union them and write out again. + * + * Enabling the union optimization requires the OutputFormat to + * + * 1) Support creation of different part file names for tasks of different + * vertices. Conflicting filenames can create data corruption and loss. + * For eg: If task 0 of vertex1 and vertex2 both create filename as + * part-r-00000, then one of the files will be overwritten when promoting + * from temporary to final location leading to data loss. + * FileOutputFormat has mapreduce.output.basename config which enables + * naming files differently in different vertices. Classes extending + * FileOutputFormat and those prefixing file names with mapreduce.output.basename + * value will not encounter conflict. Cases like HBaseStorage which write to key + * value store and do not produce files also should not face any conflict. + * + * 2) Support calling of commit once at the end takes care of promoting + * temporary files of the different vertices into the final location. 
+ * For eg: FileOutputFormat commit algorithm handles promoting of files produced + * by tasks of different vertices into final output location without issues + * if there is no file name conflict. In cases like HBaseStorage, the + * TableOutputCommitter does nothing on commit. + * + * If custom OutputFormat used by the StoreFunc does not support the above + * two criteria, then false should be returned. Union optimization will be + * disabled for the StoreFunc. + * + * Default implementation returns null and in that case planner falls back + * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and + * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS} + * settings to determine if the StoreFunc supports it. + */ + default Boolean supportsParallelWriteToStoreLocation() { + return null; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Thu Aug 11 05:32:00 2022 @@ -83,21 +83,9 @@ public class UnionOptimizer extends TezO private static final Log LOG = LogFactory.getLog(UnionOptimizer.class); private TezOperPlan tezPlan; - private static Set<String> builtinSupportedStoreFuncs = new HashSet<String>(); private List<String> supportedStoreFuncs; private List<String> unsupportedStoreFuncs; - static { - builtinSupportedStoreFuncs.add(PigStorage.class.getName()); - builtinSupportedStoreFuncs.add(JsonStorage.class.getName()); - builtinSupportedStoreFuncs.add(OrcStorage.class.getName()); - 
builtinSupportedStoreFuncs.add(HBaseStorage.class.getName()); - builtinSupportedStoreFuncs.add(AvroStorage.class.getName()); - builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage"); - builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage"); - builtinSupportedStoreFuncs.add(Storage.class.getName()); - } - public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) { super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan)); tezPlan = plan; @@ -129,42 +117,51 @@ public class UnionOptimizer extends TezO throws VisitorException { List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); + // If any store function does not support parallel writes, then we cannot use this optimization for (POStoreTez store : stores) { String name = store.getStoreFunc().getClass().getName(); - if (store.getStoreFunc() instanceof StoreFunc) { - StoreFunc func = (StoreFunc) store.getStoreFunc(); - if (func.supportsParallelWriteToStoreLocation() != null) { - if (func.supportsParallelWriteToStoreLocation()) { - continue; - } else { - LOG.warn(name + " does not support union optimization." - + " Disabling it. There will be some performance degradation."); - return false; - } - } + Boolean supportsParallelWriteToStoreLocation = store.getStoreFunc().supportsParallelWriteToStoreLocation(); + + // We process exclusions first, then inclusions. This way, a user can explicitly disable parallel stores + // for a UDF that claims to support it, but cannot enable parallel stores for a UDF that claims not to. 
+ // + // Logical flow: + // 1) If the store function is explicitly listed as unsupported, then return false + // 2) If the store function specifies itself as unsupported, then return false + // 3) If the store function specifies itself as supported, then continue (true case) + // 4) If the store function is explicitly listed as support, then continue (true case) + // 5) Otherwise, return false + + if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) { + LOG.warn(name + " does not support union optimization." + + " Disabling it. There will be some performance degradation."); + return false; } - // If StoreFunc does not explicitly state support, then check supported and - // unsupported config settings. - if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { - if (unsupportedStoreFuncs != null - && unsupportedStoreFuncs.contains(name)) { + + if (supportsParallelWriteToStoreLocation != null) { + if (supportsParallelWriteToStoreLocation) { + continue; + } else { + LOG.warn(name + " does not support union optimization." + + " Disabling it. There will be some performance degradation."); return false; } - if (supportedStoreFuncs != null - && !supportedStoreFuncs.contains(name)) { - if (!builtinSupportedStoreFuncs.contains(name)) { - LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS - + " does not contain " + name - + " and so disabling union optimization. There will be some performance degradation. " - + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location," - + " run pig with -D" - + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS - + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. 
Refer PIG-4691"); - return false; - } - } } + + if (supportedStoreFuncs != null && supportedStoreFuncs.contains(name)) { + continue; + } + + LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS + + " does not contain " + name + + " and so disabling union optimization. There will be some performance degradation. " + + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location," + + " run pig with -D" + + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS + + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691"); + return false; } + return true; } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Aug 11 05:32:00 2022 @@ -1337,4 +1337,9 @@ public class HBaseStorage extends LoadFu } return incremented; } + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Thu Aug 11 05:32:00 2022 @@ -717,4 +717,9 @@ public class AvroStorage extends LoadFun Class[] classList = new Class[] {Schema.class, AvroInputFormat.class}; return FuncUtils.getShipFiles(classList); } + + @Override + public Boolean 
supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Thu Aug 11 05:32:00 2022 @@ -319,4 +319,9 @@ public class JsonStorage extends StoreFu public List<String> getCacheFiles() { return null; } + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Thu Aug 11 05:32:00 2022 @@ -713,4 +713,9 @@ public class OrcStorage extends LoadFunc } return values; } + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Aug 11 05:32:00 2022 @@ -619,4 +619,9 @@ LoadPushDown, LoadMetadata, StoreMetadat mLog.warn("Could not delete output " + output); } } + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Thu Aug 11 05:32:00 2022 @@ -693,4 +693,8 @@ private MockRecordWriter mockRecordWrite } + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return true; + } } Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld Thu Aug 11 05:32:00 2022 @@ -40,6 +40,6 @@ POValueOutputTez - scope-23 -> [scope-2 |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8 Tez vertex scope-20 # Plan on vertex -c: Store(file:///tmp/pigoutput:org.apache.pig.test.TestMultiQueryBasic$DummyStoreWithOutputFormat) - scope-17 +c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc) - scope-17 | |---POShuffledValueInputTez - scope-21 <- [scope-18, scope-19] Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld (original) +++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld Thu Aug 11 05:32:00 2022 @@ -40,6 +40,6 @@ POValueOutputTez - scope-23 -> [scope-2 |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8 Tez vertex scope-20 # Plan on vertex -c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFunc) - scope-17 +c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteDisabled) - scope-17 | |---POShuffledValueInputTez - scope-21 <- [scope-18, scope-19] Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld?rev=1903330&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld (added) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld Thu Aug 11 05:32:00 2022 @@ -0,0 +1,42 @@ +#-------------------------------------------------- +# There are 1 DAGs in the session +#-------------------------------------------------- +#-------------------------------------------------- +# TEZ DAG plan: pig-0_scope-0 +#-------------------------------------------------- +Tez vertex scope-18 -> Tez vertex group scope-24, +Tez vertex scope-19 -> Tez vertex group scope-24, +Tez vertex group scope-24 + +Tez vertex scope-18 +# Plan on vertex +c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled) - scope-25 -> scope-17 +| +|---a: New For Each(false,false)[bag] - scope-7 + | | + | Cast[int] - scope-2 + | | + | |---Project[bytearray][0] - scope-1 + | | + | Cast[chararray] - scope-5 + | | + | |---Project[bytearray][1] - scope-4 + | + |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0 +Tez vertex scope-19 
+# Plan on vertex +c: Store(file:///tmp/pigoutput:org.apache.pig.tez.TestTezCompiler$TestDummyStoreFuncParallelWriteEnabled) - scope-26 -> scope-17 +| +|---c: New For Each(false,false)[bag] - scope-15 + | | + | Cast[int] - scope-10 + | | + | |---Project[bytearray][1] - scope-9 + | | + | Cast[chararray] - scope-13 + | | + | |---Project[bytearray][0] - scope-12 + | + |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8 +Tez vertex group scope-24 <- [scope-18, scope-19] -> null +# No plan on vertex group Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1903330&r1=1903329&r2=1903330&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Thu Aug 11 05:32:00 2022 @@ -47,7 +47,6 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.plan.NodeIdGenerator; -import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat; import org.apache.pig.test.Util; import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader; import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader; @@ -942,10 +941,11 @@ public class TestTezCompiler { "store c into 'file:///tmp/pigoutput';"; // Union optimization should be turned off if PARALLEL clause is specified run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); + } @Test - public void testUnionUnSupportedStore() throws Exception { + public void testUnionIncludeExcludeStoreFunc() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + @@ -966,7 +966,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as 
(x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();"; + "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();"; // Plan should not have union optimization applied as only ORC is supported run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld"); @@ -976,11 +976,24 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();"; + "store c into 'file:///tmp/pigoutput' using " + + TestDummyStoreFuncParallelWriteDisabled.class.getName() + "();"; // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld"); resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "store c into 'file:///tmp/pigoutput' using " + + TestDummyStoreFuncParallelWriteEnabled.class.getName() + "();"; + + // Plan should have union optimization applied as supportsParallelWriteToStoreLocation returns true + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore3.gld"); + + resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); query = @@ -1461,7 +1474,6 @@ public class TestTezCompiler { } public static class TestDummyStoreFunc extends StoreFunc { - @Override public 
OutputFormat getOutputFormat() throws IOException { return null; @@ -1479,12 +1491,22 @@ public class TestTezCompiler { @Override public void putNext(Tuple t) throws IOException { } + } + public static class TestDummyStoreFuncParallelWriteEnabled extends TestDummyStoreFunc { @Override public Boolean supportsParallelWriteToStoreLocation() { - return false; + return true; } + } + + public static class TestDummyStoreFuncParallelWriteDisabled extends TestDummyStoreFunc { + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return false; + } } + }