Author: rohini Date: Mon Feb 27 22:12:47 2017 New Revision: 1784664 URL: http://svn.apache.org/viewvc?rev=1784664&view=rev Log: Removing schema alias and :: coming from parent relation (szita via rohini)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml pig/trunk/src/org/apache/pig/PigConfiguration.java pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java pig/trunk/test/org/apache/pig/test/TestSchema.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Feb 27 22:12:47 2017 @@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type  IMPROVEMENTS +PIG-5110: Removing schema alias and :: coming from parent relation (szita via rohini) + PIG-5085: Support FLATTEN of maps (szita via rohini) PIG-5126. Add doc about pig in zeppelin (zjffdu) Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original) +++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Mon Feb 27 22:12:47 2017 @@ -5409,7 +5409,9 @@ DUMP X; <section id="disambiguate"> <title>Disambiguate Operator</title> -<p>Use the disambiguate operator ( :: ) to identify field names after JOIN, COGROUP, CROSS, or FLATTEN operators.</p> +<p>After JOIN, COGROUP, CROSS, or FLATTEN operations, the field names have the original alias and the disambiguate + operator ( :: ) prepended in the schema. The disambiguate operator is used to identify field names in case there + is an ambiguity.</p> <p>In this example, to disambiguate y, use A::y or B::y. 
In cases where there is no ambiguity, such as z, the :: is not necessary but is still supported.</p> @@ -5417,8 +5419,14 @@ DUMP X; A = load 'data1' as (x, y); B = load 'data2' as (x, y, z); C = join A by x, B by x; -D = foreach C generate y; -- which y? +D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to A::y or B::y </source> +<p> In cases where the schema is stored as part of the StoreFunc like PigStorage, JsonStorage, AvroStorage or OrcStorage, + users generally have to use an extra FOREACH before STORE to rename the field names and remove the disambiguate + operator from the names. To automatically remove the disambiguate operator from the schema for the STORE operation, + the <i>pig.store.schema.disambiguate</i> Pig property can be set to "false". It is the responsibility of the user + to make sure that there is no conflict in the field names when using this setting. +</p> </section> <!-- =================================================================== --> @@ -5444,7 +5452,7 @@ D = foreach C generate y; -- which y? to bags. For example, if we apply the expression GENERATE $0, FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]), we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result. </p> - + <p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated. (See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) 
</p> @@ -6537,7 +6545,7 @@ B = FOREACH A GENERATE a, FLATTEN(m); C = FILTER B by m::value == 5; …… </source> - + </section> <section id="nestedblock"> Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigConfiguration.java (original) +++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 27 22:12:47 2017 @@ -501,6 +501,13 @@ public class PigConfiguration { */ public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = "pig.tez.configure.am.memory"; + /** + * If set to false, automatic schema disambiguation gets disabled i.e. group::name will be just name + */ + public static final String PIG_STORE_SCHEMA_DISAMBIGUATE = "pig.store.schema.disambiguate"; + + public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true"; + // Deprecated settings of Pig 0.13 /** Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Mon Feb 27 22:12:47 2017 @@ -36,6 +36,7 @@ public class LOStore extends LogicalRela private boolean isTmpStore; private SortInfo sortInfo; private final StoreFuncInterface storeFunc; + private boolean disambiguationEnabled = true; public LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature) { super("LOStore", plan); @@ -43,6 +44,12 @@ public class LOStore extends LogicalRela this.storeFunc = storeFunc; this.signature = signature; } + + public 
LOStore(LogicalPlan plan, FileSpec outputFileSpec, StoreFuncInterface storeFunc, String signature, + boolean disambiguationEnabled) { + this(plan, outputFileSpec, storeFunc, signature); + this.disambiguationEnabled = disambiguationEnabled; + } public FileSpec getOutputSpec() { return output; @@ -55,6 +62,17 @@ public class LOStore extends LogicalRela @Override public LogicalSchema getSchema() throws FrontendException { schema = ((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema(); + + if (!disambiguationEnabled && schema != null && schema.getFields() != null) { + //If requested try and remove parent alias substring including colon(s) + for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) { + if (field.alias == null || !field.alias.contains(":")) { + continue; + } + field.alias = field.alias.substring(field.alias.lastIndexOf(":") + 1); + } + } + return schema; } Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon Feb 27 22:12:47 2017 @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.pig.FuncSpec; +import org.apache.pig.PigConfiguration; import org.apache.pig.StoreFuncInterface; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; @@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec); String sig = LogicalPlanBuilder.newOperatorKey(scope); stoFunc.setStoreFuncUDFContextSignature(sig); - store = new LOStore(lp, fileSpec, stoFunc, sig); + 
boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). + getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); + + store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled); store.setTmpStore(true); lp.add( store ); lp.connect( refOp, store ); Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original) +++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Feb 27 22:12:47 2017 @@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder { fileNameMap.put(fileNameKey, absolutePath); } FileSpec fileSpec = new FileSpec(absolutePath, funcSpec); + boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). 
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); - LOStore op = new LOStore(plan, fileSpec, stoFunc, signature); + LOStore op = new LOStore(plan, fileSpec, stoFunc, signature, disambiguationEnabled); return buildOp(loc, op, alias, inputAlias, null); } catch(Exception ex) { throw new ParserValidationException(intStream, loc, ex); Modified: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (original) +++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Mon Feb 27 22:12:47 2017 @@ -70,8 +70,10 @@ public class QueryParserUtils { fileName = removeQuotes( fileName ); FileSpec fileSpec = new FileSpec( fileName, funcSpec ); String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope); + boolean disambiguationEnabled = Boolean.parseBoolean(pigContext.getProperties(). 
+ getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT)); stoFunc.setStoreFuncUDFContextSignature(sig); - LOStore store = new LOStore(lp, fileSpec, stoFunc, sig); + LOStore store = new LOStore(lp, fileSpec, stoFunc, sig, disambiguationEnabled); store.setAlias(alias); try { Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1784664&r1=1784663&r2=1784664&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSchema.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSchema.java Mon Feb 27 22:12:47 2017 @@ -29,7 +29,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.UUID; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.ResourceSchema; import org.apache.pig.data.DataType; @@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.newplan.logical.relational.LogicalSchema; import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode; import org.apache.pig.parser.ParserException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; public class TestSchema { + private static MiniGenericCluster cluster; + private static PigServer pigServer; + + @BeforeClass + public static void setupTestCluster() throws Exception { + cluster = MiniGenericCluster.buildCluster(); + pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + } + + @AfterClass + public static void tearDownTestCluster() throws Exception { + cluster.shutDown(); + } + @Test public void testSchemaEqual1() { @@ -660,8 +680,6 @@ public class TestSchema { @Test public void testSchemaSerialization() throws IOException { - MiniGenericCluster cluster 
= MiniGenericCluster.buildCluster(); - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); String inputFileName = "testSchemaSerialization-input.txt"; String[] inputData = new String[] { "foo\t1", "hello\t2" }; Util.createInputFile(cluster, inputFileName, inputData); @@ -673,7 +691,6 @@ public class TestSchema { Tuple t = it.next(); assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0)); } - cluster.shutDown(); } @Test @@ -938,4 +955,79 @@ public class TestSchema { assertTrue(schemaString.equals(s2)); } } + + @Test + public void testDisabledDisambiguationContainsNoColons() throws IOException { + resetDisambiguationTestPropertyOverride(); + + String inputFileName = "testPrepend-input.txt"; + String[] inputData = new String[]{"apple\t1\tred", "orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"}; + Util.createInputFile(cluster, inputFileName, inputData); + + String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray, foo:int, color: chararray);" + + "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);" + + "C = GROUP A BY (fruit,color);" + + "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" + + "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as avgFoo;" + + "E = JOIN B BY id, D BY group::fruit;" + + "F = UNION ONSCHEMA B, D2;" + + "G = CROSS B, D2;"; + + Util.registerMultiLineQuery(pigServer, script); + + //Prepending should happen with default settings + assertEquals("{B::id: chararray,B::bar: int,D::group::fruit: chararray,D::group::color: chararray,double}", pigServer.dumpSchema("E").toString()); + + //Override prepend property setting (check for flatten, join) + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false"); + assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,double}", pigServer.dumpSchema("E").toString()); + assertTrue(pigServer.openIterator("E").hasNext()); + + //Check for union and cross + assertEquals("{id: 
chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("F").toString()); + assertEquals("{id: chararray,bar: int,fruit: chararray,color: chararray,avgFoo: double}", pigServer.dumpSchema("G").toString()); + + } + + @Test + public void testEnabledDisambiguationPassesForDupeAliases() throws IOException { + resetDisambiguationTestPropertyOverride(); + + checkForDupeAliases(); + + //Should pass with default settings + assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val: int}", pigServer.dumpSchema("C").toString()); + assertTrue(pigServer.openIterator("C").hasNext()); + } + + @Test + public void testDisabledDisambiguationFailsForDupeAliases() throws IOException { + resetDisambiguationTestPropertyOverride(); + + try { + checkForDupeAliases(); + //Should fail with prepending disabled + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE, "false"); + pigServer.dumpSchema("C"); + } catch (FrontendException e){ + Assert.assertEquals("Duplicate schema alias: id in \"fake\"",e.getCause().getMessage()); + } + } + + private static void checkForDupeAliases() throws IOException { + String inputFileName = "testPrependFail-input" + UUID.randomUUID().toString() + ".txt"; + String[] inputData = new String[]{"foo\t1", "bar\t2"}; + Util.createInputFile(cluster, inputFileName, inputData); + + String script = "A = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" + + "B = LOAD '" + inputFileName + "' AS (id:chararray, val:int);" + + "C = JOIN A by id, B by id;"; + + Util.registerMultiLineQuery(pigServer, script); + } + + private static void resetDisambiguationTestPropertyOverride() { + //Reset possible overrides + pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE); + } }