Author: szehon
Date: Thu Nov 20 02:55:46 2014
New Revision: 1640654
URL: http://svn.apache.org/r1640654
Log:
HIVE-8883 : Investigate test failures on auto_join30.q [Spark Branch] (Chao Sun via Szehon)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1640654&r1=1640653&r2=1640654&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Thu Nov 20 02:55:46 2014
@@ -67,7 +67,13 @@ public class HashTableLoader implements
MapJoinTableContainer[] mapJoinTables,
MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
- String currentInputPath = context.getCurrentInputPath().toString();
+ // Note: it's possible that a MJ operator is in a ReduceWork, in which case the
+ // currentInputPath will be null. But, since currentInputPath is only interesting
+ // for bucket join case, and for bucket join the MJ operator will always be in
+ // a MapWork, this should be OK.
+ String currentInputPath =
+ context.getCurrentInputPath() == null ? null : context.getCurrentInputPath().toString();
+
LOG.info("******* Load from HashTable for input file: " + currentInputPath);
MapredLocalWork localWork = context.getLocalWork();
try {
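
The comment above captures the HashTableLoader fix: when the map-join operator sits in a ReduceWork there is no current input file, so the path must be read null-safely before calling toString(). Below is a minimal standalone sketch of that guard; the InputContext class and loadMessage helper are hypothetical stand-ins used for illustration, not Hive's ExecMapperContext or HashTableLoader code.

    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Hypothetical stand-in for the execution context: the current input path
    // may be null when the operator runs on the reduce side.
    public class NullSafeInputPathSketch {

      static class InputContext {
        private final Path currentInputPath;

        InputContext(Path currentInputPath) {
          this.currentInputPath = currentInputPath;
        }

        Path getCurrentInputPath() {
          return currentInputPath;
        }
      }

      static String loadMessage(InputContext context) {
        // Same guard as the HashTableLoader change: only call toString() when a path exists.
        String currentInputPath =
            context.getCurrentInputPath() == null ? null : context.getCurrentInputPath().toString();
        return "Load from HashTable for input file: " + currentInputPath;
      }

      public static void main(String[] args) {
        // Map-side case: a concrete input file is known (the bucket join case relies on this).
        System.out.println(loadMessage(new InputContext(Paths.get("/warehouse/t1/bucket_00000"))));
        // Reduce-side case: no current input path; the guard keeps this from throwing.
        System.out.println(loadMessage(new InputContext(null)));
      }
    }
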
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1640654&r1=1640653&r2=1640654&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Thu Nov 20 02:55:46 2014
@@ -34,11 +34,13 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -101,6 +103,7 @@ public class SparkReduceRecordHandler ex
private StructObjectInspector[] valueStructInspectors;
/* this is only used in the error code path */
private List<VectorExpressionWriter>[] valueStringWriters;
+ private MapredLocalWork localWork = null;
public void init(JobConf job, OutputCollector output, Reporter reporter) {
super.init(job, output, reporter);
@@ -197,8 +200,9 @@ public class SparkReduceRecordHandler ex
}
ExecMapperContext execContext = new ExecMapperContext(job);
+ localWork = gWork.getMapRedLocalWork();
execContext.setJc(jc);
- execContext.setLocalWork(gWork.getMapRedLocalWork());
+ execContext.setLocalWork(localWork);
reducer.setExecContext(execContext);
reducer.setReporter(rp);
@@ -209,6 +213,14 @@ public class SparkReduceRecordHandler ex
try {
LOG.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
+
+ if (localWork != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : localWork.getDummyParentOp()) {
+ dummyOp.setExecContext(execContext);
+ dummyOp.initialize(jc, null);
+ }
+ }
+
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -218,6 +230,7 @@ public class SparkReduceRecordHandler ex
throw new RuntimeException("Reduce operator initialization failed", e);
}
}
+
}
@Override
@@ -416,6 +429,13 @@ public class SparkReduceRecordHandler ex
}
reducer.close(abort);
+
+ if (localWork != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : localWork.getDummyParentOp()) {
+ dummyOp.close(abort);
+ }
+ }
+
ReportStats rps = new ReportStats(rp, jc);
reducer.preorderMap(rps);
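
Taken together, the SparkReduceRecordHandler hunks give the dummy parent operators of the map-join local work the same lifecycle as the reducer: initialize them right after reducer.initialize(), close them right after reducer.close(abort). A standalone sketch of that pairing follows; the Op interface, LoggingOp class, and the "HASHTABLEDUMMY" name are hypothetical stand-ins for Hive's Operator tree, not Hive's API.

    import java.util.Arrays;
    import java.util.List;

    // Sketch of the lifecycle wired up above: dummy parent operators are
    // initialized together with the reducer and closed once the reducer closes.
    public class DummyOpLifecycleSketch {

      interface Op {
        void initialize();
        void close(boolean abort);
      }

      static class LoggingOp implements Op {
        private final String name;

        LoggingOp(String name) {
          this.name = name;
        }

        @Override
        public void initialize() {
          System.out.println("initialize " + name);
        }

        @Override
        public void close(boolean abort) {
          System.out.println("close " + name + " (abort=" + abort + ")");
        }
      }

      public static void main(String[] args) {
        Op reducer = new LoggingOp("reducer");
        // Stand-in for localWork.getDummyParentOp(); empty when no map-join
        // local work is attached to the ReduceWork.
        List<Op> dummyParents = Arrays.<Op>asList(new LoggingOp("HASHTABLEDUMMY"));
        boolean abort = false;

        // Mirrors the init hunk: reducer first, then each dummy parent operator.
        reducer.initialize();
        for (Op dummyOp : dummyParents) {
          dummyOp.initialize();
        }

        // ... records would be processed here ...

        // Mirrors the close hunk: reducer first, then each dummy parent operator.
        reducer.close(abort);
        for (Op dummyOp : dummyParents) {
          dummyOp.close(abort);
        }
      }
    }
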