Author: xuefu
Date: Tue Dec 30 20:56:26 2014
New Revision: 1648598
URL: http://svn.apache.org/r1648598
Log:
HIVE-8920: IOContext problem with multiple MapWorks cloned for multi-insert
[Spark Branch]
Modified:
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
Modified:
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL:
http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties
(original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties
Tue Dec 30 20:56:26 2014
@@ -741,6 +741,7 @@ spark.query.files=add_part_multiple.q, \
multi_insert_mixed.q, \
multi_insert_move_tasks_share_dependencies.q, \
multi_join_union.q, \
+ multi_join_union_src.q, \
multigroupby_singlemr.q, \
optimize_nullscan.q, \
order.q, \
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
Tue Dec 30 20:56:26 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -60,8 +59,6 @@ public class SparkMapRecordHandler exten
private static final String PLAN_KEY = "__MAP_PLAN__";
private MapOperator mo;
public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
- private boolean done;
-
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
private ExecMapperContext execContext;
@@ -91,13 +88,6 @@ public class SparkMapRecordHandler exten
}
mo.setConf(mrwork);
-    // If the current thread's IOContext is not initialized (because it's reading from a
-    // cached input HadoopRDD), copy from the saved result.
-    IOContext ioContext = IOContext.get();
-    if (ioContext.getInputPath() == null) {
-      IOContext.copy(ioContext, IOContext.getMap().get(SparkUtilities.MAP_IO_CONTEXT));
- }
-
// initialize map operator
mo.setChildren(job);
l4j.info(mo.dump(0));
@@ -211,10 +201,6 @@ public class SparkMapRecordHandler exten
} finally {
MapredContext.close();
Utilities.clearWorkMap();
-
-      // It's possible that a thread get reused for different queries, so we need to
-      // reset the input path.
-      IOContext.get().setInputPath(null);
}
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Tue Dec 30 20:56:26 2014
@@ -175,7 +175,8 @@ public class SparkPlanGenerator {
JavaPairRDD<WritableComparable, Writable> hadoopRDD =
sc.hadoopRDD(jobConf, ifClass,
WritableComparable.class, Writable.class);
-    MapInput result = new MapInput(sparkPlan, hadoopRDD, cloneToWork.containsKey(mapWork));
+    // Caching is disabled for MapInput due to HIVE-8920
+    MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/);
return result;
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
Tue Dec 30 20:56:26 2014
@@ -36,9 +36,6 @@ import org.apache.hadoop.io.BytesWritabl
*/
public class SparkUtilities {
- // Used to save and retrieve IOContext for multi-insertion.
- public static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";
-
public static HiveKey copyHiveKey(HiveKey key) {
HiveKey copy = new HiveKey();
copy.setDistKeyLength(key.getDistKeyLength());
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
Tue Dec 30 20:56:26 2014
@@ -27,11 +27,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.FooterBuffer;
-import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -173,18 +171,6 @@ public abstract class HiveContextAwareRe
ioCxtRef.isBlockPointer = isBlockPointer;
ioCxtRef.inputPath = inputPath;
LOG.info("Processing file " + inputPath);
-
-    // In spark, in multi-insert an input HadoopRDD maybe be shared by multiple
-    // mappers, and if we cache it, only the first thread will have its thread-local
-    // IOContext initialized, while the rest will not.
-    // To solve this issue, we need to save a copy of the initialized IOContext, so that
-    // later it can be used for other threads.
-    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      IOContext iocontext = new IOContext();
-      IOContext.copy(iocontext, ioCxtRef);
-      IOContext.getMap().put(SparkUtilities.MAP_IO_CONTEXT, iocontext);
-    }
-
initDone = true;
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
Tue Dec 30 20:56:26 2014
@@ -25,9 +25,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
/**
* IOContext basically contains the position information of the current
@@ -45,11 +42,8 @@ public class IOContext {
};
  private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
- public static Map<String, IOContext> getMap() {
- return inputNameIOContextMap;
- }
- public static IOContext get() {
+ private static IOContext get() {
return IOContext.threadLocal.get();
}
@@ -109,27 +103,6 @@ public class IOContext {
this.ioExceptions = false;
}
- /**
-   * Copy all fields values from orig to dest, all existing fields in dest will be overwritten.
- *
- * @param dest the IOContext to copy to
- * @param orig the IOContext to copy from
- */
- public static void copy(IOContext dest, IOContext orig) {
- dest.currentBlockStart = orig.currentBlockStart;
- dest.nextBlockStart = orig.nextBlockStart;
- dest.currentRow = orig.currentRow;
- dest.isBlockPointer = orig.isBlockPointer;
- dest.ioExceptions = orig.ioExceptions;
- dest.useSorted = orig.useSorted;
- dest.isBinarySearching = orig.isBinarySearching;
- dest.endBinarySearch = orig.endBinarySearch;
- dest.comparison = orig.comparison;
- dest.genericUDFClassName = orig.genericUDFClassName;
- dest.ri = orig.ri;
- dest.inputPath = orig.inputPath;
- }
-
public long getCurrentBlockStart() {
return currentBlockStart;
}