yanz
Tue, 02 Feb 2010 17:08:48 -0800
Author: yanz Date: Wed Feb 3 01:08:15 2010 New Revision: 905856 URL: http://svn.apache.org/viewvc?rev=905856&view=rev Log: PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz) Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Wed Feb 3 01:08:15 2010 @@ -39,6 +39,8 @@ BUG FIXES + PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz) + PIG-1167: Hadoop file glob support (yanz) PIG-1145: Merge Join on Large Table throws an EOF exception (yanz) Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Wed Feb 3 01:08:15 2010 @@ -95,6 +95,8 @@ private final static String DELETED_CG_PREFIX = ".deleted-"; + public final static String DELETED_CG_SEPARATOR_PER_TABLE = ","; + // no public ctor for instantiating a BasicTable object private BasicTable() { // no-op @@ -142,7 +144,7 @@ /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/ while (triedCount ++ < numCGs) { try { - schemaFile = new SchemaFile(path, conf); + schemaFile = new SchemaFile(path, null, conf); break; } catch (FileNotFoundException e) { LOG.info("Try " + triedCount + " times : " + e.getMessage()); @@ -198,7 +200,7 @@ } catch (IOException e) { // one remote possibility is that another user // already deleted CG. - SchemaFile tempSchema = new SchemaFile(path, conf); + SchemaFile tempSchema = new SchemaFile(path, null, conf); if (tempSchema.isCGDeleted(cgIdx)) { LOG.info(path + " : " + cgName + " is deleted by someone else. That is ok."); @@ -281,10 +283,15 @@ * Optional configuration parameters. * @throws IOException */ + public Reader(Path path, Configuration conf) throws IOException { + this(path, null, conf); + } + public Reader(Path path, String[] deletedCGs, Configuration conf) throws IOException { try { + boolean mapper = (deletedCGs != null); this.path = path; - schemaFile = new SchemaFile(path, conf); + schemaFile = new SchemaFile(path, deletedCGs, conf); metaReader = MetaFile.createReader(new Path(path, BT_META_FILE), conf); // create column group readers int numCGs = schemaFile.getNumOfPhysicalSchemas(); @@ -301,7 +308,7 @@ if (!schemaFile.isCGDeleted(nx)) { colGroups[nx] = new ColumnGroup.Reader(new Path(path, partition.getCGSchema(nx).getName()), - conf); + conf, mapper); if (firstValidCG < 0) { firstValidCG = nx; } @@ -311,7 +318,8 @@ else cgTuples[nx] = null; } - buildStatus(); + if (schemaFile.isSorted()) + buildStatus(); closed = false; } catch (Exception e) { @@ -410,7 +418,9 @@ /** * Get the status of the BasicTable. */ - public BasicTableStatus getStatus() { + public BasicTableStatus getStatus() throws IOException { + if (status == null) + buildStatus(); return status; } @@ -565,13 +575,16 @@ * * @param path * The path to the BasicTable. + * @deletedCGs + * The deleted column groups from front end; null if unavailable from front end * @param conf * @return The logical Schema of the table (all columns). * @throws IOException */ public static Schema getSchema(Path path, Configuration conf) throws IOException { - SchemaFile schF = new SchemaFile(path, conf); + // fake an empty deleted cg list as getSchema does not care about deleted cgs + SchemaFile schF = new SchemaFile(path, new String[0], conf); return schF.getLogical(); } @@ -653,7 +666,7 @@ * Get index of the column group that will be used for row-based split. * */ - public int getRowSplitCGIndex() { + public int getRowSplitCGIndex() throws IOException { // Try to find the largest non-deleted and used column group by projection; int largestCGIndex = -1; int splitCGIndex = -1; @@ -725,8 +738,12 @@ String getStorageString() { return schemaFile.getStorageString(); } + + public String getDeletedCGs() { + return schemaFile.getDeletedCGs(); + } - private void buildStatus() { + private void buildStatus() throws IOException { status = new BasicTableStatus(); if (firstValidCG >= 0) { status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey(); @@ -913,11 +930,12 @@ int cgIdx = rowSplit.getCGIndex(); CGRowSplit cgSplit = new CGRowSplit(); - cgSplit.fileIndex = inputCGSplit.fileIndex; + cgSplit.name = inputCGSplit.name; // startByte and numBytes from inputCGSplit are ignored, since // they make sense for only one CG. cgSplit.startRow = inputCGSplit.startRow; cgSplit.numRows = inputCGSplit.numRows; + cgSplit.size = inputCGSplit.size; if (cgSplit.startRow >= 0) { //assume the rows are already set up. @@ -1337,9 +1355,11 @@ * thrown if the table is already closed, or is in the process of being * closed. */ + public Writer(Path path, Configuration conf) throws IOException { try { - schemaFile = new SchemaFile(path, conf); + // fake an empty deleted cg list as no cg should have been deleted now + schemaFile = new SchemaFile(path, new String[0], conf); int numCGs = schemaFile.getNumOfPhysicalSchemas(); partition = schemaFile.getPartition(); sorted = schemaFile.isSorted(); @@ -1650,8 +1670,8 @@ boolean[] cgDeletedFlags; // ctor for reading - public SchemaFile(Path path, Configuration conf) throws IOException { - readSchemaFile(path, conf); + public SchemaFile(Path path, String[] deletedCGs, Configuration conf) throws IOException { + readSchemaFile(path, deletedCGs, conf); } public Schema[] getPhysicalSchema() { @@ -1798,7 +1818,7 @@ outSchema.close(); } - private void readSchemaFile(Path path, Configuration conf) + private void readSchemaFile(Path path, String[] deletedCGs, Configuration conf) throws IOException { Path pathSchema = makeSchemaFilePath(path); if (!path.getFileSystem(conf).exists(pathSchema)) { @@ -1845,7 +1865,18 @@ throw new IOException("parser.RecordSchema failed :" + e.getMessage()); } sorted = WritableUtils.readVInt(in) == 1 ? true : false; - setCGDeletedFlags(path, conf); + if (deletedCGs == null) + setCGDeletedFlags(path, conf); + else { + for (String deletedCG : deletedCGs) + { + for (int i = 0; i < cgschemas.length; i++) + { + if (cgschemas[i].getName().equals(deletedCG)) + cgDeletedFlags[i] = true; + } + } + } if (version.compareTo(new Version((short)1, (short)0)) > 0) { int numSortColumns = WritableUtils.readVInt(in); @@ -1915,7 +1946,23 @@ } } - + String getDeletedCGs() { + StringBuilder sb = new StringBuilder(); + // comma separated + boolean first = true; + for (int i = 0; i < physical.length; i++) { + if (cgDeletedFlags[i]) + { + if (first) + first = false; + else { + sb.append(DELETED_CG_SEPARATOR_PER_TABLE); + } + sb.append(getName(i)); + } + } + return sb.toString(); + } } static public void dumpInfo(String file, PrintStream out, Configuration conf) Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Wed Feb 3 01:08:15 2010 @@ -225,6 +225,7 @@ SplitColumn top; // directly associated with logical schema SplitColumn leaf; // corresponding to projection boolean closed; + boolean dirty; /** * Get the Column Group physical schema without loading the full CG index. @@ -255,13 +256,24 @@ */ public Reader(Path path, Configuration conf) throws IOException, ParseException { - this(path, true, conf); + this(path, conf, false); } - + + public Reader(Path path, Configuration conf, boolean mapper) throws IOException, + ParseException { + this(path, true, conf, mapper); + } + Reader(Path path, boolean dirty, Configuration conf) throws IOException, ParseException { + this(path, dirty, conf, false); + } + + Reader(Path path, boolean dirty, Configuration conf, boolean mapper) throws IOException, + ParseException { this.path = path; this.conf = conf; + this.dirty = dirty; fs = path.getFileSystem(conf); // check existence of path @@ -269,7 +281,7 @@ throw new IOException("Path doesn't exist: " + path); } - if (!fs.getFileStatus(path).isDir()) { + if (!mapper && !fs.getFileStatus(path).isDir()) { throw new IOException("Path exists but not a directory: " + path); } @@ -279,8 +291,8 @@ } projection = new Projection(cgschema.getSchema()); // default projection to CG schema. Path metaFilePath = makeMetaFilePath(path); - /* If index file is not existing or loading from an unsorted table. */ - if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) { + /* If index file is not existing */ + if (!fs.exists(metaFilePath)) { // special case for unsorted CG that did not create index properly. if (cgschema.isSorted()) { throw new FileNotFoundException( @@ -288,15 +300,16 @@ } cgindex = buildIndex(fs, path, dirty, conf); } - else { + else if (cgschema.isSorted()) { MetaFile.Reader metaFile = MetaFile.createReader(metaFilePath, conf); try { cgindex = new CGIndex(); DataInputStream dis = metaFile.getMetaBlock(BLOCK_NAME_INDEX); try { cgindex.readFields(dis); - } - finally { + } catch (IOException e) { + throw new IOException("Index file read failure :"+ e.getMessage()); + } finally { dis.close(); } } @@ -429,6 +442,8 @@ } if (split == null) { + if (cgindex == null) + cgindex = buildIndex(fs, path, dirty, conf); return getScanner(new CGRangeSplit(0, cgindex.size()), closeReader); } if (split.len < 0) { @@ -474,6 +489,8 @@ return getBlockDistribution(new CGRangeSplit(0, cgindex.size())); } + if (cgindex == null) + cgindex = buildIndex(fs, path, dirty, conf); if ((split.start | split.len | (cgindex.size() - split.start - split.len)) < 0) { throw new IndexOutOfBoundsException("Bad split"); } @@ -509,10 +526,9 @@ } BlockDistribution ret = new BlockDistribution(); - if (split.fileIndex >= 0) + if (split.name != null) { - CGIndexEntry entry = cgindex.get(split.fileIndex); - FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); + FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name)); BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes); for (BlockLocation l : locations) { @@ -532,17 +548,26 @@ void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) throws IOException { - if (rowSplit.fileIndex < 0) + if (rowSplit.name == null) return; - Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName()); - FileStatus tfile = fs.getFileStatus(tfPath); + Path tfPath = new Path(path, rowSplit.name); + long size = rowSplit.size; + if (size == 0) + { + /* the on disk table is sorted. Later this will be made unnecessary when + * CGIndexEntry serializes its bytes field and the meta file versioning is + * supported. + */ + FileStatus tfile = fs.getFileStatus(tfPath); + size = tfile.getLen(); + } TFile.Reader reader = null; try { reader = new TFile.Reader(fs.open(tfPath), - tfile.getLen(), conf); + size, conf); long startRow = reader.getRecordNumNear(startOffset); long endRow = reader.getRecordNumNear(startOffset + length); @@ -703,7 +728,9 @@ /** * Get the status of the ColumnGroup. */ - public BasicTableStatus getStatus() { + public BasicTableStatus getStatus() throws IOException { + if (cgindex == null) + cgindex = buildIndex(fs, path, dirty, conf); return cgindex.status; } @@ -715,10 +742,12 @@ * @return A list of range-based splits, whose size may be less than or * equal to n. */ - public List<CGRangeSplit> rangeSplit(int n) { + public List<CGRangeSplit> rangeSplit(int n) throws IOException { // The output of this method must be only dependent on the cgindex and // input parameter n - so that horizontally stitched column groups will // get aligned splits. + if (cgindex == null) + cgindex = buildIndex(fs, path, dirty, conf); int numFiles = cgindex.size(); if ((numFiles < n) || (n < 0)) { return rangeSplit(numFiles); @@ -752,8 +781,10 @@ long start = starts[i]; long length = lengths[i]; Path path = paths[i]; - int idx = cgindex.getFileIndex(path); - lst.add(new CGRowSplit(idx, start, length)); + if (cgindex == null) + cgindex = buildIndex(fs, this.path, dirty, conf); + long size = cgindex.get(cgindex.getFileIndex(path)).bytes; + lst.add(new CGRowSplit(path.getName(), start, length, size)); } return lst; @@ -796,7 +827,7 @@ * compressor is inside cgschema */ reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf); - if (rowRange != null && rowRange.fileIndex >= 0) { + if (rowRange != null) { scanner = reader.createScannerByRecordNum(rowRange.startRow, rowRange.startRow + rowRange.numRows); } else { @@ -921,6 +952,8 @@ CGScanner(CGRangeSplit split, boolean closeReader) throws IOException, ParseException { + if (cgindex== null) + cgindex = buildIndex(fs, path, dirty, conf); if (split == null) { beginIndex = 0; endIndex = cgindex.size(); @@ -940,15 +973,9 @@ */ CGScanner(CGRowSplit rowRange, boolean closeReader) throws IOException, ParseException { + beginIndex = 0; - endIndex = cgindex.size(); - if (rowRange != null && rowRange.fileIndex>= 0) { - if (rowRange.fileIndex >= cgindex.size()) { - throw new IllegalArgumentException("Part Index is out of range."); - } - beginIndex = rowRange.fileIndex; - endIndex = beginIndex+1; - } + endIndex = 1; init(rowRange, null, null, closeReader); } @@ -981,8 +1008,15 @@ for (int i = beginIndex; i < endIndex; ++i) { RawComparable begin = (i == beginIndex) ? beginKey : null; RawComparable end = (i == endIndex - 1) ? endKey : null; - TFileScanner scanner = - new TFileScanner(fs, cgindex.getPath(i, path), rowRange, + TFileScanner scanner; + if (rowRange != null) + scanner = + new TFileScanner(fs, new Path(path, rowRange.name), rowRange, + begin, end, + cgschema, logicalSchema, conf); + else + scanner = + new TFileScanner(fs, cgindex.getPath(i, path), null, begin, end, cgschema, logicalSchema, conf); // skip empty scanners. @@ -1160,16 +1194,18 @@ } public static class CGRowSplit implements Writable { - int fileIndex = -1; + String name; long startByte = -1; long numBytes = -1; long startRow = -1; long numRows = -1; + long size = 0; // size of the file in the selected CG - CGRowSplit(int fileIdx, long start, long len) { - this.fileIndex = fileIdx; + CGRowSplit(String name, long start, long len, long size) { + this.name = name; this.startByte = start; this.numBytes = len; + this.size = size; } public CGRowSplit() { @@ -1179,31 +1215,34 @@ @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("{fileIndex = " + fileIndex + "}\n"); + sb.append("{name = " + name + "}\n"); sb.append("{startByte = " + startByte + "}\n"); sb.append("{numBytes = " + numBytes + "}\n"); sb.append("{startRow = " + startRow + "}\n"); sb.append("{numRows = " + numRows + "}\n"); + sb.append("{size = " + size + "}\n"); return sb.toString(); } @Override public void readFields(DataInput in) throws IOException { - fileIndex = Utils.readVInt(in); + name = Utils.readString(in); startByte = Utils.readVLong(in); numBytes = Utils.readVLong(in); startRow = Utils.readVLong(in); numRows = Utils.readVLong(in); + size = Utils.readVLong(in); } @Override public void write(DataOutput out) throws IOException { - Utils.writeVInt(out, fileIndex); + Utils.writeString(out, name); Utils.writeVLong(out, startByte); Utils.writeVLong(out, numBytes); Utils.writeVLong(out, startRow); Utils.writeVLong(out, numRows); + Utils.writeVLong(out, size); } } @@ -1444,23 +1483,13 @@ private void createIndex() throws IOException { MetaFile.Writer metaFile = MetaFile.createWriter(makeMetaFilePath(path), conf); - if (cgschema.isSorted()) { - CGIndex index = buildIndex(fs, path, false, conf); - DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX); - try { - index.write(dos); - } - finally { - dos.close(); - } - } else { /* Create an empty data meta file for unsorted table. */ - DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX); - try { - Utils.writeString(dos, ""); - } - finally { - dos.close(); - } + CGIndex index = buildIndex(fs, path, false, conf); + DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX); + try { + index.write(dos); + } + finally { + dos.close(); } metaFile.close(); } @@ -1689,7 +1718,7 @@ static class CGIndexEntry implements RawComparable, Writable { int index; String name; - long rows; + long rows, bytes; RawComparable firstKey; RawComparable lastKey; @@ -1890,6 +1919,7 @@ status.rows += rows; index.add(range); sorted = false; + range.bytes = bytes; } // building dirty index @@ -1901,6 +1931,7 @@ next.name = name; index.add(next); sorted = false; + next.bytes = bytes; } int lowerBound(RawComparable key, final Comparator<RawComparable> comparator) @@ -1935,6 +1966,7 @@ for (int i = 0; i < n; ++i) { CGIndexEntry range = new CGIndexEntry(); range.readFields(in); + range.setIndex(i); index.add(range); } status.readFields(in); @@ -2035,6 +2067,8 @@ out.printf("%s : %s\n", e.getKey(), e.getValue()); } out.println("TFiles within the Column Group :"); + if (reader.cgindex == null) + reader.cgindex = buildIndex(reader.fs, reader.path, reader.dirty, conf); for (CGIndexEntry entry : reader.cgindex.index) { IOutils.indent(out, indent); out.printf(" *Name : %s\n", entry.name); Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Wed Feb 3 01:08:15 2010 @@ -106,7 +106,8 @@ @Override public TableScanner getScanner(BytesWritable begin, BytesWritable end, String projection, Configuration conf) throws IOException { - BasicTable.Reader reader = new BasicTable.Reader(path, conf); + String[] deletedCGs = getDeletedCGs(conf); + BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf); try { reader.setProjection(projection); } catch (ParseException e) { Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Wed Feb 3 01:08:15 2010 @@ -27,7 +27,6 @@ import org.apache.hadoop.zebra.io.BasicTable; import org.apache.hadoop.zebra.io.TableScanner; import org.apache.hadoop.zebra.parser.ParseException; -import org.apache.hadoop.zebra.types.Projection; import org.apache.hadoop.zebra.schema.Schema; /** @@ -105,7 +104,6 @@ * @see Schema * @return A TableScanner object. */ - @SuppressWarnings("unused") public TableScanner getScanner(BytesWritable begin, BytesWritable end, String projection, Configuration conf) throws IOException { @@ -127,7 +125,7 @@ public TableScanner getScanner(UnsortedTableSplit split, String projection, Configuration conf) throws IOException, ParseException { BasicTable.Reader reader = - new BasicTable.Reader(new Path(split.getPath()), conf); + new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf); reader.setProjection(projection); return reader.getScanner(split.getSplit(), true); } @@ -147,7 +145,7 @@ public TableScanner getScanner(RowTableSplit split, String projection, Configuration conf) throws IOException, ParseException, ParseException { BasicTable.Reader reader = - new BasicTable.Reader(new Path(split.getPath()), conf); + new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf); reader.setProjection(projection); return reader.getScanner(true, split.getSplit()); } @@ -240,4 +238,31 @@ * dump table info with indent */ protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException; + + /** + * get the deleted cg for tables in union + * @param conf The Configuration object + * @return + */ + protected final String[] getDeletedCGsPerUnion(Configuration conf) { + return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION); + } + + protected final String[] getDeletedCGs(Configuration conf) { + return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE); + } + + private final String[] getDeletedCGs(Configuration conf, String separator) { + String[] deletedCGs = null; + String fe; + if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true")) + { + String original = conf.get(TableInputFormat.INPUT_DELETEED_CGS, null); + if (original == null) + deletedCGs = new String[0]; // empty array needed to indicate it is fe checked + else + deletedCGs = original.split(separator, -1); + } + return deletedCGs; + } } Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Wed Feb 3 01:08:15 2010 @@ -140,9 +140,12 @@ public class TableInputFormat implements InputFormat<BytesWritable, Tuple> { static Log LOG = LogFactory.getLog(TableInputFormat.class); - private static final String INPUT_EXPR = "mapred.lib.table.input.expr"; - private static final String INPUT_PROJ = "mapred.lib.table.input.projection"; - private static final String INPUT_SORT = "mapred.lib.table.input.sort"; + public static final String INPUT_EXPR = "mapred.lib.table.input.expr"; + public static final String INPUT_PROJ = "mapred.lib.table.input.projection"; + public static final String INPUT_SORT = "mapred.lib.table.input.sort"; + public static final String INPUT_FE = "mapred.lib.table.input.fe"; + public static final String INPUT_DELETEED_CGS = "mapred.lib.table.input.deleted_cgs"; + static final String DELETED_CG_SEPARATOR_PER_UNION = ";"; /** * Set the paths to the input table. @@ -642,8 +645,7 @@ } private static InputSplit[] getRowSplits(JobConf conf, int numSplits, - TableExpr expr, List<BasicTable.Reader> readers, - List<BasicTableStatus> status) throws IOException { + TableExpr expr, List<BasicTable.Reader> readers) throws IOException { ArrayList<InputSplit> ret = new ArrayList<InputSplit>(); DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf)); @@ -715,25 +717,40 @@ new ArrayList<BasicTableStatus>(nLeaves); try { + StringBuilder sb = new StringBuilder(); + boolean sorted = expr.sortedSplitRequired(); + boolean first = true; for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) { LeafTableInfo leaf = it.next(); BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf); reader.setProjection(leaf.getProjection()); - BasicTableStatus s = reader.getStatus(); + if (sorted) + { + BasicTableStatus s = reader.getStatus(); + status.add(s); + } readers.add(reader); - status.add(s); + if (first) + first = false; + else { + sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION); + } + sb.append(reader.getDeletedCGs()); } + conf.set(INPUT_FE, "true"); + conf.set(INPUT_DELETEED_CGS, sb.toString()); + if (readers.isEmpty()) { return new InputSplit[0]; } - if (expr.sortedSplitRequired()) { + if (sorted) { return getSortedSplits(conf, numSplits, expr, readers, status); } - return getRowSplits(conf, numSplits, expr, readers, status); + return getRowSplits(conf, numSplits, expr, readers); } catch (ParseException e) { throw new IOException("Projection parsing failed : "+e.getMessage()); } Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Wed Feb 3 01:08:15 2010 @@ -105,14 +105,20 @@ throw new IllegalArgumentException("Union of 0 table"); } ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n); - final ArrayList<BasicTableStatus> status = - new ArrayList<BasicTableStatus>(n); + String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf); + + if (deletedCGsInUnion != null && deletedCGsInUnion.length != n) + throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+ + n + " actual =" + deletedCGsInUnion.length); + for (int i = 0; i < n; ++i) { + String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]); + String[] deletedCGList = (deletedCGs == null ? null : + deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE)); BasicTableExpr expr = (BasicTableExpr) composite.get(i); BasicTable.Reader reader = - new BasicTable.Reader(expr.getPath(), conf); + new BasicTable.Reader(expr.getPath(), deletedCGList, conf); readers.add(reader); - status.add(reader.getStatus()); } String actualProjection = projection; Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=905856&r1=905855&r2=905856&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Wed Feb 3 01:08:15 2010 @@ -430,6 +430,9 @@ private TreeMap<String, String> configMap; private InputSplit split; + transient private final String[] zebraConfs = {TableInputFormat.INPUT_EXPR, + TableInputFormat.INPUT_PROJ, TableInputFormat.INPUT_SORT, + TableInputFormat.INPUT_DELETEED_CGS, TableInputFormat.INPUT_FE, "mapred.input.dir"}; transient private JobConf conf; transient private int numProjCols = 0; transient private RecordReader<BytesWritable, Tuple> scanner; @@ -437,16 +440,16 @@ transient private boolean sorted = false; TableSlice(JobConf conf, InputSplit split, boolean sorted) { - // hack: expecting JobConf contains nothing but a <string, string> - // key-value pair store. configMap = new TreeMap<String, String>(); - for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext();) { - Map.Entry<String, String> e = it.next(); - configMap.put(e.getKey(), e.getValue()); - } - - - + String value; + + for (String zebraConf : zebraConfs) + { + value = conf.get(zebraConf); + if (value != null) + configMap.put(zebraConf, value); + } + this.split = split; this.sorted = sorted; } @@ -500,14 +503,14 @@ @Override public void init(DataStorage store) throws IOException { - Configuration localConf = new Configuration(); + Configuration localConf = ConfigurationUtil.toConfiguration(store.getConfiguration()); for (Iterator<Map.Entry<String, String>> it = configMap.entrySet().iterator(); it.hasNext();) { Map.Entry<String, String> e = it.next(); localConf.set(e.getKey(), e.getValue()); } conf = new JobConf(localConf); - String projection; + String projection; try { projection = TableInputFormat.getProjection(conf); @@ -516,8 +519,8 @@ } numProjCols = Projection.getNumColumns(projection); TableInputFormat inputFormat = new TableInputFormat(); - if (sorted) - TableInputFormat.requireSortedTable(conf, null); + if (sorted) + TableInputFormat.requireSortedTable(conf, null); scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL); key = new BytesWritable(); }