Author: rohini
Date: Mon Feb 27 22:12:47 2017
New Revision: 1784664
URL: http://svn.apache.org/viewvc?rev=1784664&view=rev
Log:
Removing schema alias and :: coming from parent relation (szita via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
pig/trunk/test/org/apache/pig/test/TestSchema.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb 27 22:12:47 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
Â
IMPROVEMENTS
+PIG-5110: Removing schema alias and :: coming from parent relation (szita via
rohini)
+
PIG-5085: Support FLATTEN of maps (szita via rohini)
PIG-5126. Add doc about pig in zeppelin (zjffdu)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL:
http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Mon Feb 27
22:12:47 2017
@@ -5409,7 +5409,9 @@ DUMP X;
<section id="disambiguate">
<title>Disambiguate Operator</title>
-<p>Use the disambiguate operator ( :: ) to identify field names after JOIN,
COGROUP, CROSS, or FLATTEN operators.</p>
+<p>After JOIN, COGROUP, CROSS, or FLATTEN operations, the field names have the
original alias and the disambiguate
+ operator ( :: ) prepended in the schema. The disambiguate operator is used
to identify field names in case there
+ is an ambiguity.</p>
<p>In this example, to disambiguate y, use A::y or B::y. In cases where
there is no ambiguity, such as z, the :: is not necessary but is still
supported.</p>
@@ -5417,8 +5419,14 @@ DUMP X;
A = load 'data1' as (x, y);
B = load 'data2' as (x, y, z);
C = join A by x, B by x;
-D = foreach C generate y; -- which y?
+D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to
A::y or B::y
</source>
+<p> In cases where the schema is stored as part of the StoreFunc like
PigStorage, JsonStorage, AvroStorage or OrcStorage,
+ users generally have to use an extra FOREACH before STORE to rename the
field names and remove the disambiguate
+ operator from the names. To automatically remove the disambiguate operator
from the schema for the STORE operation,
+ the <i>pig.store.schema.disambiguate</i> Pig property can be set to
"false". It is the responsibility of the user
+ to make sure that there is no conflict in the field names when using this
setting.
+</p>
</section>
<!-- ===================================================================
-->
@@ -5444,7 +5452,7 @@ D = foreach C generate y; -- which y?
to bags. For example, if we apply the expression GENERATE $0,
FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]),
we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
</p>
-
+
<p>Also note that the flatten of empty bag will result in that row being
discarded; no output is generated.
(See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
@@ -6537,7 +6545,7 @@ B = FOREACH A GENERATE a, FLATTEN(m);
C = FILTER B by m::value == 5;
……
</source>
-
+
</section>
<section id="nestedblock">
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 27 22:12:47 2017
@@ -501,6 +501,13 @@ public class PigConfiguration {
*/
public static final String PIG_TEZ_CONFIGURE_AM_MEMORY =
"pig.tez.configure.am.memory";
+ /**
+ * If set to false, automatic schema disambiguation is disabled for STORE, i.e.
a field named group::name will be stored as just name
+ */
+ public static final String PIG_STORE_SCHEMA_DISAMBIGUATE =
"pig.store.schema.disambiguate";
+
+ public static final String PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT = "true";
+
// Deprecated settings of Pig 0.13
/**
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
(original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Mon
Feb 27 22:12:47 2017
@@ -36,6 +36,7 @@ public class LOStore extends LogicalRela
private boolean isTmpStore;
private SortInfo sortInfo;
private final StoreFuncInterface storeFunc;
+ private boolean disambiguationEnabled = true;
public LOStore(LogicalPlan plan, FileSpec outputFileSpec,
StoreFuncInterface storeFunc, String signature) {
super("LOStore", plan);
@@ -43,6 +44,12 @@ public class LOStore extends LogicalRela
this.storeFunc = storeFunc;
this.signature = signature;
}
+
+ public LOStore(LogicalPlan plan, FileSpec outputFileSpec,
StoreFuncInterface storeFunc, String signature,
+ boolean disambiguationEnabled) {
+ this(plan, outputFileSpec, storeFunc, signature);
+ this.disambiguationEnabled = disambiguationEnabled;
+ }
public FileSpec getOutputSpec() {
return output;
@@ -55,6 +62,17 @@ public class LOStore extends LogicalRela
@Override
public LogicalSchema getSchema() throws FrontendException {
schema =
((LogicalRelationalOperator)plan.getPredecessors(this).get(0)).getSchema();
+
+ if (!disambiguationEnabled && schema != null && schema.getFields() !=
null) {
+ //If requested try and remove parent alias substring including
colon(s)
+ for (LogicalSchema.LogicalFieldSchema field : schema.getFields()) {
+ if (field.alias == null || !field.alias.contains(":")) {
+ continue;
+ }
+ field.alias =
field.alias.substring(field.alias.lastIndexOf(":") + 1);
+ }
+ }
+
return schema;
}
Modified:
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
(original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Mon
Feb 27 22:12:47 2017
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -94,7 +95,10 @@ public class ScalarVisitor extends AllEx
StoreFuncInterface stoFunc =
(StoreFuncInterface)PigContext.instantiateFuncFromSpec(interStorageFuncSpec);
String sig = LogicalPlanBuilder.newOperatorKey(scope);
stoFunc.setStoreFuncUDFContextSignature(sig);
- store = new LOStore(lp, fileSpec, stoFunc, sig);
+ boolean disambiguationEnabled =
Boolean.parseBoolean(pigContext.getProperties().
+
getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
+
+ store = new LOStore(lp, fileSpec, stoFunc, sig,
disambiguationEnabled);
store.setTmpStore(true);
lp.add( store );
lp.connect( refOp, store );
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Feb 27
22:12:47 2017
@@ -1003,8 +1003,10 @@ public class LogicalPlanBuilder {
fileNameMap.put(fileNameKey, absolutePath);
}
FileSpec fileSpec = new FileSpec(absolutePath, funcSpec);
+ boolean disambiguationEnabled =
Boolean.parseBoolean(pigContext.getProperties().
+
getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
- LOStore op = new LOStore(plan, fileSpec, stoFunc, signature);
+ LOStore op = new LOStore(plan, fileSpec, stoFunc, signature,
disambiguationEnabled);
return buildOp(loc, op, alias, inputAlias, null);
} catch(Exception ex) {
throw new ParserValidationException(intStream, loc, ex);
Modified: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Mon Feb 27
22:12:47 2017
@@ -70,8 +70,10 @@ public class QueryParserUtils {
fileName = removeQuotes( fileName );
FileSpec fileSpec = new FileSpec( fileName, funcSpec );
String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
+ boolean disambiguationEnabled =
Boolean.parseBoolean(pigContext.getProperties().
+
getProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT));
stoFunc.setStoreFuncUDFContextSignature(sig);
- LOStore store = new LOStore(lp, fileSpec, stoFunc, sig);
+ LOStore store = new LOStore(lp, fileSpec, stoFunc, sig,
disambiguationEnabled);
store.setAlias(alias);
try {
Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1784664&r1=1784663&r2=1784664&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Mon Feb 27 22:12:47 2017
@@ -29,7 +29,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;
@@ -42,10 +44,28 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
import org.apache.pig.parser.ParserException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSchema {
+ private static MiniGenericCluster cluster;
+ private static PigServer pigServer;
+
+ @BeforeClass
+ public static void setupTestCluster() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
+ }
+
+ @AfterClass
+ public static void tearDownTestCluster() throws Exception {
+ cluster.shutDown();
+ }
+
@Test
public void testSchemaEqual1() {
@@ -660,8 +680,6 @@ public class TestSchema {
@Test
public void testSchemaSerialization() throws IOException {
- MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
- PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
String inputFileName = "testSchemaSerialization-input.txt";
String[] inputData = new String[] { "foo\t1", "hello\t2" };
Util.createInputFile(cluster, inputFileName, inputData);
@@ -673,7 +691,6 @@ public class TestSchema {
Tuple t = it.next();
assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
}
- cluster.shutDown();
}
@Test
@@ -938,4 +955,79 @@ public class TestSchema {
assertTrue(schemaString.equals(s2));
}
}
+
+ @Test
+ public void testDisabledDisambiguationContainsNoColons() throws
IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ String inputFileName = "testPrepend-input.txt";
+ String[] inputData = new String[]{"apple\t1\tred",
"orange\t2\torange", "kiwi\t3\tgreen", "orange\t4\torange"};
+ Util.createInputFile(cluster, inputFileName, inputData);
+
+ String script = "A = LOAD '" + inputFileName + "' AS (fruit:chararray,
foo:int, color: chararray);" +
+ "B = LOAD '" + inputFileName + "' AS (id:chararray, bar:int);"
+
+ "C = GROUP A BY (fruit,color);" +
+ "D = FOREACH C GENERATE FLATTEN(group), AVG(A.foo);" +
+ "D2 = FOREACH C GENERATE FLATTEN(group), AVG(A.foo) as
avgFoo;" +
+ "E = JOIN B BY id, D BY group::fruit;" +
+ "F = UNION ONSCHEMA B, D2;" +
+ "G = CROSS B, D2;";
+
+ Util.registerMultiLineQuery(pigServer, script);
+
+ //Prepending should happen with default settings
+ assertEquals("{B::id: chararray,B::bar: int,D::group::fruit:
chararray,D::group::color: chararray,double}",
pigServer.dumpSchema("E").toString());
+
+ //Override prepend property setting (check for flatten, join)
+
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,
"false");
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color:
chararray,double}", pigServer.dumpSchema("E").toString());
+ assertTrue(pigServer.openIterator("E").hasNext());
+
+ //Check for union and cross
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color:
chararray,avgFoo: double}", pigServer.dumpSchema("F").toString());
+ assertEquals("{id: chararray,bar: int,fruit: chararray,color:
chararray,avgFoo: double}", pigServer.dumpSchema("G").toString());
+
+ }
+
+ @Test
+ public void testEnabledDisambiguationPassesForDupeAliases() throws
IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ checkForDupeAliases();
+
+ //Should pass with default settings
+ assertEquals("{A::id: chararray,A::val: int,B::id: chararray,B::val:
int}", pigServer.dumpSchema("C").toString());
+ assertTrue(pigServer.openIterator("C").hasNext());
+ }
+
+ @Test
+ public void testDisabledDisambiguationFailsForDupeAliases() throws
IOException {
+ resetDisambiguationTestPropertyOverride();
+
+ try {
+ checkForDupeAliases();
+ //Should fail with prepending disabled
+
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE,
"false");
+ pigServer.dumpSchema("C");
+ } catch (FrontendException e){
+ Assert.assertEquals("Duplicate schema alias: id in
\"fake\"",e.getCause().getMessage());
+ }
+ }
+
+ private static void checkForDupeAliases() throws IOException {
+ String inputFileName = "testPrependFail-input" +
UUID.randomUUID().toString() + ".txt";
+ String[] inputData = new String[]{"foo\t1", "bar\t2"};
+ Util.createInputFile(cluster, inputFileName, inputData);
+
+ String script = "A = LOAD '" + inputFileName + "' AS (id:chararray,
val:int);" +
+ "B = LOAD '" + inputFileName + "' AS (id:chararray, val:int);"
+
+ "C = JOIN A by id, B by id;";
+
+ Util.registerMultiLineQuery(pigServer, script);
+ }
+
+ private static void resetDisambiguationTestPropertyOverride() {
+ //Reset possible overrides
+
pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE);
+ }
}