Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Feb 24 03:34:37 2017 @@ -62,6 +62,7 @@ import org.apache.tez.dag.api.EdgeProper */ public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator { + static private int maxTaskCount; static final double DEFAULT_FLATTEN_FACTOR = 10; static final double DEFAULT_FILTER_FACTOR = 0.7; static final double DEFAULT_LIMIT_FACTOR = 0.1; @@ -75,8 +76,6 @@ public class TezOperDependencyParallelis static final double DEFAULT_AGGREGATION_FACTOR = 0.7; private PigContext pc; - private int maxTaskCount; - private long bytesPerReducer; @Override public void setPigContext(PigContext pc) { @@ -95,18 +94,16 @@ public class TezOperDependencyParallelis maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM, PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM); - bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER); - - // If we have already estimated parallelism, use that one - if (tezOper.getEstimatedParallelism() != -1) { - return tezOper.getEstimatedParallelism(); - } - // If parallelism is set explicitly, respect it if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) { return tezOper.getRequestedParallelism(); } + // If we have already estimated parallelism, use that one + if (tezOper.getEstimatedParallelism()!=-1) { + return tezOper.getEstimatedParallelism(); + } + List<TezOperator> preds = plan.getPredecessors(tezOper); if (preds==null) { throw new IOException("Cannot estimate parallelism for source vertex"); @@ -133,12 +130,6 @@ public class TezOperDependencyParallelis boolean applyFactor = !tezOper.isUnion(); if (!pred.isVertexGroup() && applyFactor) { predParallelism = predParallelism * pred.getParallelismFactor(tezOper); - if (pred.getTotalInputFilesSize() > 0) { - // Estimate similar to mapreduce and use the maximum of two - int parallelismBySize = (int) Math.ceil((double) pred - .getTotalInputFilesSize() / bytesPerReducer); - predParallelism = Math.max(predParallelism, parallelismBySize); - } } estimatedParallelism += predParallelism; } @@ -166,7 +157,9 @@ public class TezOperDependencyParallelis } if (roundedEstimatedParallelism == 0) { - roundedEstimatedParallelism = 1; // We need to produce empty output file + throw new IOException("Estimated parallelism for " + + tezOper.getOperatorKey().toString() + + " is 0 which is unexpected"); } return roundedEstimatedParallelism; @@ -203,7 +196,7 @@ public class TezOperDependencyParallelis if (successor != null) { // Map side combiner TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey()); - if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) { + if (!edge.combinePlan.isEmpty()) { if (successor.isDistinct()) { factor = DEFAULT_DISTINCT_FACTOR; } else {
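For context on the hunks above: with the bytes-per-reducer estimate removed, the
estimator derives a vertex's parallelism purely from its predecessors, scaling each
predecessor's parallelism by per-operator factors (flatten, filter, limit, and so on),
summing the results, and capping the total at the configured maximum task count. Note
also that a rounded estimate of 0 now throws instead of being bumped to 1. A minimal
sketch of that shape, using hypothetical stand-in names rather than the actual Pig
classes:

    import java.util.List;

    public class FactorEstimatorSketch {
        // Hypothetical stand-in for a predecessor vertex and its operator factor.
        static class Pred {
            final int parallelism;   // predecessor's (estimated) parallelism
            final double factor;     // combined operator factor, e.g. 0.7 for a filter
            Pred(int parallelism, double factor) {
                this.parallelism = parallelism;
                this.factor = factor;
            }
        }

        // Sum factor-scaled predecessor parallelism, then cap at maxTaskCount.
        static int estimate(List<Pred> preds, int maxTaskCount) {
            double estimated = 0;
            for (Pred pred : preds) {
                estimated += pred.parallelism * pred.factor;
            }
            int rounded = (int) Math.ceil(estimated);
            return Math.min(rounded, maxTaskCount);
        }
    }
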
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Feb 24 03:34:37 2017 @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigConfiguration; -import org.apache.pig.StoreFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -45,7 +44,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; -import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; @@ -54,6 +52,7 @@ import org.apache.pig.builtin.AvroStorag import org.apache.pig.builtin.JsonStorage; import org.apache.pig.builtin.OrcStorage; import org.apache.pig.builtin.PigStorage; +import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; @@ -109,12 +108,6 @@ public class UnionOptimizer extends TezO if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) { return false; } - - // If user has specified a PARALLEL clause with the union operator - // turn off union optimization - if (tezOp.getRequestedParallelism() != -1) { - return false; - } // Two vertices separately ranking with 1 to n and writing to output directly // will make each rank repeate twice which is wrong. Rank always needs to be // done from single vertex to have the counting correct. @@ -127,25 +120,10 @@ public class UnionOptimizer extends TezO public static boolean isOptimizableStoreFunc(TezOperator tezOp, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) throws VisitorException { - List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); - - for (POStoreTez store : stores) { - String name = store.getStoreFunc().getClass().getName(); - if (store.getStoreFunc() instanceof StoreFunc) { - StoreFunc func = (StoreFunc) store.getStoreFunc(); - if (func.supportsParallelWriteToStoreLocation() != null) { - if (func.supportsParallelWriteToStoreLocation()) { - continue; - } else { - LOG.warn(name + " does not support union optimization." - + " Disabling it. 
There will be some performance degradation."); - return false; - } - } - } - // If StoreFunc does not explicitly state support, then check supported and - // unsupported config settings. - if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { + if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { + List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); + for (POStoreTez store : stores) { + String name = store.getStoreFunc().getClass().getName(); if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) { return false; @@ -259,23 +237,8 @@ public class UnionOptimizer extends TezO for (TezOperator succ : successors) { if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) { existingVertexGroup = succ; - break; - } - } - } - if (existingVertexGroup == null) { - // In the case of union + split + union + store, the different stores in the Split - // will be writing to same location after second union operator is optimized. - // So while optimizing the first union, we should just make it write to one vertex group - for (int j = 0; j < i; j++) { - if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) { - storeVertexGroupOps[i] = storeVertexGroupOps[j]; - break; } } - if (storeVertexGroupOps[i] != null) { - continue; - } } if (existingVertexGroup != null) { storeVertexGroupOps[i] = existingVertexGroup; @@ -307,15 +270,6 @@ public class UnionOptimizer extends TezO TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()]; String[] newOutputKeys = new String[unionOutputKeys.size()]; for (int i=0; i < outputVertexGroupOps.length; i++) { - for (int j = 0; j < i; j++) { - if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) { - outputVertexGroupOps[i] = outputVertexGroupOps[j]; - break; - } - } - if (outputVertexGroupOps[i] != null) { - continue; - } outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo()); outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i)); @@ -561,24 +515,15 @@ public class UnionOptimizer extends TezO // Connect predecessor to the storeVertexGroups int i = 0; for (TezOperator storeVertexGroup : storeVertexGroupOps) { - // Skip connecting if they are already connected. Can happen in case of - // union + split + union + store. Because of the split all the stores - // will be writing to same location - List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs(); - if (inputs == null || !inputs.contains(pred.getOperatorKey())) { - tezPlan.connect(pred, storeVertexGroup); - } storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(), storeVertexGroup.getOperatorKey()); + tezPlan.connect(pred, storeVertexGroup); } for (TezOperator outputVertexGroup : outputVertexGroupOps) { - List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs(); - if (inputs == null || !inputs.contains(pred.getOperatorKey())) { - tezPlan.connect(pred, outputVertexGroup); - } outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); + tezPlan.connect(pred, outputVertexGroup); } copyOperatorProperties(pred, unionOp); @@ -623,7 +568,7 @@ public class UnionOptimizer extends TezO // more union predecessors. 
Change it to SCATTER_GATHER if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) { edge.dataMovementType = DataMovementType.SCATTER_GATHER; - edge.partitionerClass = HashValuePartitioner.class; + edge.partitionerClass = RoundRobinPartitioner.class; edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); edge.inputClassName = UnorderedKVInput.class.getName(); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Fri Feb 24 03:34:37 2017 @@ -17,25 +17,23 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.runtime; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /** @@ -48,13 +46,8 @@ import com.google.common.collect.Lists; public class PartitionerDefinedVertexManager extends VertexManagerPlugin { private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class); - private volatile boolean parallelismSet; + private boolean isParallelismSet = false; private int dynamicParallelism = -1; - private int numConfiguredSources; - private int numSources = -1; - private volatile boolean configured; - private volatile boolean started; - private volatile boolean scheduled; public PartitionerDefinedVertexManager(VertexManagerPluginContext context) { super(context); @@ -62,31 +55,7 @@ public class PartitionerDefinedVertexMan @Override public void initialize() { - // this will prevent vertex from starting until we notify we are done - getContext().vertexReconfigurationPlanned(); - parallelismSet = false; - numConfiguredSources = 0; - configured = false; - started = false; - numSources = getContext().getInputVertexEdgeProperties().size(); - // wait for sources and self to start - Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties(); - for (String entry : edges.keySet()) { - getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED)); - } - } - - @Override - public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) - throws Exception { - numConfiguredSources++; - LOG.info("For vertex: " + getContext().getVertexName() 
+ " Received configured signal from: " - + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources - + " needed: " + numSources); - Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName()); - if (numConfiguredSources == numSources) { - configure(); - } + // Nothing to do } @Override @@ -104,9 +73,10 @@ public class PartitionerDefinedVertexMan public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception { // There could be multiple partition vertex sending VertexManagerEvent // Only need to setVertexParallelism once - if (parallelismSet) { + if (isParallelismSet) { return; } + isParallelismSet = true; // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput if (vmEvent.getUserPayload().limit()==4) { dynamicParallelism = vmEvent.getUserPayload().getInt(); @@ -126,50 +96,18 @@ public class PartitionerDefinedVertexMan edgeManagers.put(entry.getKey(), edge); } getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers); - parallelismSet = true; - configure(); - } - } - } - - private void configure() { - if(parallelismSet && (numSources == numConfiguredSources)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Done reconfiguring vertex " + getContext().getVertexName()); } - getContext().doneReconfiguringVertex(); - configured = true; - trySchedulingTasks(); } } - private synchronized void trySchedulingTasks() { - if (configured && started && !scheduled) { - LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName()); + @Override + public void onVertexStarted(Map<String, List<Integer>> completions) { + if (dynamicParallelism != -1) { List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism); - for (int i = 0; i < dynamicParallelism; ++i) { + for (int i=0; i<dynamicParallelism; ++i) { tasksToStart.add(new TaskWithLocationHint(new Integer(i), null)); } getContext().scheduleVertexTasks(tasksToStart); - scheduled = true; } } - - @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { - // This vertex manager will be getting the following calls - // 1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex - // 2) onVertexStateUpdated - Vertex CONFIGURED status updates from - // - Order by Partitioner vertex (1-1) in case of Order by - // - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin - // 3) onVertexStarted - // Calls 2) and 3) can happen in any order. 
So we should schedule tasks - // only after start is called and configuration is also complete - started = true; - if (LOG.isDebugEnabled()) { - LOG.debug("Vertex start received for " + getContext().getVertexName()); - } - trySchedulingTasks(); - } - } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Fri Feb 24 03:34:37 2017 @@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; @@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); - pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT)); + pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext")); tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan")); TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan); try { @@ -81,10 +81,9 @@ public class PigGraceShuffleVertexManage throw new TezUncheckedException(e); } TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName())); - + // Collect grandparents of the vertex - Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { - @Override + Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { public String apply(TezOperator op) { return op.getOperatorKey().toString(); } }; grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString); @@ -136,7 +135,7 @@ public class PigGraceShuffleVertexManage // Now one of the predecessor is about to start, we need to make a decision now if (anyPredAboutToStart) { // All grandparents finished, start parents with right parallelism - + for (TezOperator pred : preds) { if (pred.getRequestedParallelism()==-1) { List<TezOperator> predPreds = tezPlan.getPredecessors(pred); Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Feb 24 03:34:37 2017 @@ -25,7 +25,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; @@ -33,7 +32,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.log4j.PropertyConfigurator; import org.apache.pig.JVMReuseImpl; import org.apache.pig.PigConstants; import org.apache.pig.PigException; @@ -41,7 +39,6 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; @@ -56,7 +53,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; @@ -136,11 +132,7 @@ public class PigProcessor extends Abstra SpillableMemoryManager.getInstance().configure(conf); PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer .deserialize(conf.get("udf.import.list"))); - Properties log4jProperties = (Properties) ObjectSerializer - .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); - if (log4jProperties != null) { - PropertyConfigurator.configure(log4jProperties); - } + PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext")); // To determine front-end in UDFContext conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier()); @@ -159,12 +151,6 @@ public class PigProcessor extends Abstra conf.setInt(JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId()); conf.set(JobContext.ID, taskAttemptId.getJobID().toString()); - if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) { - // Has Load and is a root vertex - conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism()); - } else { - conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism()); - } conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex())); UDFContext.getUDFContext().addJobConf(conf); @@ -172,7 +158,7 @@ public class PigProcessor extends Abstra String execPlanString = conf.get(PLAN); 
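// The physical plan travels to each task as a serialized string in the job
// configuration; it is deserialized below before the processor runs it.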
execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString); - SchemaTupleBackend.initialize(conf); + SchemaTupleBackend.initialize(conf, pc); PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID()); // Set the job conf as a thread-local member of PigMapReduce @@ -181,7 +167,7 @@ public class PigProcessor extends Abstra Utils.setDefaultTimeZone(conf); - boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new TezTaskContext(getContext())); pigHadoopLogger = PigHadoopLogger.getInstance(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java Fri Feb 24 03:34:37 2017 @@ -43,15 +43,6 @@ public interface TezInput { */ public void addInputsToSkip(Set<String> inputsToSkip); - /** - * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch - * the input so that all inputs are fetched and memory released before memory is allocated - * for outputs - * - * @param inputs available inputs - * @param conf configuration - * @throws ExecException - */ public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Fri Feb 24 03:34:37 2017 @@ -23,7 +23,6 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner; import org.apache.pig.data.DataBag; @@ -31,7 +30,6 @@ import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.PigNullableWritable; -import org.apache.pig.impl.util.UDFContext; import org.apache.tez.runtime.library.common.ConfigUtils; public class WeightedRangePartitionerTez extends WeightedRangePartitioner { @@ -66,13 +64,11 @@ public class WeightedRangePartitionerTez InternalMap weightedPartsData = (InternalMap) 
quantileMap.get(FindQuantiles.WEIGHTED_PARTS); estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM); convertToArray(quantilesList); - long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode(); - long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple) ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple) ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); + new DiscreteProbabilitySampleGenerator(probVec)); } } catch (Exception e) { throw new RuntimeException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Feb 24 03:34:37 2017 @@ -50,7 +50,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.classification.InterfaceAudience; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; -import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk; @@ -103,6 +102,7 @@ public class MRToTezHelper { mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED); mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency"); + mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency"); mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); } @@ -165,7 +165,11 @@ public class MRToTezHelper { continue; } } - if (key.startsWith("yarn.nodemanager")) { + if (key.startsWith("dfs.datanode")) { + tezConf.unset(key); + } else if (key.startsWith("dfs.namenode")) { + tezConf.unset(key); + } else if (key.startsWith("yarn.nodemanager")) { tezConf.unset(key); } else if (key.startsWith("mapreduce.jobhistory")) { tezConf.unset(key); @@ -177,15 +181,20 @@ public class MRToTezHelper { } } - public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) { + public static TezConfiguration getDAGAMConfFromMRConf( + Configuration tezConf) { + + // Set Tez parameters based on MR parameters. + TezConfiguration dagAMConf = new TezConfiguration(tezConf); + convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap()); convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap); - String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); - if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) { - env = (env == null) ? 
dagAMConf.get(MRJobConfig.MR_AM_ENV) - : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV); + String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); + if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) { + env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV) + : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV); } if (env != null) { @@ -194,23 +203,24 @@ public class MRToTezHelper { dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, org.apache.tez.mapreduce.hadoop.MRHelpers - .getJavaOptsForMRAM(dagAMConf)); + .getJavaOptsForMRAM(tezConf)); - String queueName = dagAMConf.get(JobContext.QUEUE_NAME, + String queueName = tezConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME); dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS, - dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); + tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS, - dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); + tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5"); removeUnwantedSettings(dagAMConf, true); + return dagAMConf; } /** @@ -253,14 +263,6 @@ public class MRToTezHelper { JobControlCompiler.configureCompression(tezConf); convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap()); removeUnwantedSettings(tezConf, false); - - // ShuffleVertexManager Plugin settings - // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max - String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); - if (slowStartFraction != null) { - tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction); - tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction); - } } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Feb 24 03:34:37 2017 @@ -36,14 +36,13 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; -import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; +import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.builtin.TOBAG; import org.apache.pig.data.DataType; import org.apache.pig.data.TupleFactory; @@ -199,8 +198,8 @@ public class TezCompilerUtil { public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException { try { - List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class); - for (POFRJoinTez input : inputs) { + List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class); + for (TezInput input : inputs) { if (ArrayUtils.contains(input.getTezInputs(), inputKey)) { return true; } @@ -270,7 +269,7 @@ public class TezCompilerUtil { } else if (dataMovementType == DataMovementType.SCATTER_GATHER) { edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); edge.inputClassName = UnorderedKVInput.class.getName(); - edge.partitionerClass = HashValuePartitioner.class; + edge.partitionerClass = RoundRobinPartitioner.class; } edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName()); edge.setIntermediateOutputValueClass(TUPLE_CLASS); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 24 03:34:37 2017 @@ -70,7 +70,7 @@ public class MapRedUtil { private static Log log = LogFactory.getLog(MapRedUtil.class); private static final TupleFactory tf = TupleFactory.getInstance(); - public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY; + public static final String FILE_SYSTEM_NAME = "fs.default.name"; /** * Loads the key distribution sampler file @@ -301,7 +301,7 @@ public class MapRedUtil { /** * Returns the total number of bytes for this file, or if a directory all * files in the directory. - * + * * @param fs FileSystem * @param status FileStatus * @param max Maximum value of total length that will trigger exit. 
Many Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Feb 24 03:34:37 2017 @@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.hb import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; import java.math.BigDecimal; import java.math.BigInteger; @@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.mapreduce import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; -import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; @@ -86,6 +86,7 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.StoreFuncInterface; import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder; import org.apache.pig.builtin.FuncUtils; import org.apache.pig.builtin.Utf8StorageConverter; @@ -596,9 +597,7 @@ public class HBaseStorage extends LoadFu new BinaryComparator(colInfo.getColumnName()))); } } - if (columnFilters.getFilters().size() != 0) { - thisColumnGroupFilter.addFilter(columnFilters); - } + thisColumnGroupFilter.addFilter(columnFilters); allColumnFilters.addFilter(thisColumnGroupFilter); } if (allColumnFilters != null) { @@ -793,35 +792,46 @@ public class HBaseStorage extends LoadFu public List<String> getShipFiles() { // Depend on HBase to do the right thing when available, as of HBASE-9165 try { - Configuration conf = new Configuration(); - TableMapReduceUtil.addHBaseDependencyJars(conf); - if (conf.get("tmpjars") != null) { - String[] tmpjars = conf.getStrings("tmpjars"); - List<String> shipFiles = new ArrayList<String>(tmpjars.length); - for (String tmpjar : tmpjars) { - shipFiles.add(new URL(tmpjar).getPath()); + Method addHBaseDependencyJars = + TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class); + if (addHBaseDependencyJars != null) { + Configuration conf = new Configuration(); + addHBaseDependencyJars.invoke(null, conf); + if (conf.get("tmpjars") != null) { + String[] tmpjars = conf.getStrings("tmpjars"); + List<String> shipFiles = new ArrayList<String>(tmpjars.length); + for (String tmpjar : tmpjars) { + shipFiles.add(new URL(tmpjar).getPath()); + } + return shipFiles; } - return shipFiles; - } - } catch (IOException e) { - if(e instanceof MalformedURLException){ - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" - + " had malformed url. Falling back to previous logic.", e); - }else { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " failed. Falling back to previous logic.", e); } + } catch (NoSuchMethodException e) { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available." 
+ + " Falling back to previous logic.", e); + } catch (IllegalAccessException e) { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" + + " not permitted. Falling back to previous logic.", e); + } catch (InvocationTargetException e) { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" + + " failed. Falling back to previous logic.", e); + } catch (MalformedURLException e) { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" + + " had malformed url. Falling back to previous logic.", e); } List<Class> classList = new ArrayList<Class>(); classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server + if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava + classList.add(com.google.common.collect.Lists.class); // guava + } classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper // Additional jars that are specific to v0.95.0+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common - addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat + addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty return FuncUtils.getShipFiles(classList); } @@ -872,13 +882,27 @@ public class HBaseStorage extends LoadFu } if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) { + // Will not be entering this block for 0.20.2 as it has no security. 
try { - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - if (currentUser.hasKerberosCredentials()) { - TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job); + // getCurrentUser method is not public in 0.20.2 + Method m1 = UserGroupInformation.class.getMethod("getCurrentUser"); + UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null); + // hasKerberosCredentials method not available in 0.20.2 + Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials"); + boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null); + if (hasKerberosCredentials) { + // Class and method are available only from 0.92 security release + Class tokenUtilClass = Class + .forName("org.apache.hadoop.hbase.security.token.TokenUtil"); + Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] { + Configuration.class, UserGroupInformation.class, Job.class }); + m3.invoke(null, new Object[] { hbaseConf, currentUser, job }); } else { LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available"); } + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException("Failure loading TokenUtil class, " + + "is secure RPC available?", cnfe); } catch (RuntimeException re) { throw re; } catch (Exception e) { Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Fri Feb 24 03:34:37 2017 @@ -35,7 +35,6 @@ import org.apache.pig.FilterFunc; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; /** * Use a Bloom filter build previously by BuildBloom. You would first @@ -55,36 +54,14 @@ import org.apache.pig.data.TupleFactory; * C = filter B by bloom(z); * D = join C by z, A by x; * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}. - * - * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF - * as a scalar instead of storing it to file and loading again. This is simpler - * if the Bloom filter will not be reused and needs to be discarded after the - * run of the script. - * - * define bb BuildBloom('jenkins', '100', '0.1'); - * A = load 'foo' as (x, y); - * B = group A all; - * C = foreach B generate bb(A.x) as bloomfilter; - * D = load 'bar' as (z); - * E = filter D by Bloom(C.bloomfilter, z); - * F = join E by z, A by x; */ public class Bloom extends FilterFunc { - private static TupleFactory mTupleFactory = TupleFactory.getInstance(); - private String bloomFile; - private BloomFilter filter = null; + public BloomFilter filter = null; - public Bloom() { - } - - /** - * The filename containing the serialized Bloom filter. 
If filename is null - * or the no-arg constructor is used, then the bloomfilter bytearray which - * is the output of BuildBloom should be passed as the first argument to the UDF - * - * @param filename file containing the serialized Bloom filter + /** + * @param filename file containing the serialized Bloom filter */ public Bloom(String filename) { bloomFile = filename; @@ -93,25 +70,11 @@ public class Bloom extends FilterFunc { @Override public Boolean exec(Tuple input) throws IOException { if (filter == null) { - init(input); + init(); } byte[] b; - if (bloomFile == null) { - // The first one is the bloom filter. Skip that - if (input.size() == 2) { - b = DataType.toBytes(input.get(1)); - } else { - List<Object> inputList = input.getAll(); - Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size())); - b = DataType.toBytes(tuple, DataType.TUPLE); - } - } else { - if (input.size() == 1) { - b = DataType.toBytes(input.get(0)); - } else { - b = DataType.toBytes(input, DataType.TUPLE); - } - } + if (input.size() == 1) b = DataType.toBytes(input.get(0)); + else b = DataType.toBytes(input, DataType.TUPLE); Key k = new Key(b); return filter.membershipTest(k); @@ -119,46 +82,34 @@ public class Bloom extends FilterFunc { @Override public List<String> getCacheFiles() { - if (bloomFile != null) { - List<String> list = new ArrayList<String>(1); - // We were passed the name of the file on HDFS. Append a - // name for the file on the task node. - try { - list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); - } catch (IOException e) { - throw new RuntimeException(e); - } - return list; + List<String> list = new ArrayList<String>(1); + // We were passed the name of the file on HDFS. Append a + // name for the file on the task node. 
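// The "#" fragment names the symlink that the distributed cache creates in
// the task's working directory for the HDFS file preceding it.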
+ try { + list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); + } catch (IOException e) { + throw new RuntimeException(e); } - return null; + return list; } - private void init(Tuple input) throws IOException { - if (bloomFile == null) { - if (input.get(0) instanceof DataByteArray) { - filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0)); - } else { - throw new IllegalArgumentException("The first argument to the Bloom UDF should be" - + " the bloom filter if a bloom file is not specified in the constructor"); - } - } else { - filter = new BloomFilter(); - String dir = "./" + getFilenameFromPath(bloomFile); - String[] partFiles = new File(dir) - .list(new FilenameFilter() { - @Override - public boolean accept(File current, String name) { - return name.startsWith("part"); - } - }); - - String dcFile = dir + "/" + partFiles[0]; - DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); - try { - filter.readFields(dis); - } finally { - dis.close(); - } + private void init() throws IOException { + filter = new BloomFilter(); + String dir = "./" + getFilenameFromPath(bloomFile); + String[] partFiles = new File(dir) + .list(new FilenameFilter() { + @Override + public boolean accept(File current, String name) { + return name.startsWith("part"); + } + }); + + String dcFile = dir + "/" + partFiles[0]; + DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); + try { + filter.readFields(dis); + } finally { + dis.close(); } } Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Fri Feb 24 03:34:37 2017 @@ -18,15 +18,16 @@ package org.apache.pig.builtin; +import java.io.IOException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.hash.Hash; + import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; @@ -46,7 +47,7 @@ public abstract class BuildBloomBase<T> protected BuildBloomBase() { } - /** + /** * @param hashType type of the hashing function (see * {@link org.apache.hadoop.util.hash.Hash}). * @param mode Will be ignored, though by convention it should be @@ -63,7 +64,7 @@ public abstract class BuildBloomBase<T> hType = convertHashType(hashType); } - /** + /** * @param hashType type of the hashing function (see * {@link org.apache.hadoop.util.hash.Hash}). 
* @param numElements The number of distinct elements expected to be @@ -103,7 +104,7 @@ public abstract class BuildBloomBase<T> return new DataByteArray(baos.toByteArray()); } - public static BloomFilter bloomIn(DataByteArray b) throws IOException { + protected BloomFilter bloomIn(DataByteArray b) throws IOException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b.get())); BloomFilter f = new BloomFilter(); Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Fri Feb 24 03:34:37 2017 @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Counters; @@ -181,9 +180,20 @@ abstract class HiveUDFBase extends EvalF @Override public List<String> getShipFiles() { + String hadoopVersion = "20S"; + if (Utils.isHadoop23() || Utils.isHadoop2()) { + hadoopVersion = "23"; + } + Class hadoopVersionShimsClass; + try { + hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + + hadoopVersion + "Shims"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); + } List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class, - PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, - Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class}); + PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, + hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class}); return files; } Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Feb 24 03:34:37 2017 @@ -56,7 +56,6 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -390,8 +389,20 @@ public class OrcStorage extends LoadFunc @Override public List<String> getShipFiles() { + List<String> cacheFiles = new ArrayList<String>(); + String hadoopVersion = "20S"; + if (Utils.isHadoop23() || Utils.isHadoop2()) { + hadoopVersion = "23"; + } + Class hadoopVersionShimsClass; + try { + hadoopVersionShimsClass = 
Class.forName("org.apache.hadoop.hive.shims.Hadoop" + + hadoopVersion + "Shims"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); + } Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class, - org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class, + org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass, Input.class}; return FuncUtils.getShipFiles(classList); } @@ -445,7 +456,7 @@ public class OrcStorage extends LoadFunc } private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException { - FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration()); + FileSystem fs = FileSystem.get(job.getConfiguration()); Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs)); if (path == null) { log.info("Cannot find any ORC files from " + location + Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Feb 24 03:34:37 2017 @@ -68,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; @@ -170,7 +171,7 @@ LoadPushDown, LoadMetadata, StoreMetadat validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple."); validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple."); validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple."); - Option overwrite = new Option("overwrite", "Overwrites the destination."); + Option overwrite = new Option(" ", "Overwrites the destination."); overwrite.setLongOpt("overwrite"); overwrite.setOptionalArg(true); overwrite.setArgs(1); @@ -411,7 +412,7 @@ LoadPushDown, LoadMetadata, StoreMetadat @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && (!bzipinput_usehadoops) ) { + && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Fri Feb 24 03:34:37 2017 @@ -17,63 +17,15 @@ */ package org.apache.pig.builtin; -import 
org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; -/** - * This partitioner should be used with extreme caution and only in cases - * where the order of output records is guaranteed to be same. If the order of - * output records can vary on retries which is mostly the case, map reruns - * due to shuffle fetch failures can lead to data being partitioned differently - * and result in incorrect output due to loss or duplication of data. - * Refer PIG-5041 for more details. - * - * This will be removed in the next release as it is risky to use in most cases. - */ -@Deprecated -public class RoundRobinPartitioner extends Partitioner<Writable, Writable> - implements Configurable { - - /** - * Batch size for round robin partitioning. Batch size number of records - * will be distributed to each partition in a round robin fashion. Default - * value is 0 which distributes each record in a circular fashion. Higher - * number for batch size can be used to increase probability of keeping - * similar records in the same partition if output is already sorted and get - * better compression. - */ - public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size"; - private int num = -1; - private int batchSize = 0; - private int currentBatchCount = 0; - private Configuration conf; +public class RoundRobinPartitioner extends Partitioner<Writable, Writable> { + private int num = 0; @Override public int getPartition(Writable key, Writable value, int numPartitions) { - if (batchSize > 0) { - if (currentBatchCount == 0) { - num = ++num % numPartitions; - } - if (++currentBatchCount == batchSize) { - currentBatchCount = 0; - } - } else { - num = ++num % numPartitions; - } + num = ++num % numPartitions; return num; } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0); - } - - @Override - public Configuration getConf() { - return conf; - } - } Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Feb 24 03:34:37 2017 @@ -37,6 +37,7 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -258,7 +259,8 @@ public class TextLoader extends LoadFunc @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && !bzipinput_usehadoops ) { + && !HadoopShims.isHadoopYARN() + && !bzipinput_usehadoops ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java URL: 
Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Feb 24 03:34:37 2017
@@ -37,6 +37,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -258,7 +259,8 @@ public class TextLoader extends LoadFunc
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-                && !bzipinput_usehadoops ) {
+                && !HadoopShims.isHadoopYARN()
+                && !bzipinput_usehadoops ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Fri Feb 24 03:34:37 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
     }

     @SuppressWarnings("rawtypes")
-    protected void warn(String msg, Enum warningEnum, Throwable e) {
+    protected void warn(String msg, Enum warningEnum, Exception e) {
         pigLogger = PhysicalOperator.getPigLogger();
         if(pigLogger != null) {
             pigLogger.warn(this, msg, warningEnum);

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Fri Feb 24 03:34:37 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.io.FileNotFoundException;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
 public class DefaultDataBag extends DefaultAbstractBag {

     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;

     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-    
+
     private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();

     public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DefaultDataBagIterator();
@@ -110,15 +110,12 @@ public class DefaultDataBag extends Defa
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-                out.close();
-                out = null;
-                mContents.clear();
-            } catch (Throwable e) {
+            } catch (IOException ioe) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
@@ -129,6 +126,7 @@ public class DefaultDataBag extends Defa
                     }
                 }
             }
+            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
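The spill() hunk above narrows the catch from Throwable to IOException and defers mContents.clear() until after the finally block, so in-memory tuples are discarded only once the spill file has been fully written and closed. A condensed sketch of the resulting control flow under simplified types (the real method serializes tuples with InterSedes and reports progress; the byte[] records here are placeholders):

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

class SpillSketch {
    // Shape of the patched spill path: clear() is only reached on success.
    static long spill(List<byte[]> contents, DataOutputStream out) {
        long spilled = 0;
        try {
            for (byte[] record : contents) {
                out.write(record);
                spilled++;
            }
            out.flush();
        } catch (IOException ioe) {
            // The real code also drops the half-written file from mSpillFiles
            // and logs a warning; the bag keeps its in-memory contents.
            return 0;
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                // best effort, as in the real finally block
            }
        }
        contents.clear(); // only on a successful spill
        return spilled;
    }
}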
- String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -225,7 +223,7 @@ public class DefaultDataBag extends Defa log.fatal(msg, eof); throw new RuntimeException(msg, eof); } catch (IOException ioe) { - String msg = "Unable to read our spill file."; + String msg = "Unable to read our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); } @@ -261,7 +259,7 @@ public class DefaultDataBag extends Defa log.warn("Failed to close spill file.", e); } } catch (IOException ioe) { - String msg = "Unable to read our spill file."; + String msg = "Unable to read our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); } Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Fri Feb 24 03:34:37 2017 @@ -67,17 +67,17 @@ public class DistinctDataBag extends Def public boolean isSorted() { return false; } - + @Override public boolean isDistinct() { return true; } - - + + @Override public long size() { if (mSpillFiles != null && mSpillFiles.size() > 0){ - //We need to racalculate size to guarantee a count of unique + //We need to racalculate size to guarantee a count of unique //entries including those on disk Iterator<Tuple> iter = iterator(); int newSize = 0; @@ -85,7 +85,7 @@ public class DistinctDataBag extends Def newSize++; iter.next(); } - + synchronized(mContents) { //we don't want adds to change our numbers //the lock may need to cover more of the method @@ -94,8 +94,8 @@ public class DistinctDataBag extends Def } return mSize; } - - + + @Override public Iterator<Tuple> iterator() { return new DistinctDataBagIterator(); @@ -155,15 +155,12 @@ public class DistinctDataBag extends Def } } out.flush(); - out.close(); - out = null; - mContents.clear(); - } catch (Throwable e) { + } catch (IOException ioe) { // Remove the last file from the spilled array, since we failed to // write to it. mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); return 0; } finally { if (out != null) { @@ -174,6 +171,7 @@ public class DistinctDataBag extends Def } } } + mContents.clear(); } // Increment the spill count incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); @@ -210,7 +208,7 @@ public class DistinctDataBag extends Def @Override public int hashCode() { - return tuple.hashCode(); + return tuple.hashCode(); } } @@ -239,7 +237,7 @@ public class DistinctDataBag extends Def } @Override - public boolean hasNext() { + public boolean hasNext() { // See if we can find a tuple. If so, buffer it. mBuf = next(); return mBuf != null; @@ -297,7 +295,7 @@ public class DistinctDataBag extends Def } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should never // happen. 
- String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -348,7 +346,7 @@ public class DistinctDataBag extends Def Iterator<File> i = mSpillFiles.iterator(); while (i.hasNext()) { try { - DataInputStream in = + DataInputStream in = new DataInputStream(new BufferedInputStream( new FileInputStream(i.next()))); mStreams.add(in); @@ -504,7 +502,7 @@ public class DistinctDataBag extends Def addToQueue(null, mStreams.size() - 1); i.remove(); filesToDelete.add(f); - + } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. @@ -547,7 +545,7 @@ public class DistinctDataBag extends Def log.warn("Failed to delete spill file: " + f.getPath()); } } - + // clear the list, so that finalize does not delete any files, // when mSpillFiles is assigned a new value mSpillFiles.clear(); @@ -562,6 +560,6 @@ public class DistinctDataBag extends Def } } } - + } Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Fri Feb 24 03:34:37 2017 @@ -50,9 +50,6 @@ public class ReadOnceBag implements Data */ private static final long serialVersionUID = 2L; - public ReadOnceBag() { - } - /** * This constructor creates a bag out of an existing iterator * of tuples by taking ownership of the iterator and NOT Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Fri Feb 24 03:34:37 2017 @@ -39,7 +39,6 @@ import org.apache.pig.data.utils.Structu import org.apache.pig.data.utils.StructuresHelper.Triple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.util.Utils; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -273,20 +272,14 @@ public class SchemaTupleBackend { private static SchemaTupleBackend stb; public static void initialize(Configuration jConf, PigContext pigContext) throws IOException { - if (stb != null) { - SchemaTupleFrontend.lazyReset(pigContext); - } - initialize(jConf, pigContext.getExecType().isLocal()); + initialize(jConf, pigContext, pigContext.getExecType().isLocal()); } - public static void initialize(Configuration jConf) throws IOException { - initialize(jConf, Utils.isLocal(jConf)); - } - - public static void initialize(Configuration jConf, boolean isLocal) throws IOException { + public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException { if (stb != null) { LOG.warn("SchemaTupleBackend has already been initialized"); } else { + SchemaTupleFrontend.lazyReset(pigContext); SchemaTupleFrontend.reset(); SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal); stbInstance.copyAndResolve(); Modified: 
Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Fri Feb 24 03:34:37 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
- 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
  * stored unsorted as it comes in, and only sorted when it is time to dump
  * it to a file or when the first iterator is requested.  Experementation
  * found this to be the faster than storing it sorted to begin with.
- * 
+ *
  * We allow a user defined comparator, but provide a default comparator in
 * cases where the user doesn't specify one.
 */
 public class SortedDataBag extends DefaultAbstractBag{

     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau

         @Override
         public int hashCode() {
-            return 42; 
+            return 42;
         }
     }
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
     public boolean isSorted() {
         return true;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new SortedDataBagIterator();
@@ -145,15 +145,12 @@ public class SortedDataBag extends Defau
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-                out.close();
-                out = null;
-                mContents.clear();
-            } catch (Throwable e) {
+            } catch (IOException ioe) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
@@ -164,6 +161,7 @@ public class SortedDataBag extends Defau
                     }
                 }
             }
+            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
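The SortedDataBag changes repeat the spill-path fix shown earlier; the contract in the class comment (lazy sorting, optional user-defined comparator) is untouched. A brief hedged sketch of that contract, assuming the standard BagFactory.newSortedBag(Comparator) API (the descending comparator is our example, not from the patch):

import java.util.Comparator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SortedBagDemo {
    public static void main(String[] args) throws Exception {
        // Descending order via Tuple's own comparison; the bag sorts lazily,
        // on first iteration or when it has to spill.
        Comparator<Tuple> descending = new Comparator<Tuple>() {
            @Override
            public int compare(Tuple a, Tuple b) {
                return b.compareTo(a);
            }
        };
        DataBag bag = BagFactory.getInstance().newSortedBag(descending);
        TupleFactory tf = TupleFactory.getInstance();
        for (int v : new int[] {2, 3, 1}) {
            Tuple t = tf.newTuple(1);
            t.set(0, v);
            bag.add(t);
        }
        for (Tuple t : bag) {
            System.out.println(t); // (3), (2), (1)
        }
    }
}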
- String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -413,7 +411,7 @@ public class SortedDataBag extends Defau in.close(); }catch(IOException e) { log.warn("Failed to close spill file.", e); - } + } mStreams.set(fileNum, null); } catch (IOException ioe) { String msg = "Unable to find our spill file."; @@ -520,7 +518,7 @@ public class SortedDataBag extends Defau log.warn("Failed to delete spill file: " + f.getPath()); } } - + // clear the list, so that finalize does not delete any files, // when mSpillFiles is assigned a new value mSpillFiles.clear();