Author: rohini
Date: Fri Oct 16 22:44:34 2015
New Revision: 1709122
URL: http://svn.apache.org/viewvc?rev=1709122&view=rev
Log:
PIG-4697: Pig needs to serialize only part of the udfcontext for each vertex
(rohini)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java
pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 16 22:44:34 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4697: Pig needs to serialize only part of the udfcontext for each vertex
(rohini)
+
PIG-4702: Load once for sampling and partitioning in order by for certain
LoadFuncs (rohini)
PIG-4699: Print Job stats information in Tez like mapreduce (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Fri Oct 16 22:44:34 2015
@@ -693,13 +693,13 @@ public class JobControlCompiler{
if(Utils.isLocal(pigContext, conf)) {
ConfigurationUtil.replaceConfigForLocalMode(conf);
}
- conf.set("pig.inputs", ObjectSerializer.serialize(inp));
- conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
- conf.set("pig.inpSignatures",
ObjectSerializer.serialize(inpSignatureLists));
- conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+ conf.set(PigInputFormat.PIG_INPUTS,
ObjectSerializer.serialize(inp));
+ conf.set(PigInputFormat.PIG_INPUT_TARGETS,
ObjectSerializer.serialize(inpTargets));
+ conf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
ObjectSerializer.serialize(inpSignatureLists));
+ conf.set(PigInputFormat.PIG_INPUT_LIMITS,
ObjectSerializer.serialize(inpLimits));
// Removing job credential entry before serializing pigcontext
into jobconf
- // since this path would be invalid for the new job being created
+ // since this path would be invalid for the new job being created
pigContext.getProperties().remove("mapreduce.job.credentials.binary");
conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
Fri Oct 16 22:44:34 2015
@@ -57,6 +57,9 @@ public class PigInputFormat extends Inpu
.getLog(PigInputFormat.class);
public static final String PIG_INPUTS = "pig.inputs";
+ public static final String PIG_INPUT_TARGETS = "pig.inpTargets";
+ public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
+ public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
/**
* @deprecated Use {@link UDFContext} instead in the following way to get
@@ -109,7 +112,7 @@ public class PigInputFormat extends Inpu
List<Long> inpLimitLists =
(ArrayList<Long>)ObjectSerializer.deserialize(
- conf.get("pig.inpLimits"));
+ conf.get(PIG_INPUT_LIMITS));
return new PigRecordReader(inputFormat, pigSplit, loadFunc, context,
inpLimitLists.get(pigSplit.getInputIndex()));
}
@@ -171,7 +174,7 @@ public class PigInputFormat extends Inpu
Configuration conf) throws IOException {
List<String> inpSignatureLists =
(ArrayList<String>)ObjectSerializer.deserialize(
- conf.get("pig.inpSignatures"));
+ conf.get(PIG_INPUT_SIGNATURES));
// signature can be null for intermediate jobs where it will not
// be required to be passed down
if(inpSignatureLists.get(inputIndex) != null) {
@@ -197,9 +200,9 @@ public class PigInputFormat extends Inpu
PigContext pigContext;
try {
inputs = (ArrayList<FileSpec>) ObjectSerializer
- .deserialize(conf.get("pig.inputs"));
+ .deserialize(conf.get(PIG_INPUTS));
inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
- .deserialize(conf.get("pig.inpTargets"));
+ .deserialize(conf.get(PIG_INPUT_TARGETS));
pigContext = (PigContext) ObjectSerializer.deserialize(conf
.get("pig.pigContext"));
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
Fri Oct 16 22:44:34 2015
@@ -605,6 +605,10 @@ public class POUserFunc extends Expressi
return func;
}
+ public String getSignature() {
+ return signature;
+ }
+
public void setSignature(String signature) {
this.signature = signature;
if (this.func!=null) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Fri Oct 16 22:44:34 2015
@@ -98,14 +98,17 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigInputFormatTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import
org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
@@ -114,6 +117,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
import org.apache.tez.common.TezUtils;
@@ -169,6 +173,11 @@ public class TezDagBuilder extends TezOp
private FileSystem fs;
private long intermediateTaskInputSize;
private Set<String> inputSplitInDiskVertices;
+ private TezUDFContextSeparator udfContextSeparator;
+
+ private String serializedTezPlan;
+ private String serializedPigContext;
+ private String serializedUDFImportList;
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
@@ -200,6 +209,25 @@ public class TezDagBuilder extends TezOp
globalConf.getLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+
+ try {
+ serializedPigContext = ObjectSerializer.serialize(pc);
+ serializedUDFImportList =
ObjectSerializer.serialize(PigContext.getPackageImportList());
+
+ udfContextSeparator = new TezUDFContextSeparator(plan,
+ new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ udfContextSeparator.visit();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getSerializedTezPlan() throws IOException {
+ if (serializedTezPlan == null) {
+ // Initialized lazily, as auto parallelism might not be in play
+ serializedTezPlan = ObjectSerializer.serialize(getPlan());
+ }
+ return serializedTezPlan;
}
// Hack to turn off relocalization till TEZ-2192 is fixed.
@@ -339,8 +367,9 @@ public class TezDagBuilder extends TezOp
OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
Configuration conf =
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
- UDFContext.getUDFContext().serialize(conf);
+
if (!combinePlan.isEmpty()) {
+ udfContextSeparator.serializeUDFContextForEdge(conf, from, to,
UDFType.USERFUNC);
addCombiner(combinePlan, to, conf, isMergedInput);
}
@@ -377,9 +406,8 @@ public class TezDagBuilder extends TezOp
conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
- conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
- conf.set("udf.import.list",
- ObjectSerializer.serialize(PigContext.getPackageImportList()));
+ conf.set("pig.pigContext", serializedPigContext);
+ conf.set("udf.import.list", serializedUDFImportList);
if(to.isGlobalSort() || to.isLimitAfterSort()){
conf.set("pig.sortOrder",
@@ -407,9 +435,6 @@ public class TezDagBuilder extends TezOp
edge.partitionerClass.getName());
}
- conf.set("udf.import.list",
- ObjectSerializer.serialize(PigContext.getPackageImportList()));
-
MRToTezHelper.processMRSettings(conf, globalConf);
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -446,11 +471,6 @@ public class TezDagBuilder extends TezOp
MRCombiner.class.getName());
conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
PigCombiner.Combine.class.getName());
- conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
- conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
- conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
- conf.set("udf.import.list",
- ObjectSerializer.serialize(PigContext.getPackageImportList()));
conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
conf.set("pig.map.keytype", ObjectSerializer
@@ -473,6 +493,37 @@ public class TezDagBuilder extends TezOp
@SuppressWarnings("deprecation")
Job job = new Job(payloadConf);
payloadConf = (JobConf) job.getConfiguration();
+ payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+ payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
+ payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
+ PigInputFormatTez.class, InputFormat.class);
+ setOutputFormat(job);
+ payloadConf.set("udf.import.list", serializedUDFImportList);
+ payloadConf.set("exectype", "TEZ");
+ MRToTezHelper.processMRSettings(payloadConf, globalConf);
+
+ // Process stores
+ LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
+
+ Configuration inputPayLoad = null;
+ Configuration outputPayLoad = null;
+
+ if (!stores.isEmpty()) {
+ outputPayLoad = new Configuration(payloadConf);
+ outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
+ ObjectSerializer.serialize(new ArrayList<POStore>()));
+ }
+
+ if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
+ payloadConf.set(PigInputFormat.PIG_INPUTS,
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
+ payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
+ payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS,
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
+ inputPayLoad = new Configuration(payloadConf);
+ if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc()
instanceof DefaultIndexableLoader) {
+ inputPayLoad.set("pig.pigContext", serializedPigContext);
+ }
+ }
+ payloadConf.set("pig.pigContext", serializedPigContext);
if (tezOp.getSampleOperator() != null) {
payloadConf.set(PigProcessor.SAMPLE_VERTEX,
tezOp.getSampleOperator().getOperatorKey().toString());
@@ -494,21 +545,6 @@ public class TezDagBuilder extends TezOp
}
- payloadConf.set("pig.inputs",
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
- payloadConf.set("pig.inpSignatures",
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
- payloadConf.set("pig.inpLimits",
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
- // Process stores
- LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
-
- payloadConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
- payloadConf.set("udf.import.list",
- ObjectSerializer.serialize(PigContext.getPackageImportList()));
- payloadConf.set("exectype", "TEZ");
- payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
- payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
- payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
- PigInputFormat.class, InputFormat.class);
-
// Set parent plan for all operators in the Tez plan.
new PhyPlanSetter(tezOp.plan).visit();
@@ -582,7 +618,6 @@ public class TezDagBuilder extends TezOp
//POShuffleTezLoad accesses the comparator setting
selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
}
- setOutputFormat(job);
// set parent plan in all operators. currently the parent plan is
really
// used only when POStream, POSplit are present in the plan
@@ -592,9 +627,7 @@ public class TezDagBuilder extends TezOp
payloadConf.set(PigProcessor.PLAN,
ObjectSerializer.serialize(tezOp.plan));
- UDFContext.getUDFContext().serialize(payloadConf);
-
- MRToTezHelper.processMRSettings(payloadConf, globalConf);
+ udfContextSeparator.serializeUDFContext(payloadConf, tezOp);
if (!pc.inIllustrator) {
for (POStore store : stores) {
@@ -660,19 +693,19 @@ public class TezDagBuilder extends TezOp
}
}
if (containScatterGather && !containCustomPartitioner) {
+ vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
// Use auto-parallelism feature of ShuffleVertexManager to
dynamically
// reduce the parallelism of the vertex
if
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
&&
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
vmPluginName =
PigGraceShuffleVertexManager.class.getName();
tezOp.setUseGraceParallelism(true);
+ vmPluginConf.set("pig.tez.plan",
getSerializedTezPlan());
+ vmPluginConf.set("pig.pigContext",
serializedPigContext);
} else {
vmPluginName = ShuffleVertexManager.class.getName();
}
- vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
- vmPluginConf.set("pig.tez.plan",
ObjectSerializer.serialize(getPlan()));
- vmPluginConf.set("pig.pigContext",
ObjectSerializer.serialize(pc));
if (stores.size() <= 0) {
// Intermediate reduce. Set the bytes per reducer to
be block size.
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
@@ -769,7 +802,7 @@ public class TezDagBuilder extends TezOp
MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
int splitsSerializedSize = splitsProto.getSerializedSize();
if(splitsSerializedSize > spillThreshold) {
- payloadConf.setBoolean(
+ inputPayLoad.setBoolean(
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
false);
// Write splits to disk
@@ -794,7 +827,9 @@ public class TezDagBuilder extends TezOp
//Free up memory
tezOp.getLoaderInfo().setInputSplitInfo(null);
}
-
userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf));
+
+ udfContextSeparator.serializeUDFContext(inputPayLoad, tezOp,
UDFType.LOADFUNC);
+
userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(inputPayLoad));
vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
vertex.addDataSource(ld.getOperatorKey().toString(),
@@ -811,19 +846,17 @@ public class TezDagBuilder extends TezOp
Set<String> uniqueStoreOutputs = new HashSet<String>();
for (POStore store : stores) {
- ArrayList<POStore> emptyList = new ArrayList<POStore>();
ArrayList<POStore> singleStore = new ArrayList<POStore>();
singleStore.add(store);
- Configuration outputPayLoad = new Configuration(payloadConf);
- outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
- ObjectSerializer.serialize(emptyList));
- outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
+ Configuration outPayLoad = new Configuration(outputPayLoad);
+ udfContextSeparator.serializeUDFContext(outPayLoad, tezOp, store);
+ outPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
ObjectSerializer.serialize(singleStore));
OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
MROutput.class.getName()).setUserPayload(TezUtils
- .createUserPayloadFromConf(outputPayLoad));
+ .createUserPayloadFromConf(outPayLoad));
if (tezOp.getVertexGroupStores() != null) {
OperatorKey vertexGroupKey =
tezOp.getVertexGroupStores().get(store.getOperatorKey());
if (vertexGroupKey != null) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
Fri Oct 16 22:44:34 2015
@@ -134,6 +134,7 @@ public class LoaderProcessor extends Tez
// Now add the input handling operator for the Tez backend
// TODO: Move this upstream to the PhysicalPlan generation
POSimpleTezLoad tezLoad = new
POSimpleTezLoad(ld.getOperatorKey(), ld.getLFile());
+ tezLoad.setSignature(ld.getSignature());
tezLoad.setInputKey(ld.getOperatorKey().toString());
tezLoad.copyAliasFrom(ld);
tezLoad.setCacheFiles(ld.getCacheFiles());
@@ -146,10 +147,10 @@ public class LoaderProcessor extends Tez
UDFContext.getUDFContext().serialize(conf);
conf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
- conf.set("pig.inputs", ObjectSerializer.serialize(inp));
- conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
- conf.set("pig.inpSignatures",
ObjectSerializer.serialize(inpSignatureLists));
- conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+ conf.set(PigInputFormat.PIG_INPUTS,
ObjectSerializer.serialize(inp));
+ conf.set(PigInputFormat.PIG_INPUT_TARGETS,
ObjectSerializer.serialize(inpTargets));
+ conf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
ObjectSerializer.serialize(inpSignatureLists));
+ conf.set(PigInputFormat.PIG_INPUT_LIMITS,
ObjectSerializer.serialize(inpLimits));
String tmp;
long maxCombinedSplitSize = 0;
if (!tezOp.combineSmallSplits() ||
pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION,
"true").equals("false"))
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java?rev=1709122&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java
Fri Oct 16 22:44:34 2015
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import static
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez.resetUDFContextForThreadReuse;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.data.Tuple;
+
+public class PigInputFormatTez extends PigInputFormat {
+
+ @Override
+ public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ resetUDFContextForThreadReuse();
+ return super.createRecordReader(split, context);
+ }
+
+}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
Fri Oct 16 22:44:34 2015
@@ -20,21 +20,34 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
public class PigOutputFormatTez extends PigOutputFormat {
+
+ @Override
+ public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+ TaskAttemptContext taskattemptcontext) throws IOException,
+ InterruptedException {
+ resetUDFContextForThreadReuse();
+ return super.getRecordWriter(taskattemptcontext);
+ }
+
@Override
public OutputCommitter getOutputCommitter(
TaskAttemptContext taskattemptcontext) throws IOException,
InterruptedException {
+ resetUDFContextForThreadReuse();
setupUdfEnvAndStores(taskattemptcontext);
// we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop
- this instance
@@ -44,6 +57,21 @@ public class PigOutputFormatTez extends
reduceStores);
}
+ public static void resetUDFContextForThreadReuse() {
+ // On the Tez AM, MROutput OutputCommitters are initialized and
setupJob
+ // called on them in a loop in the same thread.
+ // commitJob/abortJob can be called from any thread based on events
received from vertices
+
+ // On the Tez tasks, it initializes different inputs/outputs in
different Initializer threads
+ // by submitting them to a thread pool. Even though
threadpoolsize=numInputs+numOutputs
+ // a thread can be reused.
+
+ // Since deserialized UDFContext from input and output payload contains
+ // information only for that input or output to reduce payload sizes, we
need to
+ // ensure it is deserialized every time before use in a thread to get
the right one.
+ UDFContext.getUDFContext().reset();
+ }
+
public static class PigOutputCommitterTez extends PigOutputCommitter {
public PigOutputCommitterTez(TaskAttemptContext context,
@@ -54,39 +82,35 @@ public class PigOutputFormatTez extends
@Override
public void setupJob(JobContext context) throws IOException {
- cleanupForContainerReuse();
+ resetUDFContextForThreadReuse();
try {
super.setupJob(context);
} finally {
- cleanupForContainerReuse();
+ resetUDFContextForThreadReuse();
}
}
@Override
public void commitJob(JobContext context) throws IOException {
- cleanupForContainerReuse();
+ resetUDFContextForThreadReuse();
try {
super.commitJob(context);
} finally {
- cleanupForContainerReuse();
+ resetUDFContextForThreadReuse();
}
}
@Override
public void abortJob(JobContext context, State state)
throws IOException {
- cleanupForContainerReuse();
+ resetUDFContextForThreadReuse();
try {
super.abortJob(context, state);
} finally {
- cleanupForContainerReuse();
+ resetUDFContextForThreadReuse();
}
}
- private void cleanupForContainerReuse() {
- UDFContext.getUDFContext().reset();
- }
-
}
}
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java?rev=1709122&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java
Fri Oct 16 22:44:34 2015
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContextSeparator;
+import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
+
+public class TezUDFContextSeparator extends TezOpPlanVisitor{
+
+ private UDFContextSeparator udfContextSeparator;
+
+ public TezUDFContextSeparator(TezOperPlan plan,
+ PlanWalker<TezOperator, TezOperPlan> walker) {
+ super(plan, walker);
+ udfContextSeparator = new UDFContextSeparator();
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+ if (!tezOperator.isVertexGroup()) {
+ udfContextSeparator.setPlan(tezOperator.plan,
tezOperator.getOperatorKey().toString());
+ udfContextSeparator.visit();
+
+ for (Entry<OperatorKey, TezEdgeDescriptor> entry :
tezOperator.outEdges.entrySet()) {
+ PhysicalPlan combinePlan = entry.getValue().combinePlan;
+ if (!combinePlan.isEmpty()) {
+ udfContextSeparator.setPlan(combinePlan,
+ tezOperator.getOperatorKey().toString() + "-" +
entry.getKey().toString());
+ udfContextSeparator.visit();
+ }
+ }
+ }
+ }
+
+ public void serializeUDFContext(Configuration conf, TezOperator tezOp)
throws IOException {
+ // Serialize all - LoadFunc, StoreFunc, UserFunc
+ udfContextSeparator.serializeUDFContext(conf,
tezOp.getOperatorKey().toString(), UDFType.values());
+ }
+
+ public void serializeUDFContext(Configuration conf, TezOperator tezOp,
+ UDFType udfType) throws IOException {
+ udfContextSeparator.serializeUDFContext(conf,
tezOp.getOperatorKey().toString(), udfType);
+ }
+
+ public void serializeUDFContextForEdge(Configuration conf,
+ TezOperator from, TezOperator to, UDFType udfType) throws
IOException {
+ udfContextSeparator.serializeUDFContext(conf,
+ from.getOperatorKey().toString() + "-" +
to.getOperatorKey().toString(), udfType);
+ }
+
+ public void serializeUDFContext(Configuration conf, TezOperator tezOp,
+ POStore store) throws IOException {
+ udfContextSeparator.serializeUDFContext(conf,
tezOp.getOperatorKey().toString(), store);
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Fri Oct 16 22:44:34
2015
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -33,8 +35,9 @@ public class UDFContext {
private Configuration jconf = null;
private HashMap<UDFContextKey, Properties> udfConfs;
private Properties clientSysProps;
- private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
- private static final String UDF_CONTEXT = "pig.udf.context";
+
+ static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
+ static final String UDF_CONTEXT = "pig.udf.context";
private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>()
{
@Override
@@ -81,6 +84,14 @@ public class UDFContext {
/*
* internal pig use only - should NOT be called from user code
*/
    // Returns the live per-UDF Properties map (not a copy); used by
    // UDFContextSeparator to partition context entries per vertex/plan.
    HashMap<UDFContextKey, Properties> getUdfConfs() {
        return udfConfs;
    }
+
+
+ /*
+ * internal pig use only - should NOT be called from user code
+ */
    // Captures the client-side System properties that serialize() ships
    // to the backend under CLIENT_SYS_PROPS.
    public void setClientSystemProps(Properties properties) {
        clientSysProps = properties;
    }
@@ -197,10 +208,19 @@ public class UDFContext {
* @throws IOException if underlying serialization throws it
*/
    public void serialize(Configuration conf) throws IOException {
        // Minor optimization. Remove empty properties before serialization
        // so they do not bloat the serialized payload.
        // NOTE: this mutates udfConfs in place via the iterator, which is
        // the safe way to remove while iterating.
        Iterator<Entry<UDFContextKey, Properties>> iter = udfConfs.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<UDFContextKey, Properties> entry = iter.next();
            if (entry.getValue().isEmpty()) {
                iter.remove();
            }
        }
        conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
        conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
    }
+
/**
* Populate the udfConfs field. This function is intended to
* be called by Map.configure or Reduce.configure on the backend.
@@ -255,23 +275,31 @@ public class UDFContext {
* it holds the class and args of the udf, and
* implements equals() and hashCode()
*/
- private static class UDFContextKey implements Serializable{
+ static class UDFContextKey implements Serializable{
private static final long serialVersionUID = 1;
private String className;
private String[] args;
- UDFContextKey(){
- }
-
UDFContextKey(String className, String [] args){
this.className = className;
this.args = args;
}
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
+ String getClassName() {
+ return className;
+ }
+
+ String[] getArgs() {
+ return args;
+ }
+
+ @Override
+ public String toString() {
+ return "UDFContextKey [className=" + className + ", args="
+ + Arrays.toString(args) + "]";
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -282,9 +310,6 @@ public class UDFContext {
return result;
}
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
@Override
public boolean equals(Object obj) {
if (this == obj)
Added: pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java?rev=1709122&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java Fri Oct 16
22:44:34 2015
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.Algebraic;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext.UDFContextKey;
+
+public class UDFContextSeparator extends PhyPlanVisitor {
+
+ public static enum UDFType {
+ LOADFUNC,
+ STOREFUNC,
+ USERFUNC,
+ };
+
+ private String planOpKey;
+ private DepthFirstWalker<PhysicalOperator, PhysicalPlan> dfw;
+ private Map<String, Map<Enum<UDFType>, List<UDFContext.UDFContextKey>>>
udfContextsPerPlan;
+ private UDFContext udfContext;
+ private Set<UDFContext.UDFContextKey> allKeys;
+ private Set<UDFContext.UDFContextKey> knownKeys;
+ private Set<UDFContext.UDFContextKey> unKnownKeys;
+ private Set<UDFContext.UDFContextKey> algebraicUDFKeys;
+
+ public UDFContextSeparator(){
+ super(null, null);
+ dfw = new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(null);
+ udfContext = UDFContext.getUDFContext();
+ allKeys = udfContext.getUdfConfs().keySet();
+ knownKeys = new HashSet<UDFContext.UDFContextKey>();
+ algebraicUDFKeys = udfContext.getUdfConfs().keySet();
+ udfContextsPerPlan = new HashMap<String, Map<Enum<UDFType>,
List<UDFContext.UDFContextKey>>>();
+ }
+
+ public Set<UDFContext.UDFContextKey> getUnKnownKeys() {
+ if (unKnownKeys == null) {
+ unKnownKeys = new HashSet<UDFContext.UDFContextKey>(allKeys);
+ unKnownKeys.removeAll(knownKeys);
+ for (Entry<UDFContextKey, Properties> entry :
udfContext.getUdfConfs().entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ // Remove empty values
+ unKnownKeys.remove(entry.getKey());
+ }
+ }
+ }
+ return unKnownKeys;
+ }
+
+ public void setPlan(PhysicalPlan plan, String planOpKey){
+ mPlan = plan;
+ dfw.setPlan(plan);
+ mCurrentWalker = dfw;
+ this.planOpKey = planOpKey;
+ this.udfContextsPerPlan.put(planOpKey, new HashMap<Enum<UDFType>,
List<UDFContextKey>>());
+ }
+
+ @Override
+ public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+ if (userFunc.getFunc() instanceof Algebraic) {
+ for (UDFContext.UDFContextKey key : allKeys) {
+ if
(key.getClassName().equals(userFunc.getFunc().getClass().getName())) {
+ // If Algebraic handle differently. To be on the safer side
+ // as user might be just accessing properties by base
class name
+ // instead of by Initial, Intermediate and Final classes
+ algebraicUDFKeys.add(key);
+ }
+ }
+ } else {
+ findAndAddKeys(userFunc.getFunc().getClass().getName(),
+ userFunc.getSignature(), UDFType.USERFUNC);
+ }
+ }
+
+ @Override
+ public void visitLoad(POLoad ld) throws VisitorException {
+ findAndAddKeys(ld.getLoadFunc().getClass().getName(),
+ ld.getSignature(), UDFType.LOADFUNC);
+ }
+
+
+ @Override
+ public void visitStore(POStore st) throws VisitorException {
+ findAndAddKeys(st.getStoreFunc().getClass().getName(),
+ st.getSignature(), UDFType.STOREFUNC);
+ }
+
+ @Override
+ public void visitCast(POCast op) {
+ if (op.getFuncSpec() != null) {
+ findAndAddKeys(op.getFuncSpec().getClass().getName(),
+ null, UDFType.USERFUNC);
+ }
+ }
+
+ private void findAndAddKeys(String keyClassName, String signature, UDFType
udfType) {
+ for (UDFContext.UDFContextKey key : allKeys) {
+ if (key.getClassName().equals(keyClassName)
+ && (key.getArgs() == null
+ || signature == null
+ || Arrays.asList(key.getArgs()).contains(signature))) {
+ Map<Enum<UDFType>, List<UDFContextKey>> udfKeysByType =
udfContextsPerPlan
+ .get(planOpKey);
+ List<UDFContextKey> keyList = udfContextsPerPlan.get(planOpKey)
+ .get(udfType);
+ if (keyList == null) {
+ keyList = new ArrayList<UDFContext.UDFContextKey>();
+ udfKeysByType.put(udfType, keyList);
+ }
+ keyList.add(key);
+ knownKeys.add(key);
+ }
+ }
+ }
+
+ public void serializeUDFContext(Configuration conf, String planOpKey,
+ UDFType... udfTypes) throws IOException {
+ Map<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+ HashMap<UDFContextKey, Properties> udfConfsToSerialize = new
HashMap<UDFContextKey, Properties>();
+ Map<Enum<UDFType>, List<UDFContextKey>> udfKeysByType =
udfContextsPerPlan.get(planOpKey);
+ if (udfKeysByType != null) {
+ for (UDFType udfType : udfTypes) {
+ List<UDFContextKey> keyList =
udfContextsPerPlan.get(planOpKey).get(udfType);
+ if (keyList != null) {
+ for (UDFContextKey key : keyList) {
+ udfConfsToSerialize.put(key, udfConfs.get(key));
+ }
+ }
+ if (udfType.equals(UDFType.USERFUNC)) {
+ for (UDFContextKey key : algebraicUDFKeys) {
+ udfConfsToSerialize.put(key, udfConfs.get(key));
+ }
+ }
+ }
+ }
+ serialize(conf, udfConfsToSerialize);
+ }
+
+ public void serializeUDFContext(Configuration conf, String planOpKey,
+ POStore store) throws IOException {
+ Map<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+ HashMap<UDFContextKey, Properties> udfConfsToSerialize = new
HashMap<UDFContextKey, Properties>();
+ // Find keys specific to just this StoreFunc
+ Map<Enum<UDFType>, List<UDFContextKey>> udfKeysByType =
udfContextsPerPlan.get(planOpKey);
+ if (udfKeysByType != null) {
+ List<UDFContextKey> keyList =
udfContextsPerPlan.get(planOpKey).get(
+ UDFType.STOREFUNC);
+ if (keyList != null) {
+ String keyClassName =
store.getStoreFunc().getClass().getName();
+ String signature = store.getSignature();
+ for (UDFContextKey key : keyList) {
+ if (key.getClassName().equals(keyClassName)
+ && (key.getArgs() == null
+ ||
Arrays.asList(key.getArgs()).contains(signature))) {
+ udfConfsToSerialize.put(key, udfConfs.get(key));
+ }
+ }
+ }
+ }
+ serialize(conf, udfConfsToSerialize);
+ }
+
+ private void serialize(Configuration conf,
+ HashMap<UDFContextKey, Properties> udfConfsToSerialize)
+ throws IOException {
+ HashMap<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+ // Add unknown ones for serialization
+ for (UDFContextKey key : getUnKnownKeys()) {
+ udfConfsToSerialize.put(key, udfConfs.get(key));
+ }
+ conf.set(UDFContext.UDF_CONTEXT,
ObjectSerializer.serialize(udfConfsToSerialize));
+ conf.set(UDFContext.CLIENT_SYS_PROPS,
ObjectSerializer.serialize(udfContext.getClientSystemProps()));
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Oct
16 22:44:34 2015
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapred.Counters;
import org.apache.pig.PigCounters;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -134,7 +135,7 @@ public class TezVertexStats extends JobS
this.stores = (List<POStore>) ObjectSerializer.deserialize(
conf.get(JobControlCompiler.PIG_REDUCE_STORES));
this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
- conf.get("pig.inputs"));
+ conf.get(PigInputFormat.PIG_INPUTS));
} catch (IOException e) {
LOG.warn("Failed to deserialize the store list", e);
}