Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Feb 24 08:19:42 2017 @@ -62,7 +62,6 @@ import org.apache.tez.dag.api.EdgeProper */ public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator { - static private int maxTaskCount; static final double DEFAULT_FLATTEN_FACTOR = 10; static final double DEFAULT_FILTER_FACTOR = 0.7; static final double DEFAULT_LIMIT_FACTOR = 0.1; @@ -76,6 +75,8 @@ public class TezOperDependencyParallelis static final double DEFAULT_AGGREGATION_FACTOR = 0.7; private PigContext pc; + private int maxTaskCount; + private long bytesPerReducer; @Override public void setPigContext(PigContext pc) { @@ -94,16 +95,18 @@ public class TezOperDependencyParallelis maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM, PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM); - // If parallelism is set explicitly, respect it - if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) { - return tezOper.getRequestedParallelism(); - } + bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER); // If we have already estimated parallelism, use that one - if (tezOper.getEstimatedParallelism()!=-1) { + if (tezOper.getEstimatedParallelism() != -1) { return tezOper.getEstimatedParallelism(); } + // If parallelism is set explicitly, respect it + if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) { + return tezOper.getRequestedParallelism(); + } + List<TezOperator> preds = plan.getPredecessors(tezOper); if (preds==null) { throw new IOException("Cannot estimate parallelism for source vertex"); @@ -130,6 +133,12 @@ public class TezOperDependencyParallelis boolean applyFactor = !tezOper.isUnion(); if (!pred.isVertexGroup() && applyFactor) { predParallelism = predParallelism * pred.getParallelismFactor(tezOper); + if (pred.getTotalInputFilesSize() > 0) { + // Estimate similar to mapreduce and use the maximum of two + int parallelismBySize = (int) Math.ceil((double) pred + .getTotalInputFilesSize() / bytesPerReducer); + predParallelism = Math.max(predParallelism, parallelismBySize); + } } estimatedParallelism += predParallelism; } @@ -157,9 +166,7 @@ public class TezOperDependencyParallelis } if (roundedEstimatedParallelism == 0) { - throw new IOException("Estimated parallelism for " - + tezOper.getOperatorKey().toString() - + " is 0 which is unexpected"); + roundedEstimatedParallelism = 1; // We need to produce empty output file } return roundedEstimatedParallelism; @@ -196,7 +203,7 @@ public class TezOperDependencyParallelis if (successor != null) { // Map side combiner TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey()); - if (!edge.combinePlan.isEmpty()) { + if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) { if (successor.isDistinct()) { factor = DEFAULT_DISTINCT_FACTOR; } else {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Feb 24 08:19:42 2017 @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigConfiguration; +import org.apache.pig.StoreFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -44,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; @@ -52,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag import org.apache.pig.builtin.JsonStorage; import org.apache.pig.builtin.OrcStorage; import org.apache.pig.builtin.PigStorage; -import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; @@ -108,6 +109,12 @@ public class UnionOptimizer extends TezO if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) { return false; } + + // If user has specified a PARALLEL clause with the union operator + // turn off union optimization + if (tezOp.getRequestedParallelism() != -1) { + return false; + } // Two vertices separately ranking with 1 to n and writing to output directly // will make each rank repeate twice which is wrong. Rank always needs to be // done from single vertex to have the counting correct. @@ -120,10 +127,25 @@ public class UnionOptimizer extends TezO public static boolean isOptimizableStoreFunc(TezOperator tezOp, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) throws VisitorException { - if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { - List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); - for (POStoreTez store : stores) { - String name = store.getStoreFunc().getClass().getName(); + List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); + + for (POStoreTez store : stores) { + String name = store.getStoreFunc().getClass().getName(); + if (store.getStoreFunc() instanceof StoreFunc) { + StoreFunc func = (StoreFunc) store.getStoreFunc(); + if (func.supportsParallelWriteToStoreLocation() != null) { + if (func.supportsParallelWriteToStoreLocation()) { + continue; + } else { + LOG.warn(name + " does not support union optimization." + + " Disabling it. There will be some performance degradation."); + return false; + } + } + } + // If StoreFunc does not explicitly state support, then check supported and + // unsupported config settings. + if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) { return false; @@ -237,8 +259,23 @@ public class UnionOptimizer extends TezO for (TezOperator succ : successors) { if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) { existingVertexGroup = succ; + break; + } + } + } + if (existingVertexGroup == null) { + // In the case of union + split + union + store, the different stores in the Split + // will be writing to same location after second union operator is optimized. + // So while optimizing the first union, we should just make it write to one vertex group + for (int j = 0; j < i; j++) { + if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) { + storeVertexGroupOps[i] = storeVertexGroupOps[j]; + break; } } + if (storeVertexGroupOps[i] != null) { + continue; + } } if (existingVertexGroup != null) { storeVertexGroupOps[i] = existingVertexGroup; @@ -270,6 +307,15 @@ public class UnionOptimizer extends TezO TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()]; String[] newOutputKeys = new String[unionOutputKeys.size()]; for (int i=0; i < outputVertexGroupOps.length; i++) { + for (int j = 0; j < i; j++) { + if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) { + outputVertexGroupOps[i] = outputVertexGroupOps[j]; + break; + } + } + if (outputVertexGroupOps[i] != null) { + continue; + } outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo()); outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i)); @@ -515,15 +561,24 @@ public class UnionOptimizer extends TezO // Connect predecessor to the storeVertexGroups int i = 0; for (TezOperator storeVertexGroup : storeVertexGroupOps) { + // Skip connecting if they are already connected. Can happen in case of + // union + split + union + store. Because of the split all the stores + // will be writing to same location + List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs(); + if (inputs == null || !inputs.contains(pred.getOperatorKey())) { + tezPlan.connect(pred, storeVertexGroup); + } storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(), storeVertexGroup.getOperatorKey()); - tezPlan.connect(pred, storeVertexGroup); } for (TezOperator outputVertexGroup : outputVertexGroupOps) { + List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs(); + if (inputs == null || !inputs.contains(pred.getOperatorKey())) { + tezPlan.connect(pred, outputVertexGroup); + } outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); - tezPlan.connect(pred, outputVertexGroup); } copyOperatorProperties(pred, unionOp); @@ -568,7 +623,7 @@ public class UnionOptimizer extends TezO // more union predecessors. Change it to SCATTER_GATHER if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) { edge.dataMovementType = DataMovementType.SCATTER_GATHER; - edge.partitionerClass = RoundRobinPartitioner.class; + edge.partitionerClass = HashValuePartitioner.class; edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); edge.inputClassName = UnorderedKVInput.class.getName(); } Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez.runtime; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; + +public class HashValuePartitioner extends Partitioner<Writable, Writable> { + + @SuppressWarnings("rawtypes") + @Override + public int getPartition(Writable key, Writable value, int numPartitions) { + int hash = 17; + Tuple tuple; + if (value instanceof Tuple) { + // union optimizer turned off + tuple = (Tuple) value; + } else { + // union followed by order by or skewed join + tuple = (Tuple)((NullableTuple) value).getValueAsPigType(); + } + if (tuple != null) { + for (Object o : tuple.getAll()) { + if (o != null) { + // Skip computing hashcode for bags. + // Order of elements in the map/bag may be different on each run + // Can't even include size as some DataBag implementations + // iterate through all elements in the bag to get the size. + if (o instanceof DataBag) { + hash = 31 * hash; + } else { + hash = 31 * hash + o.hashCode(); + } + } + } + } + return (hash & Integer.MAX_VALUE) % numPartitions; + } + +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Fri Feb 24 08:19:42 2017 @@ -17,23 +17,25 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.runtime; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; -import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /** @@ -46,8 +48,13 @@ import com.google.common.collect.Lists; public class PartitionerDefinedVertexManager extends VertexManagerPlugin { private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class); - private boolean isParallelismSet = false; + private volatile boolean parallelismSet; private int dynamicParallelism = -1; + private int numConfiguredSources; + private int numSources = -1; + private volatile boolean configured; + private volatile boolean started; + private volatile boolean scheduled; public PartitionerDefinedVertexManager(VertexManagerPluginContext context) { super(context); @@ -55,7 +62,31 @@ public class PartitionerDefinedVertexMan @Override public void initialize() { - // Nothing to do + // this will prevent vertex from starting until we notify we are done + getContext().vertexReconfigurationPlanned(); + parallelismSet = false; + numConfiguredSources = 0; + configured = false; + started = false; + numSources = getContext().getInputVertexEdgeProperties().size(); + // wait for sources and self to start + Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties(); + for (String entry : edges.keySet()) { + getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED)); + } + } + + @Override + public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) + throws Exception { + numConfiguredSources++; + LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: " + + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources + + " needed: " + numSources); + Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName()); + if (numConfiguredSources == numSources) { + configure(); + } } @Override @@ -73,10 +104,9 @@ public class PartitionerDefinedVertexMan public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception { // There could be multiple partition vertex sending VertexManagerEvent // Only need to setVertexParallelism once - if (isParallelismSet) { + if (parallelismSet) { return; } - isParallelismSet = true; // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput if (vmEvent.getUserPayload().limit()==4) { dynamicParallelism = vmEvent.getUserPayload().getInt(); @@ -96,18 +126,50 @@ public class PartitionerDefinedVertexMan edgeManagers.put(entry.getKey(), edge); } getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers); + parallelismSet = true; + configure(); } } } - @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { - if (dynamicParallelism != -1) { + private void configure() { + if(parallelismSet && (numSources == numConfiguredSources)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Done reconfiguring vertex " + getContext().getVertexName()); + } + getContext().doneReconfiguringVertex(); + configured = true; + trySchedulingTasks(); + } + } + + private synchronized void trySchedulingTasks() { + if (configured && started && !scheduled) { + LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName()); List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism); - for (int i=0; i<dynamicParallelism; ++i) { + for (int i = 0; i < dynamicParallelism; ++i) { tasksToStart.add(new TaskWithLocationHint(new Integer(i), null)); } getContext().scheduleVertexTasks(tasksToStart); + scheduled = true; } } + + @Override + public void onVertexStarted(Map<String, List<Integer>> completions) { + // This vertex manager will be getting the following calls + // 1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex + // 2) onVertexStateUpdated - Vertex CONFIGURED status updates from + // - Order by Partitioner vertex (1-1) in case of Order by + // - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin + // 3) onVertexStarted + // Calls 2) and 3) can happen in any order. So we should schedule tasks + // only after start is called and configuration is also complete + started = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Vertex start received for " + getContext().getVertexName()); + } + trySchedulingTasks(); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Fri Feb 24 08:19:42 2017 @@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; @@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); - pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext")); + pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT)); tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan")); TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan); try { @@ -81,9 +81,10 @@ public class PigGraceShuffleVertexManage throw new TezUncheckedException(e); } TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName())); - + // Collect grandparents of the vertex - Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { + Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { + @Override public String apply(TezOperator op) { return op.getOperatorKey().toString(); } }; grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString); @@ -135,7 +136,7 @@ public class PigGraceShuffleVertexManage // Now one of the predecessor is about to start, we need to make a decision now if (anyPredAboutToStart) { // All grandparents finished, start parents with right parallelism - + for (TezOperator pred : preds) { if (pred.getRequestedParallelism()==-1) { List<TezOperator> predPreds = tezPlan.getPredecessors(pred); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Feb 24 08:19:42 2017 @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; @@ -32,6 +33,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.log4j.PropertyConfigurator; import org.apache.pig.JVMReuseImpl; import org.apache.pig.PigConstants; import org.apache.pig.PigException; @@ -39,6 +41,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; @@ -53,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; @@ -132,7 +136,11 @@ public class PigProcessor extends Abstra SpillableMemoryManager.getInstance().configure(conf); PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer .deserialize(conf.get("udf.import.list"))); - PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext")); + Properties log4jProperties = (Properties) ObjectSerializer + .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); + if (log4jProperties != null) { + PropertyConfigurator.configure(log4jProperties); + } // To determine front-end in UDFContext conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier()); @@ -151,6 +159,12 @@ public class PigProcessor extends Abstra conf.setInt(JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId()); conf.set(JobContext.ID, taskAttemptId.getJobID().toString()); + if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) { + // Has Load and is a root vertex + conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism()); + } else { + conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism()); + } conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex())); UDFContext.getUDFContext().addJobConf(conf); @@ -158,7 +172,7 @@ public class PigProcessor extends Abstra String execPlanString = conf.get(PLAN); execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString); - SchemaTupleBackend.initialize(conf, pc); + SchemaTupleBackend.initialize(conf); PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID()); // Set the job conf as a thread-local member of PigMapReduce @@ -167,7 +181,7 @@ public class PigProcessor extends Abstra Utils.setDefaultTimeZone(conf); - boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new TezTaskContext(getContext())); pigHadoopLogger = PigHadoopLogger.getInstance(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java Fri Feb 24 08:19:42 2017 @@ -43,6 +43,15 @@ public interface TezInput { */ public void addInputsToSkip(Set<String> inputsToSkip); + /** + * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch + * the input so that all inputs are fetched and memory released before memory is allocated + * for outputs + * + * @param inputs available inputs + * @param conf configuration + * @throws ExecException + */ public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Fri Feb 24 08:19:42 2017 @@ -23,6 +23,7 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner; import org.apache.pig.data.DataBag; @@ -30,6 +31,7 @@ import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.UDFContext; import org.apache.tez.runtime.library.common.ConfigUtils; public class WeightedRangePartitionerTez extends WeightedRangePartitioner { @@ -64,11 +66,13 @@ public class WeightedRangePartitionerTez InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM); convertToArray(quantilesList); + long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple) ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple) ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(probVec)); + new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); } } catch (Exception e) { throw new RuntimeException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Feb 24 08:19:42 2017 @@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.classification.InterfaceAudience; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk; @@ -102,7 +103,6 @@ public class MRToTezHelper { mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED); mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency"); - mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency"); mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); } @@ -165,11 +165,7 @@ public class MRToTezHelper { continue; } } - if (key.startsWith("dfs.datanode")) { - tezConf.unset(key); - } else if (key.startsWith("dfs.namenode")) { - tezConf.unset(key); - } else if (key.startsWith("yarn.nodemanager")) { + if (key.startsWith("yarn.nodemanager")) { tezConf.unset(key); } else if (key.startsWith("mapreduce.jobhistory")) { tezConf.unset(key); @@ -181,20 +177,15 @@ public class MRToTezHelper { } } - public static TezConfiguration getDAGAMConfFromMRConf( - Configuration tezConf) { - - // Set Tez parameters based on MR parameters. - TezConfiguration dagAMConf = new TezConfiguration(tezConf); - + public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) { convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap()); convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap); - String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); - if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) { - env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV) - : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV); + String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); + if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) { + env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV) + : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV); } if (env != null) { @@ -203,24 +194,23 @@ public class MRToTezHelper { dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, org.apache.tez.mapreduce.hadoop.MRHelpers - .getJavaOptsForMRAM(tezConf)); + .getJavaOptsForMRAM(dagAMConf)); - String queueName = tezConf.get(JobContext.QUEUE_NAME, + String queueName = dagAMConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME); dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS, - tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); + dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS, - tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); + dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5"); removeUnwantedSettings(dagAMConf, true); - return dagAMConf; } /** @@ -263,6 +253,14 @@ public class MRToTezHelper { JobControlCompiler.configureCompression(tezConf); convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap()); removeUnwantedSettings(tezConf, false); + + // ShuffleVertexManager Plugin settings + // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max + String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); + if (slowStartFraction != null) { + tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction); + tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction); + } } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Feb 24 08:19:42 2017 @@ -36,13 +36,14 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; -import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.builtin.TOBAG; import org.apache.pig.data.DataType; import org.apache.pig.data.TupleFactory; @@ -198,8 +199,8 @@ public class TezCompilerUtil { public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException { try { - List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class); - for (TezInput input : inputs) { + List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class); + for (POFRJoinTez input : inputs) { if (ArrayUtils.contains(input.getTezInputs(), inputKey)) { return true; } @@ -269,7 +270,7 @@ public class TezCompilerUtil { } else if (dataMovementType == DataMovementType.SCATTER_GATHER) { edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); edge.inputClassName = UnorderedKVInput.class.getName(); - edge.partitionerClass = RoundRobinPartitioner.class; + edge.partitionerClass = HashValuePartitioner.class; } edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName()); edge.setIntermediateOutputValueClass(TUPLE_CLASS); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 24 08:19:42 2017 @@ -70,7 +70,7 @@ public class MapRedUtil { private static Log log = LogFactory.getLog(MapRedUtil.class); private static final TupleFactory tf = TupleFactory.getInstance(); - public static final String FILE_SYSTEM_NAME = "fs.default.name"; + public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY; /** * Loads the key distribution sampler file @@ -301,7 +301,7 @@ public class MapRedUtil { /** * Returns the total number of bytes for this file, or if a directory all * files in the directory. - * + * * @param fs FileSystem * @param status FileStatus * @param max Maximum value of total length that will trigger exit. Many Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Feb 24 08:19:42 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; import java.math.BigDecimal; import java.math.BigInteger; @@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; @@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.StoreFuncInterface; import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder; import org.apache.pig.builtin.FuncUtils; import org.apache.pig.builtin.Utf8StorageConverter; @@ -597,7 +596,9 @@ public class HBaseStorage extends LoadFu new BinaryComparator(colInfo.getColumnName()))); } } - thisColumnGroupFilter.addFilter(columnFilters); + if (columnFilters.getFilters().size() != 0) { + thisColumnGroupFilter.addFilter(columnFilters); + } allColumnFilters.addFilter(thisColumnGroupFilter); } if (allColumnFilters != null) { @@ -792,46 +793,35 @@ public class HBaseStorage extends LoadFu public List<String> getShipFiles() { // Depend on HBase to do the right thing when available, as of HBASE-9165 try { - Method addHBaseDependencyJars = - TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class); - if (addHBaseDependencyJars != null) { - Configuration conf = new Configuration(); - addHBaseDependencyJars.invoke(null, conf); - if (conf.get("tmpjars") != null) { - String[] tmpjars = conf.getStrings("tmpjars"); - List<String> shipFiles = new ArrayList<String>(tmpjars.length); - for (String tmpjar : tmpjars) { - shipFiles.add(new URL(tmpjar).getPath()); - } - return shipFiles; + Configuration conf = new Configuration(); + TableMapReduceUtil.addHBaseDependencyJars(conf); + if (conf.get("tmpjars") != null) { + String[] tmpjars = conf.getStrings("tmpjars"); + List<String> shipFiles = new ArrayList<String>(tmpjars.length); + for (String tmpjar : tmpjars) { + shipFiles.add(new URL(tmpjar).getPath()); } + return shipFiles; + } + } catch (IOException e) { + if(e instanceof MalformedURLException){ + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" + + " had malformed url. Falling back to previous logic.", e); + }else { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" + + " failed. Falling back to previous logic.", e); } - } catch (NoSuchMethodException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available." - + " Falling back to previous logic.", e); - } catch (IllegalAccessException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " not permitted. Falling back to previous logic.", e); - } catch (InvocationTargetException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " failed. Falling back to previous logic.", e); - } catch (MalformedURLException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" - + " had malformed url. Falling back to previous logic.", e); } List<Class> classList = new ArrayList<Class>(); classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server - if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava - classList.add(com.google.common.collect.Lists.class); // guava - } classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper // Additional jars that are specific to v0.95.0+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common - addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar + addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty return FuncUtils.getShipFiles(classList); } @@ -882,27 +872,13 @@ public class HBaseStorage extends LoadFu } if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) { - // Will not be entering this block for 0.20.2 as it has no security. try { - // getCurrentUser method is not public in 0.20.2 - Method m1 = UserGroupInformation.class.getMethod("getCurrentUser"); - UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null); - // hasKerberosCredentials method not available in 0.20.2 - Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials"); - boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null); - if (hasKerberosCredentials) { - // Class and method are available only from 0.92 security release - Class tokenUtilClass = Class - .forName("org.apache.hadoop.hbase.security.token.TokenUtil"); - Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] { - Configuration.class, UserGroupInformation.class, Job.class }); - m3.invoke(null, new Object[] { hbaseConf, currentUser, job }); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + if (currentUser.hasKerberosCredentials()) { + TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job); } else { LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available"); } - } catch (ClassNotFoundException cnfe) { - throw new RuntimeException("Failure loading TokenUtil class, " - + "is secure RPC available?", cnfe); } catch (RuntimeException re) { throw re; } catch (Exception e) { Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Fri Feb 24 08:19:42 2017 @@ -35,6 +35,7 @@ import org.apache.pig.FilterFunc; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; /** * Use a Bloom filter build previously by BuildBloom. You would first @@ -54,14 +55,36 @@ import org.apache.pig.data.Tuple; * C = filter B by bloom(z); * D = join C by z, A by x; * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}. + * + * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF + * as a scalar instead of storing it to file and loading again. This is simpler + * if the Bloom filter will not be reused and needs to be discarded after the + * run of the script. + * + * define bb BuildBloom('jenkins', '100', '0.1'); + * A = load 'foo' as (x, y); + * B = group A all; + * C = foreach B generate bb(A.x) as bloomfilter; + * D = load 'bar' as (z); + * E = filter D by Bloom(C.bloomfilter, z); + * F = join E by z, A by x; */ public class Bloom extends FilterFunc { + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); + private String bloomFile; - public BloomFilter filter = null; + private BloomFilter filter = null; - /** - * @param filename file containing the serialized Bloom filter + public Bloom() { + } + + /** + * The filename containing the serialized Bloom filter. If filename is null + * or the no-arg constructor is used, then the bloomfilter bytearray which + * is the output of BuildBloom should be passed as the first argument to the UDF + * + * @param filename file containing the serialized Bloom filter */ public Bloom(String filename) { bloomFile = filename; @@ -70,11 +93,25 @@ public class Bloom extends FilterFunc { @Override public Boolean exec(Tuple input) throws IOException { if (filter == null) { - init(); + init(input); } byte[] b; - if (input.size() == 1) b = DataType.toBytes(input.get(0)); - else b = DataType.toBytes(input, DataType.TUPLE); + if (bloomFile == null) { + // The first one is the bloom filter. Skip that + if (input.size() == 2) { + b = DataType.toBytes(input.get(1)); + } else { + List<Object> inputList = input.getAll(); + Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size())); + b = DataType.toBytes(tuple, DataType.TUPLE); + } + } else { + if (input.size() == 1) { + b = DataType.toBytes(input.get(0)); + } else { + b = DataType.toBytes(input, DataType.TUPLE); + } + } Key k = new Key(b); return filter.membershipTest(k); @@ -82,34 +119,46 @@ public class Bloom extends FilterFunc { @Override public List<String> getCacheFiles() { - List<String> list = new ArrayList<String>(1); - // We were passed the name of the file on HDFS. Append a - // name for the file on the task node. - try { - list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); - } catch (IOException e) { - throw new RuntimeException(e); + if (bloomFile != null) { + List<String> list = new ArrayList<String>(1); + // We were passed the name of the file on HDFS. Append a + // name for the file on the task node. + try { + list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return list; } - return list; + return null; } - private void init() throws IOException { - filter = new BloomFilter(); - String dir = "./" + getFilenameFromPath(bloomFile); - String[] partFiles = new File(dir) - .list(new FilenameFilter() { - @Override - public boolean accept(File current, String name) { - return name.startsWith("part"); - } - }); - - String dcFile = dir + "/" + partFiles[0]; - DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); - try { - filter.readFields(dis); - } finally { - dis.close(); + private void init(Tuple input) throws IOException { + if (bloomFile == null) { + if (input.get(0) instanceof DataByteArray) { + filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0)); + } else { + throw new IllegalArgumentException("The first argument to the Bloom UDF should be" + + " the bloom filter if a bloom file is not specified in the constructor"); + } + } else { + filter = new BloomFilter(); + String dir = "./" + getFilenameFromPath(bloomFile); + String[] partFiles = new File(dir) + .list(new FilenameFilter() { + @Override + public boolean accept(File current, String name) { + return name.startsWith("part"); + } + }); + + String dcFile = dir + "/" + partFiles[0]; + DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); + try { + filter.readFields(dis); + } finally { + dis.close(); + } } } Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Fri Feb 24 08:19:42 2017 @@ -18,16 +18,15 @@ package org.apache.pig.builtin; -import java.io.IOException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.hash.Hash; - import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; @@ -47,7 +46,7 @@ public abstract class BuildBloomBase<T> protected BuildBloomBase() { } - /** + /** * @param hashType type of the hashing function (see * {@link org.apache.hadoop.util.hash.Hash}). * @param mode Will be ignored, though by convention it should be @@ -64,7 +63,7 @@ public abstract class BuildBloomBase<T> hType = convertHashType(hashType); } - /** + /** * @param hashType type of the hashing function (see * {@link org.apache.hadoop.util.hash.Hash}). * @param numElements The number of distinct elements expected to be @@ -104,7 +103,7 @@ public abstract class BuildBloomBase<T> return new DataByteArray(baos.toByteArray()); } - protected BloomFilter bloomIn(DataByteArray b) throws IOException { + public static BloomFilter bloomIn(DataByteArray b) throws IOException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b.get())); BloomFilter f = new BloomFilter(); Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Fri Feb 24 08:19:42 2017 @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Counters; @@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF @Override public List<String> getShipFiles() { - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - Class hadoopVersionShimsClass; - try { - hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + - hadoopVersion + "Shims"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); - } List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class, - PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, - hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class}); + PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, + Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class}); return files; } Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Feb 24 08:19:42 2017 @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc @Override public List<String> getShipFiles() { - List<String> cacheFiles = new ArrayList<String>(); - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - Class hadoopVersionShimsClass; - try { - hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + - hadoopVersion + "Shims"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); - } Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class, - org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass, + org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class, Input.class}; return FuncUtils.getShipFiles(classList); } @@ -456,7 +445,7 @@ public class OrcStorage extends LoadFunc } private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException { - FileSystem fs = FileSystem.get(job.getConfiguration()); + FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration()); Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs)); if (path == null) { log.info("Cannot find any ORC files from " + location + Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Feb 24 08:19:42 2017 @@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; @@ -171,7 +170,7 @@ LoadPushDown, LoadMetadata, StoreMetadat validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple."); validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple."); validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple."); - Option overwrite = new Option(" ", "Overwrites the destination."); + Option overwrite = new Option("overwrite", "Overwrites the destination."); overwrite.setLongOpt("overwrite"); overwrite.setOptionalArg(true); overwrite.setArgs(1); @@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) { + && (!bzipinput_usehadoops) ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Fri Feb 24 08:19:42 2017 @@ -17,15 +17,63 @@ */ package org.apache.pig.builtin; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; -public class RoundRobinPartitioner extends Partitioner<Writable, Writable> { - private int num = 0; +/** + * This partitioner should be used with extreme caution and only in cases + * where the order of output records is guaranteed to be same. If the order of + * output records can vary on retries which is mostly the case, map reruns + * due to shuffle fetch failures can lead to data being partitioned differently + * and result in incorrect output due to loss or duplication of data. + * Refer PIG-5041 for more details. + * + * This will be removed in the next release as it is risky to use in most cases. + */ +@Deprecated +public class RoundRobinPartitioner extends Partitioner<Writable, Writable> + implements Configurable { + + /** + * Batch size for round robin partitioning. Batch size number of records + * will be distributed to each partition in a round robin fashion. Default + * value is 0 which distributes each record in a circular fashion. Higher + * number for batch size can be used to increase probability of keeping + * similar records in the same partition if output is already sorted and get + * better compression. + */ + public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size"; + private int num = -1; + private int batchSize = 0; + private int currentBatchCount = 0; + private Configuration conf; @Override public int getPartition(Writable key, Writable value, int numPartitions) { - num = ++num % numPartitions; + if (batchSize > 0) { + if (currentBatchCount == 0) { + num = ++num % numPartitions; + } + if (++currentBatchCount == batchSize) { + currentBatchCount = 0; + } + } else { + num = ++num % numPartitions; + } return num; } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0); + } + + @Override + public Configuration getConf() { + return conf; + } + } Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Feb 24 08:19:42 2017 @@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && !HadoopShims.isHadoopYARN() - && !bzipinput_usehadoops ) { + && !bzipinput_usehadoops ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Fri Feb 24 08:19:42 2017 @@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag } @SuppressWarnings("rawtypes") - protected void warn(String msg, Enum warningEnum, Exception e) { + protected void warn(String msg, Enum warningEnum, Throwable e) { pigLogger = PhysicalOperator.getPigLogger(); if(pigLogger != null) { pigLogger.warn(this, msg, warningEnum); Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Fri Feb 24 08:19:42 2017 @@ -22,11 +22,11 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.io.FileNotFoundException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,12 +42,12 @@ import org.apache.pig.PigWarning; public class DefaultDataBag extends DefaultAbstractBag { /** - * + * */ private static final long serialVersionUID = 2L; private static final Log log = LogFactory.getLog(DefaultDataBag.class); - + private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance(); public DefaultDataBag() { @@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa public boolean isSorted() { return false; } - + @Override public boolean isDistinct() { return false; } - + @Override public Iterator<Tuple> iterator() { return new DefaultDataBagIterator(); @@ -110,12 +110,15 @@ public class DefaultDataBag extends Defa if ((spilled & 0x3fff) == 0) reportProgress(); } out.flush(); - } catch (IOException ioe) { + out.close(); + out = null; + mContents.clear(); + } catch (Throwable e) { // Remove the last file from the spilled array, since we failed to // write to it. mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); return 0; } finally { if (out != null) { @@ -126,7 +129,6 @@ public class DefaultDataBag extends Defa } } } - mContents.clear(); } // Increment the spill count incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); @@ -156,7 +158,7 @@ public class DefaultDataBag extends Defa } @Override - public boolean hasNext() { + public boolean hasNext() { // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple if (hasCachedTuple) return (mBuf != null); @@ -209,7 +211,7 @@ public class DefaultDataBag extends Defa } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should never // happen. - String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -223,7 +225,7 @@ public class DefaultDataBag extends Defa log.fatal(msg, eof); throw new RuntimeException(msg, eof); } catch (IOException ioe) { - String msg = "Unable to read our spill file."; + String msg = "Unable to read our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); } @@ -259,7 +261,7 @@ public class DefaultDataBag extends Defa log.warn("Failed to close spill file.", e); } } catch (IOException ioe) { - String msg = "Unable to read our spill file."; + String msg = "Unable to read our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); } Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Fri Feb 24 08:19:42 2017 @@ -67,17 +67,17 @@ public class DistinctDataBag extends Def public boolean isSorted() { return false; } - + @Override public boolean isDistinct() { return true; } - - + + @Override public long size() { if (mSpillFiles != null && mSpillFiles.size() > 0){ - //We need to racalculate size to guarantee a count of unique + //We need to racalculate size to guarantee a count of unique //entries including those on disk Iterator<Tuple> iter = iterator(); int newSize = 0; @@ -85,7 +85,7 @@ public class DistinctDataBag extends Def newSize++; iter.next(); } - + synchronized(mContents) { //we don't want adds to change our numbers //the lock may need to cover more of the method @@ -94,8 +94,8 @@ public class DistinctDataBag extends Def } return mSize; } - - + + @Override public Iterator<Tuple> iterator() { return new DistinctDataBagIterator(); @@ -155,12 +155,15 @@ public class DistinctDataBag extends Def } } out.flush(); - } catch (IOException ioe) { + out.close(); + out = null; + mContents.clear(); + } catch (Throwable e) { // Remove the last file from the spilled array, since we failed to // write to it. mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); return 0; } finally { if (out != null) { @@ -171,7 +174,6 @@ public class DistinctDataBag extends Def } } } - mContents.clear(); } // Increment the spill count incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); @@ -208,7 +210,7 @@ public class DistinctDataBag extends Def @Override public int hashCode() { - return tuple.hashCode(); + return tuple.hashCode(); } } @@ -237,7 +239,7 @@ public class DistinctDataBag extends Def } @Override - public boolean hasNext() { + public boolean hasNext() { // See if we can find a tuple. If so, buffer it. mBuf = next(); return mBuf != null; @@ -295,7 +297,7 @@ public class DistinctDataBag extends Def } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should never // happen. - String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -346,7 +348,7 @@ public class DistinctDataBag extends Def Iterator<File> i = mSpillFiles.iterator(); while (i.hasNext()) { try { - DataInputStream in = + DataInputStream in = new DataInputStream(new BufferedInputStream( new FileInputStream(i.next()))); mStreams.add(in); @@ -502,7 +504,7 @@ public class DistinctDataBag extends Def addToQueue(null, mStreams.size() - 1); i.remove(); filesToDelete.add(f); - + } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. @@ -545,7 +547,7 @@ public class DistinctDataBag extends Def log.warn("Failed to delete spill file: " + f.getPath()); } } - + // clear the list, so that finalize does not delete any files, // when mSpillFiles is assigned a new value mSpillFiles.clear(); @@ -560,6 +562,6 @@ public class DistinctDataBag extends Def } } } - + } Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Fri Feb 24 08:19:42 2017 @@ -50,6 +50,9 @@ public class ReadOnceBag implements Data */ private static final long serialVersionUID = 2L; + public ReadOnceBag() { + } + /** * This constructor creates a bag out of an existing iterator * of tuples by taking ownership of the iterator and NOT Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Fri Feb 24 08:19:42 2017 @@ -39,6 +39,7 @@ import org.apache.pig.data.utils.Structu import org.apache.pig.data.utils.StructuresHelper.Triple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.Utils; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -272,14 +273,20 @@ public class SchemaTupleBackend { private static SchemaTupleBackend stb; public static void initialize(Configuration jConf, PigContext pigContext) throws IOException { - initialize(jConf, pigContext, pigContext.getExecType().isLocal()); + if (stb != null) { + SchemaTupleFrontend.lazyReset(pigContext); + } + initialize(jConf, pigContext.getExecType().isLocal()); } - public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException { + public static void initialize(Configuration jConf) throws IOException { + initialize(jConf, Utils.isLocal(jConf)); + } + + public static void initialize(Configuration jConf, boolean isLocal) throws IOException { if (stb != null) { LOG.warn("SchemaTupleBackend has already been initialized"); } else { - SchemaTupleFrontend.lazyReset(pigContext); SchemaTupleFrontend.reset(); SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal); stbInstance.copyAndResolve();
