pradeepkth
Wed, 18 Nov 2009 16:58:45 -0800
Author: pradeepkth Date: Thu Nov 19 00:58:17 2009 New Revision: 882021 URL: http://svn.apache.org/viewvc?rev=882021&view=rev Log: PIG-966: load-store-redesign branch: change SampleLoader and subclasses to work with new LoadFunc interface (thejas via pradeepkth) Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=882021&r1=882020&r2=882021&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 19 00:58:17 2009 @@ -44,7 +44,7 @@ import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.builtin.MergeJoinIndexer; -import 
org.apache.pig.impl.builtin.TupleSize; +import org.apache.pig.impl.builtin.GetMemNumRows; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.builtin.RandomSampleLoader; import org.apache.pig.impl.io.FileLocalizer; @@ -1387,6 +1387,7 @@ throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2."); } + //change plan to store the first join input into a temp file FileSpec fSpec = getTempFileSpec(); MapReduceOper mro = compiledInputs[0]; POStore str = getStore(); @@ -1460,7 +1461,10 @@ } // run POPartitionRearrange for second join table - lr = new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp); + POPartitionRearrange pr = + new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp); + pr.setPigContext(pigContext); + lr = pr; try { lr.setIndex(1); } catch (ExecException e) { @@ -1817,7 +1821,7 @@ PhysicalPlan ep = new PhysicalPlan(); POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, - new FuncSpec(TupleSize.class.getName(), (String[])null)); + new FuncSpec(GetMemNumRows.class.getName(), (String[])null)); uf.setResultType(DataType.TUPLE); ep.add(uf); ep.add(prjStar); Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=882021&r1=882020&r2=882021&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original) +++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Nov 19 00:58:17 2009 @@ -25,6 +25,7 @@ import java.util.Iterator; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.builtin.BinStorage; @@ -32,6 +33,7 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; @@ -66,14 +68,15 @@ * */ private static final long serialVersionUID = 1L; - private String partitionFile; - private Integer totalReducers = -1; - // ReducerMap will store the tuple, max reducer index & min reducer index - private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >(); - private boolean loaded; - - protected static final BagFactory mBagFactory = BagFactory.getInstance(); - + private String partitionFile; + private Integer totalReducers = -1; + // ReducerMap will store the tuple, max reducer index & min reducer index + private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >(); + private boolean loaded; + + protected static final BagFactory mBagFactory = BagFactory.getInstance(); + private PigContext pigContext; + public POPartitionRearrange(OperatorKey k) { this(k, -1, null); } @@ -102,17 +105,22 @@ partitionFile = file; } - /* Loads the key distribution file obtained from the sampler */ - private void loadPartitionFile() throws RuntimeException { - try { - Integer [] redCnt = new Integer[1]; - 
reducerMap = MapRedUtil.loadPartitionFile(partitionFile, redCnt, null, DataType.NULL); - totalReducers = redCnt[0]; - loaded = true; - } catch (Exception e) { - throw new RuntimeException(e); - } - } + /* Loads the key distribution file obtained from the sampler */ + private void loadPartitionFile() throws RuntimeException { + try { + Integer [] redCnt = new Integer[1]; + + reducerMap = MapRedUtil.loadPartitionFile(partitionFile, + redCnt, + ConfigurationUtil.toConfiguration(pigContext.getProperties()), + DataType.NULL + ); + totalReducers = redCnt[0]; + loaded = true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } @Override public String name() { @@ -234,6 +242,20 @@ } /** + * @param pigContext the pigContext to set + */ + public void setPigContext(PigContext pigContext) { + this.pigContext = pigContext; + } + + /** + * @return the pigContext + */ + public PigContext getPigContext() { + return pigContext; + } + + /** * Make a deep copy of this operator. * @throws CloneNotSupportedException */ Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java?rev=882021&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java Thu Nov 19 00:58:17 2009 @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.builtin; + +import java.io.IOException; +import java.lang.reflect.Type; + +import org.apache.pig.EvalFunc; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; + +/** + * UDF to get memory size of a tuple and extracts number of rows value from + * special tuple created by PoissonSampleLoader + * It is used by skewed join. + * + */ + +public class GetMemNumRows extends EvalFunc<Tuple>{ + + private TupleFactory factory; + + public GetMemNumRows() { + factory = TupleFactory.getInstance(); + } + + /** + * @param in - input tuple + * @return - tuple having size in memory of this tuple and numRows if this + * is specially marked tuple having number of rows field + */ + public Tuple exec(Tuple in) throws IOException { + if (in == null) { + return null; + } + long memSize = in.getMemorySize(); + long numRows = 0; + + + // if this is specially marked tuple, get the number of rows + int tSize = in.size(); + if(tSize >=2 && + in.get(tSize-2).equals(PoissonSampleLoader.NUMROWS_TUPLE_MARKER)){ + numRows = (Long)in.get(tSize-1); + } + + //create tuple to be returned + Tuple t = factory.newTuple(2); + t.set(0, memSize); + t.set(1, numRows); + return t; + } + + public Type getReturnType(){ + return Tuple.class; + } +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=882021&r1=882020&r2=882021&view=diff 
============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Nov 19 00:58:17 2009 @@ -33,7 +33,6 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.util.Pair; /** @@ -82,8 +81,6 @@ private String inputFile_; - private long inputFileSize_; - private long totalSampleCount_; private double heapPercentage_; @@ -100,7 +97,6 @@ public PartitionSkewedKeys(String[] args) { totalReducers_ = -1; currentIndex_ = 0; - inputFileSize_ = -1; if (args != null && args.length > 0) { heapPercentage_ = Double.parseDouble(args[0]); @@ -123,187 +119,177 @@ * first field in the input tuple is the number of reducers * * second field is the *sorted* bag of samples + * this should be called only once */ public Map<String, Object> exec(Tuple in) throws IOException { - // get size of input file in bytes - if (inputFileSize_ == -1) { - try { - inputFileSize_ = FileLocalizer.getSize(inputFile_); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - Map<String, Object> output = new HashMap<String, Object>(); - - if (in == null || in.size() == 0) { - return null; - } - - totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_); - log.info("Maximum of available memory is " + totalMemory_); - - ArrayList<Tuple> reducerList = new ArrayList<Tuple>(); - - Tuple currentTuple = null; - long count = 0; - long totalMSize = 0; - long totalDSize = 0; - try { - totalReducers_ = (Integer) in.get(0); - DataBag samples = (DataBag) in.get(1); - - totalSampleCount_ = samples.size(); - - log.info("inputFileSize: " + inputFileSize_); - log.info("totalSample: " + totalSampleCount_); - log.info("totalReducers: " + 
totalReducers_); - - int maxReducers = 0; - Iterator<Tuple> iter = samples.iterator(); - while (iter.hasNext()) { - Tuple t = iter.next(); - if (hasSameKey(currentTuple, t) || currentTuple == null) { - count++; - totalMSize += getMemorySize(t); - totalDSize += getDiskSize(t); - } else { - Pair<Tuple, Integer> p = calculateReducers(currentTuple, - count, totalMSize, totalDSize); - Tuple rt = p.first; - if (rt != null) { - reducerList.add(rt); - } - if (maxReducers < p.second) { - maxReducers = p.second; - } - count = 1; - totalMSize = getMemorySize(t); - totalDSize = getDiskSize(t); - } - - currentTuple = t; - } - - // add last key - if (count > 0) { - Pair<Tuple, Integer> p = calculateReducers(currentTuple, count, - totalMSize, totalDSize); - Tuple rt = p.first; - if (rt != null) { - reducerList.add(rt); - } - if (maxReducers < p.second) { - maxReducers = p.second; - } - } - - if (maxReducers > totalReducers_) { - if(pigLogger != null) { - pigLogger.warn(this,"You need at least " + maxReducers - + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW); - } else { - log.warn("You need at least " + maxReducers - + " reducers to avoid spillage and run this job efficiently."); - } - } - - output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList)); - output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_)); - - log.info(output.toString()); - if (log.isDebugEnabled()) { - log.debug(output.toString()); - } - - return output; - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } + if (in == null || in.size() == 0) { + return null; + } + Map<String, Object> output = new HashMap<String, Object>(); + + totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_); + log.info("Maximum of available memory is " + totalMemory_); + + ArrayList<Tuple> reducerList = new ArrayList<Tuple>(); + + Tuple currentTuple = null; + long count = 0; + + // total size in memory for tuples in sample + long 
totalSampleMSize = 0; + + //total input rows for the join + long totalInputRows = 0; + + try { + totalReducers_ = (Integer) in.get(0); + DataBag samples = (DataBag) in.get(1); + + totalSampleCount_ = samples.size(); + + log.info("totalSample: " + totalSampleCount_); + log.info("totalReducers: " + totalReducers_); + + int maxReducers = 0; + + // first iterate the samples to find total number of rows + Iterator<Tuple> iter1 = samples.iterator(); + while (iter1.hasNext()) { + Tuple t = iter1.next(); + totalInputRows += (Long)t.get(t.size() - 1); + } + + // now iterate samples to do the reducer calculation + Iterator<Tuple> iter2 = samples.iterator(); + while (iter2.hasNext()) { + Tuple t = iter2.next(); + if (hasSameKey(currentTuple, t) || currentTuple == null) { + count++; + totalSampleMSize += getMemorySize(t); + } else { + Pair<Tuple, Integer> p = calculateReducers(currentTuple, + count, totalSampleMSize, totalInputRows); + Tuple rt = p.first; + if (rt != null) { + reducerList.add(rt); + } + if (maxReducers < p.second) { + maxReducers = p.second; + } + count = 1; + totalSampleMSize = getMemorySize(t); + } + + currentTuple = t; + } + + // add last key + if (count > 0) { + Pair<Tuple, Integer> p = calculateReducers(currentTuple, count, + totalSampleMSize, totalInputRows); + Tuple rt = p.first; + if (rt != null) { + reducerList.add(rt); + } + if (maxReducers < p.second) { + maxReducers = p.second; + } + } + + if (maxReducers > totalReducers_) { + if(pigLogger != null) { + pigLogger.warn(this,"You need at least " + maxReducers + + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW); + } else { + log.warn("You need at least " + maxReducers + + " reducers to avoid spillage and run this job efficiently."); + } + } + + output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList)); + output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_)); + + log.info(output.toString()); + if (log.isDebugEnabled()) { + 
log.debug(output.toString()); + } + + return output; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } } private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple, - long count, long totalMSize, long totalDSize) { - // get average memory size per tuple - double avgM = totalMSize / (double) count; - // get average disk size per tuple - double avgD = totalDSize / (double) count; - - // get the number of tuples that can fit into memory - long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_; - - // get the number of total tuples for this key - long tupleCount = (long) (((double) count) / totalSampleCount_ - * inputFileSize_ / avgD); - - - int redCount = (int) Math.round(Math.ceil((double) tupleCount - / tupleMCount)); - - if (log.isDebugEnabled()) - { - log.debug("avgM: " + avgM); - log.debug("avgD: " + avgD); - log.debug("count: " + count); - log.debug("A reducer can take " + tupleMCount + " tuples and " - + tupleCount + " tuples are find for " + currentTuple); - log.debug("key " + currentTuple + " need " + redCount + " reducers"); - } + long count, long totalMSize, long totalTuples) { + // get average memory size per tuple + double avgM = totalMSize / (double) count; + + // get the number of tuples that can fit into memory + long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_; + + // estimate the number of total tuples for this key + long keyTupleCount = (long) ( ((double) count/ totalSampleCount_) * + totalTuples); + + + int redCount = (int) Math.round(Math.ceil((double) keyTupleCount + / tupleMCount)); + + if (log.isDebugEnabled()) + { + log.debug("avgM: " + avgM); + log.debug("tuple count: " + keyTupleCount); + log.debug("count: " + count); + log.debug("A reducer can take " + tupleMCount + " tuples and " + + keyTupleCount + " tuples are find for " + currentTuple); + log.debug("key " + currentTuple + " need " + redCount + " reducers"); + } + + // this is not a 
skewed key + if (redCount == 1) { + return new Pair<Tuple, Integer>(null, 1); + } + + Tuple t = this.mTupleFactory.newTuple(currentTuple.size()); + int i = 0; + try { + // set keys + for (; i < currentTuple.size() - 2; i++) { + t.set(i, currentTuple.get(i)); + } + + // set the min index of reducer for this key + t.set(i++, currentIndex_); + currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1; + if (currentIndex_ < 0) { + currentIndex_ += totalReducers_; + } + // set the max index of reducer for this key + t.set(i++, currentIndex_); + } catch (ExecException e) { + throw new RuntimeException("Failed to set value to tuple." + e); + } - // this is not a skewed key - if (redCount == 1) { - return new Pair<Tuple, Integer>(null, 1); - } + currentIndex_ = (currentIndex_ + 1) % totalReducers_; - Tuple t = this.mTupleFactory.newTuple(currentTuple.size()); - int i = 0; - try { - // set keys - for (; i < currentTuple.size() - 2; i++) { - t.set(i, currentTuple.get(i)); - } + Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount); - // set the min index of reducer for this key - t.set(i++, currentIndex_); - currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1; - if (currentIndex_ < 0) { - currentIndex_ += totalReducers_; - } - // set the max index of reducer for this key - t.set(i++, currentIndex_); - } catch (ExecException e) { - throw new RuntimeException("Failed to set value to tuple." 
+ e); - } - - currentIndex_ = (currentIndex_ + 1) % totalReducers_; - - Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount); - - return p; + return p; } // the last field of the tuple is a tuple for memory size and disk size private long getMemorySize(Tuple t) { - int s = t.size(); - try { - return (Long) t.get(s - 2); - } catch (ExecException e) { - throw new RuntimeException( - "Unable to retrive the size field from tuple.", e); - } + int s = t.size(); + try { + return (Long) t.get(s - 2); + } catch (ExecException e) { + throw new RuntimeException( + "Unable to retrive the size field from tuple.", e); + } } - // the last field of the tuple is a tuple for memory size and disk size - private long getDiskSize(Tuple t) { - int s = t.size(); - try { - return (Long) t.get(s - 1); - } catch (ExecException e) { - throw new RuntimeException( - "Unable to retrive the size field from tuple.", e); - } - } private boolean hasSameKey(Tuple t1, Tuple t2) { // Have to break the tuple down and compare it field to field. 
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 19 00:58:17 2009 @@ -21,65 +21,152 @@ import java.util.ArrayList; import java.util.Properties; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.pig.LoadCaster; -import org.apache.pig.PigException; + import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.util.Pair; /** - * Currently skipInterval is similar to the randomsampleloader. However, if we were to use an - * uniform distribution, we could precompute the intervals and read it from a file. - * + * See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler */ -//XXX : FIXME - make this work with new load-store redesign public class PoissonSampleLoader extends SampleLoader { - // Base number of samples needed - private long baseNumSamples; + // marker string for special row with total number of rows. 
+ // this will be value of first column in the special row + public static final String NUMROWS_TUPLE_MARKER = + "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah"; + + //num of rows sampled so far + private int numRowsSampled = 0; + + //average size of tuple in memory, for tuples sampled + private long avgTupleMemSz = 0; + + //current row number + private long rowNum = 0; - /// Count of the map splits - private static final String MAPSPLITS_COUNT = "pig.mapsplits.count"; + // number of tuples to skip after each sample + long skipInterval = -1; + + // in-memory bytes of input to skip after every sample. + // divide this by avgTupleMemSz to get skipInterval + private long memToSkipPerSample = 0; - /// Conversion factor accounts for the various encodings, compression etc - private static final String CONV_FACTOR = "pig.inputfile.conversionfactor"; + // has the special row with row number information been returned + private boolean numRowSplTupleReturned = false; /// For a given mean and a confidence, a sample rate is obtained from a poisson cdf private static final String SAMPLE_RATE = "pig.sksampler.samplerate"; + // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean + // set to 10 (empirically, minimum number of samples) and the confidence set to 95% + private static final int DEFAULT_SAMPLE_RATE = 17; + + private int sampleRate = DEFAULT_SAMPLE_RATE; + /// % of memory available for the input data. This is currenty equal to the memory available /// for the skewed join private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage"; + + private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE; + + // new Sample tuple + private Tuple newSample = null; + +// private final Log log = LogFactory.getLog(getClass()); - // 17 is not a magic number. 
It can be obtained by using a poisson cumulative distribution function with the mean - // set to 10 (emperically, minimum number of samples) and the confidence set to 95% - private static final int DEFAULT_SAMPLE_RATE = 17; - - // By default the data is multiplied by 2 to account for the encoding - private static final int DEFAULT_CONV_FACTOR = 2; - - private final Log log = LogFactory.getLog(getClass()); public PoissonSampleLoader(String funcSpec, String ns) { super(funcSpec); super.setNumSamples(Integer.valueOf(ns)); // will be overridden } - - // n is the number of map tasks - @Override - public void setNumSamples(int n) { - super.setNumSamples(n); // will be overridden + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getNext() + */ + public Tuple getNext() throws IOException { + if(numRowSplTupleReturned){ + // row num special row has been returned after all inputs + // were read, nothing more to read + return null; + } + + + if(skipInterval == -1){ + //select first tuple as sample and calculate + // number of tuples to be skipped + Tuple t = loader.getNext(); + if(t == null) + return createNumRowTuple(null); + long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc); + memToSkipPerSample = availRedMem/sampleRate; + updateSkipInterval(t); + + rowNum++; + newSample = t; + } + + // skip tuples + for(long numSkipped = 0; numSkipped < skipInterval; numSkipped++){ + if(!skipNext()){ + return createNumRowTuple(newSample); + } + rowNum++; + } + + // skipped enough, get new sample + Tuple t = loader.getNext(); + if(t == null) + return createNumRowTuple(newSample); + updateSkipInterval(t); + rowNum++; + Tuple currentSample = newSample; + newSample = t; + return currentSample; } - + + /** + * Update the average tuple size based on newly sampled tuple t + * and recalculate skipInterval + * @param t - tuple + */ + private void updateSkipInterval(Tuple t) { + avgTupleMemSz = + ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1); +
skipInterval = memToSkipPerSample/avgTupleMemSz; + + // skipping fewer number of rows the first few times, to reduce + // the probability of first tuples size (if much smaller than rest) + // resulting in + // very few samples being sampled. Sampling a little extra is OK + if(numRowsSampled < 5) + skipInterval = skipInterval/(10-numRowsSampled); + ++numRowsSampled; + + } + + /** + * @param sample - sample tuple + * @return - Tuple appended with special marker string column, num-rows column + * @throws ExecException + */ + private Tuple createNumRowTuple(Tuple sample) throws ExecException { + if(rowNum == 0 || sample == null) + return null; + TupleFactory factory = TupleFactory.getInstance(); + Tuple t = factory.newTuple(sample.size() + 2); + for(int i=0; i<sample.size(); i++){ + t.set(i, sample.get(i)); + } + t.set(sample.size(), NUMROWS_TUPLE_MARKER); + t.set(sample.size() + 1, rowNum); + numRowSplTupleReturned = true; + return t; + } + /** * Computes the number of samples for the loader * @@ -89,100 +176,28 @@ */ @Override public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException { - int numSplits, convFactor, sampleRate; - Properties pcProps = pc.getProperties(); - - // Set default values for the various parameters - try { - numSplits = Integer.valueOf(pcProps.getProperty(MAPSPLITS_COUNT)); - } catch (NumberFormatException e) { - String msg = "Couldn't retrieve the number of maps in the job"; - throw new ExecException(msg); - } - - try { - convFactor = Integer.valueOf(pcProps.getProperty(CONV_FACTOR)); - } catch (NumberFormatException e) { - convFactor = DEFAULT_CONV_FACTOR; - } - - try { - sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE)); - } catch (NumberFormatException e) { - sampleRate = DEFAULT_SAMPLE_RATE; - } - - // % of memory available for the records - float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE; - if (pcProps.getProperty(PERC_MEM_AVAIL) != null) { - try { - heapPerc = 
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL)); - }catch(NumberFormatException e) { - // ignore, use default value - } - } - - // we are only concerned with the first input for skewed join - String fname = inputs.get(0).first.getFileName(); - - // calculate the base number of samples - try { - float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor); - baseNumSamples = (long) Math.ceil(1.0 / f); - } catch (IOException e) { - int errCode = 2175; - String msg = "Internal error. Could not retrieve file size for the sampler."; - throw new ExecException(msg, errCode, PigException.BUG); - } catch (ArithmeticException e) { - int errCode = 1105; - String msg = "Heap percentage / Conversion factor cannot be set to 0"; - throw new ExecException(msg,errCode,PigException.INPUT); - } - - // set the number of samples - int n = (int) ((baseNumSamples * sampleRate) / numSplits); - - // set the minimum number of samples to 1 - n = (n > 1) ? 
n : 1; - setNumSamples(n); + Properties pcProps = pc.getProperties(); + + // % of memory available for the records + heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE; + if (pcProps.getProperty(PERC_MEM_AVAIL) != null) { + try { + heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL)); + }catch(NumberFormatException e) { + // ignore, use default value + } + } + + try { + sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE)); + } catch (NumberFormatException e) { + sampleRate = DEFAULT_SAMPLE_RATE; + } + } - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#getLoadCaster() - */ - @Override - public LoadCaster getLoadCaster() { - // TODO Auto-generated method stub - return null; - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit) - */ - @Override - public void prepareToRead(RecordReader reader, PigSplit split) { - // TODO Auto-generated method stub - - } - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job) - */ - @Override - public void setLocation(String location, Job job) throws IOException { - // TODO Auto-generated method stub - - } - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path) - */ - @Override - public String relativeToAbsolutePath(String location, Path curDir) - throws IOException { - // TODO Auto-generated method stub - return null; - } - + } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original) +++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Nov 19 00:58:17 2009 @@ -18,23 +18,23 @@ package org.apache.pig.impl.builtin; import java.io.IOException; +import java.util.Random; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.pig.LoadCaster; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; /** - * A loader that samples the data. This loader can subsume loader that - * can handle starting in the middle of a record. Loaders that can - * handle this should implement the SamplableLoader interface. + * A loader that samples the data. + * It randomly samples tuples from input. The number of tuples to be sampled + * has to be set before the first call to getNext(). + * see documentation of getNext() call. */ -//XXX : FIXME - make this work with new load-store redesign public class RandomSampleLoader extends SampleLoader { + //array to store the sample tuples + Tuple [] samples = null; + //index into samples array to the next sample to be returned + protected int nextSampleIdx= 0; + /** * Construct with a class of loader to use. * @param funcSpec func spec of the loader to use. @@ -49,61 +49,67 @@ // set the number of samples super.setNumSamples(Integer.valueOf(ns)); } - - - @Override - public void setNumSamples(int n) { - // Setting it to 100 as default for order by - super.setNumSamples(100); - } /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#getInputFormat() + * @see org.apache.pig.LoadFunc#getNext() + * Allocate a buffer for numSamples elements, populate it with the + * first numSamples tuples, and continue scanning rest of the input. + * For every ith next() call, we generate a random number r s.t. 
0<=r<i, + * and if r<numSamples we insert the new tuple into our buffer at position r. + * This gives us a random sample of the tuples in the partition. */ @Override - public InputFormat getInputFormat() throws IOException { - // TODO Auto-generated method stub - return loader.getInputFormat(); - } - - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#getLoadCaster() - */ - @Override - public LoadCaster getLoadCaster() { - // TODO Auto-generated method stub - return null; - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit) - */ - @Override - public void prepareToRead(RecordReader reader, PigSplit split) { - // TODO Auto-generated method stub + public Tuple getNext() throws IOException { + + if(samples != null){ + return getSample(); + } + //else collect samples + samples = new Tuple[numSamples]; - } - - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job) - */ - @Override - public void setLocation(String location, Job job) throws IOException { - // TODO Auto-generated method stub + // populate the samples array with first numSamples tuples + Tuple t = null; + for(int i=0; i<numSamples; i++){ + t = loader.getNext(); + if(t == null) + break; + samples[i] = t; + } + // rowNum that starts from 1 + int rowNum = numSamples+1; + Random randGen = new Random(); + + if(t != null){ // did not exhaust all tuples + while(true){ + // collect samples until input is exhausted + int rand = randGen.nextInt(rowNum); + if(rand < numSamples){ + // pick this as sample + Tuple sampleTuple = loader.getNext(); + if(sampleTuple == null) + break; + samples[rand] = sampleTuple; + }else { + //skip tuple + if(!skipNext()) + break; + } + rowNum++; + } + } + + return getSample(); + } + + private Tuple getSample() { + if(nextSampleIdx < samples.length){ + return samples[nextSampleIdx++]; + } + else{ + return null; + } } - - /* 
(non-Javadoc) - * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path) - */ - @Override - public String relativeToAbsolutePath(String location, Path curDir) - throws IOException { - // TODO Auto-generated method stub - return null; - } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Thu Nov 19 00:58:17 2009 @@ -19,36 +19,32 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.pig.ExecType; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; -import org.apache.pig.SamplableLoader; -import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileSpec; -import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.Pair; /** * Abstract class that specifies the interface for sample loaders * */ -//XXX : FIXME - make this work with new load-store redesign public abstract class SampleLoader implements LoadFunc { 
- protected int numSamples; - protected long skipInterval; + // number of samples to be sampled + protected int numSamples; + protected LoadFunc loader; - private TupleFactory factory; - private boolean initialized = false; - + + // RecordReader used by the underlying loader + private RecordReader<?, ?> recordReader= null; public SampleLoader(String funcSpec) { loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec); @@ -66,22 +62,49 @@ * @see org.apache.pig.LoadFunc#getInputFormat() */ @Override - public InputFormat getInputFormat() throws IOException { + public InputFormat<?,?> getInputFormat() throws IOException { return loader.getInputFormat(); + } + + public boolean skipNext() throws IOException { + try { + return recordReader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException("Error getting input",e); + } + } + + public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) + throws ExecException { } + + @Override + public LoadCaster getLoadCaster() throws IOException { + return loader.getLoadCaster(); + } + + @Override + public String relativeToAbsolutePath(String location, Path curDir) + throws IOException { + return loader.relativeToAbsolutePath(location, curDir); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit) + */ + @Override + public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { + loader.prepareToRead(reader, split); + this.recordReader = reader; + } + /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#getNext() - */ - public Tuple getNext() throws IOException { - // estimate how many tuples there are in the map - // based on the - return null; - } - - public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException { - // TODO Auto-generated method stub - - } + * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, 
org.apache.hadoop.mapreduce.Job) + */ + @Override + public void setLocation(String location, Job job) throws IOException { + loader.setLocation(location, job); + } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java?rev=882021&r1=882020&r2=882021&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java Thu Nov 19 00:58:17 2009 @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.impl.builtin; - -import java.io.IOException; -import java.lang.reflect.Type; - -import org.apache.pig.EvalFunc; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; - -/** - * UDF to get memory and disk size of a tuple. - * It is used by skewed join. 
- * - */ - -public class TupleSize extends EvalFunc<Tuple>{ - - private TupleFactory factory; - - public TupleSize() { - factory = TupleFactory.getInstance(); - } - - /** - * Get memory size and disk size of input tuple - */ - public Tuple exec(Tuple in) throws IOException { - if (in == null) { - return null; - } - - Tuple t = factory.newTuple(2); - t.set(0, in.getMemorySize()); - t.set(1, in.get(in.size()-1)); - - return t; - } - - public Type getReturnType(){ - return Tuple.class; - } -}