Repository: hive
Updated Branches:
  refs/heads/llap a35f1adb7 -> cb24a7678


HIVE-10896 : LLAP: the return of the stuck DAG (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb24a767
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb24a767
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb24a767

Branch: refs/heads/llap
Commit: cb24a76783ac3522ce415e51656208f2e24ee11b
Parents: a35f1ad
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Jun 3 11:33:13 2015 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Jun 3 11:33:13 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/AbstractFileMergeOperator.java |  5 +-
 .../hive/ql/exec/AbstractMapJoinOperator.java   |  6 +-
 .../hive/ql/exec/AppMasterEventOperator.java    |  5 +-
 .../hadoop/hive/ql/exec/CollectOperator.java    |  5 +-
 .../hadoop/hive/ql/exec/CommonJoinOperator.java |  5 +-
 .../hive/ql/exec/CommonMergeJoinOperator.java   |  5 +-
 .../hadoop/hive/ql/exec/DemuxOperator.java      |  6 +-
 .../hadoop/hive/ql/exec/DummyStoreOperator.java |  5 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  5 +-
 .../hadoop/hive/ql/exec/FilterOperator.java     |  5 +-
 .../hadoop/hive/ql/exec/ForwardOperator.java    |  4 +-
 .../hadoop/hive/ql/exec/GroupByOperator.java    |  5 +-
 .../hive/ql/exec/HashTableDummyOperator.java    |  5 +-
 .../hive/ql/exec/HashTableSinkOperator.java     |  6 +-
 .../hadoop/hive/ql/exec/JoinOperator.java       |  5 +-
 .../ql/exec/LateralViewForwardOperator.java     |  4 +-
 .../hive/ql/exec/LateralViewJoinOperator.java   |  6 +-
 .../hadoop/hive/ql/exec/LimitOperator.java      |  5 +-
 .../hadoop/hive/ql/exec/ListSinkOperator.java   |  5 +-
 .../hadoop/hive/ql/exec/MapJoinOperator.java    | 10 +---
 .../apache/hadoop/hive/ql/exec/MapOperator.java |  4 +-
 .../apache/hadoop/hive/ql/exec/MuxOperator.java |  5 +-
 .../apache/hadoop/hive/ql/exec/Operator.java    | 62 +++++++++++++-------
 .../apache/hadoop/hive/ql/exec/PTFOperator.java |  5 +-
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java |  5 +-
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java |  5 +-
 .../hadoop/hive/ql/exec/ScriptOperator.java     |  5 +-
 .../hadoop/hive/ql/exec/SelectOperator.java     |  7 +--
 .../ql/exec/SparkHashTableSinkOperator.java     |  5 +-
 .../hadoop/hive/ql/exec/TableScanOperator.java  | 13 ++--
 .../hadoop/hive/ql/exec/UDTFOperator.java       |  5 +-
 .../hadoop/hive/ql/exec/UnionOperator.java      |  5 +-
 .../vector/VectorAppMasterEventOperator.java    |  8 +--
 .../ql/exec/vector/VectorFileSinkOperator.java  |  9 +--
 .../ql/exec/vector/VectorFilterOperator.java    |  5 +-
 .../ql/exec/vector/VectorGroupByOperator.java   |  6 +-
 .../exec/vector/VectorMapJoinBaseOperator.java  |  7 +--
 .../ql/exec/vector/VectorMapJoinOperator.java   |  6 +-
 .../VectorMapJoinOuterFilteredOperator.java     |  6 +-
 .../exec/vector/VectorReduceSinkOperator.java   |  7 +--
 .../exec/vector/VectorSMBMapJoinOperator.java   |  5 +-
 .../ql/exec/vector/VectorSelectOperator.java    |  7 +--
 .../mapjoin/VectorMapJoinCommonOperator.java    |  6 +-
 .../vector/util/FakeCaptureOutputOperator.java  |  5 +-
 .../util/FakeVectorDataSourceOperator.java      |  4 +-
 45 files changed, 135 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 1e70602..f1c32b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -65,8 +65,8 @@ public abstract class AbstractFileMergeOperator<T extends 
FileMergeDesc>
   protected transient DynamicPartitionCtx dpCtx;
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     this.jc = new JobConf(hconf);
     incompatFileSet = new HashSet<Path>();
     autoDelete = false;
@@ -94,7 +94,6 @@ public abstract class AbstractFileMergeOperator<T extends 
FileMergeDesc>
       throw new HiveException("Failed to initialize AbstractFileMergeOperator",
           e);
     }
-    return result;
   }
 
   // sets up temp and task temp path

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
index f948861..7302688 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
@@ -59,7 +59,7 @@ public abstract class AbstractMapJoinOperator <T extends 
MapJoinDesc> extends Co
 
   @Override
   @SuppressWarnings("unchecked")
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  protected void initializeOp(Configuration hconf) throws HiveException {
     if (conf.getGenJoinKeys()) {
       int tagLen = conf.getTagLength();
       joinKeys = new List[tagLen];
@@ -68,7 +68,7 @@ public abstract class AbstractMapJoinOperator <T extends 
MapJoinDesc> extends Co
           inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
     }
 
-    Collection<Future<?>> result = super.initializeOp(hconf);
+    super.initializeOp(hconf);
 
     numMapRowsRead = 0;
 
@@ -82,8 +82,6 @@ public abstract class AbstractMapJoinOperator <T extends 
MapJoinDesc> extends Co
         posBigTable, joinCacheSize,spillTableDesc, conf,
         !hasFilter(posBigTable), reporter);
     storage[posBigTable] = bigPosRC;
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
index dd2be03..7114177 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
@@ -52,14 +52,13 @@ public class AppMasterEventOperator extends 
Operator<AppMasterEventDesc> {
   protected transient long MAX_SIZE;
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     MAX_SIZE = HiveConf.getLongVar(hconf, 
ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE);
     serializer =
         (Serializer) 
ReflectionUtils.newInstance(conf.getTable().getDeserializerClass(), null);
     initDataBuffer(false);
-    return result;
   }
 
   protected void initDataBuffer(boolean skipPruning) throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
index a7fbfe7..e2f4f58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
@@ -43,11 +43,10 @@ public class CollectOperator extends Operator<CollectDesc> 
implements
   transient int maxSize;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     rowList = new ArrayList<Object>();
     maxSize = conf.getBufferSize().intValue();
-    return result;
   }
 
   boolean firstRow = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 55c5079..bcb9fce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -187,8 +187,8 @@ public abstract class CommonJoinOperator<T extends 
JoinDesc> extends
 
   @Override
   @SuppressWarnings("unchecked")
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     this.handleSkewJoin = conf.getHandleSkewJoin();
     this.hconf = hconf;
 
@@ -322,7 +322,6 @@ public abstract class CommonJoinOperator<T extends 
JoinDesc> extends
     if (isLogInfoEnabled) {
       LOG.info("JOIN " + outputObjInspector.getTypeName() + " totalsz = " + 
totalSz);
     }
-    return result;
   }
 
   transient boolean newGroupStarted = false;

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index d1d5e2b..6c57db3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -95,8 +95,8 @@ public class CommonMergeJoinOperator extends 
AbstractMapJoinOperator<CommonMerge
 
   @SuppressWarnings("unchecked")
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     firstFetchHappened = false;
     fetchInputAtClose = getFetchInputAtCloseList();
 
@@ -148,7 +148,6 @@ public class CommonMergeJoinOperator extends 
AbstractMapJoinOperator<CommonMerge
     }
 
     sources = ((TezContext) MapredContext.get()).getRecordSources();
-    return result;
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
index b7b054a..41389bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
@@ -110,8 +110,8 @@ public class DemuxOperator extends Operator<DemuxDesc>
   private int[][] newChildOperatorsTag;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     // A DemuxOperator should have at least one child
     if (childOperatorsArray.length == 0) {
       throw new HiveException(
@@ -183,7 +183,7 @@ public class DemuxOperator extends Operator<DemuxDesc>
     if (isLogInfoEnabled) {
       LOG.info("newChildOperatorsTag " + 
Arrays.toString(newChildOperatorsTag));
     }
-    return result;
+
   }
 
   private int[] toArray(List<Integer> list) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
index 6a68059..0c12570 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
@@ -74,8 +74,8 @@ public class DummyStoreOperator extends 
Operator<DummyStoreDesc> implements Seri
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> ret = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     /*
      * The conversion to standard object inspector was necessitated by 
HIVE-5973. The issue
      * happens when a select operator preceeds this operator as in the case of 
a subquery. The
@@ -90,7 +90,6 @@ public class DummyStoreOperator extends 
Operator<DummyStoreDesc> implements Seri
      */
     outputObjInspector = 
ObjectInspectorUtils.getStandardObjectInspector(inputObjInspectors[0]);
     result = new InspectableObject(null, outputObjInspector);
-    return ret;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 553113e..0499b70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -325,8 +325,8 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     try {
       this.hconf = hconf;
       filesCreated = false;
@@ -445,7 +445,6 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       e.printStackTrace();
       throw new HiveException(e);
     }
-    return result;
   }
 
   private void logOutputFormatError(Configuration hconf, HiveException ex) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 65301c0..ed78593 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -50,8 +50,8 @@ public class FilterOperator extends Operator<FilterDesc> 
implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
@@ -65,7 +65,6 @@ public class FilterOperator extends Operator<FilterDesc> 
implements
     } catch (Throwable e) {
       throw new HiveException(e);
     }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
index 6cd8c80..7a4c58a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
@@ -62,7 +62,7 @@ public class ForwardOperator extends Operator<ForwardDesc> 
implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    return super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 9867739..1693ec3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -180,8 +180,8 @@ public class GroupByOperator extends Operator<GroupByDesc> {
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     numRowsInput = 0;
     numRowsHashTbl = 0;
 
@@ -393,7 +393,6 @@ public class GroupByOperator extends Operator<GroupByDesc> {
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = this.getConf().getMemoryThreshold();
-    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 2829a9d..1de8c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -33,8 +33,8 @@ public class HashTableDummyOperator extends 
Operator<HashTableDummyDesc> impleme
   private static final long serialVersionUID = 1L;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     TableDesc tbl = this.getConf().getTbl();
     try {
       Deserializer serde = tbl.getDeserializerClass().newInstance();
@@ -44,7 +44,6 @@ public class HashTableDummyOperator extends 
Operator<HashTableDummyDesc> impleme
       LOG.error("Generating output obj inspector from dummy object error", e);
       e.printStackTrace();
     }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 96283cd..ad93623 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -116,8 +116,8 @@ public class HashTableSinkOperator extends 
TerminalOperator<HashTableSinkDesc> i
 
   @Override
   @SuppressWarnings("unchecked")
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     boolean isSilent = HiveConf.getBoolVar(hconf, 
HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
     memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, 
conf.getHashtableMemoryUsage());
@@ -192,7 +192,7 @@ public class HashTableSinkOperator extends 
TerminalOperator<HashTableSinkDesc> i
     } catch (SerDeException e) {
       throw new HiveException(e);
     }
-    return result;
+
   }
 
   public MapJoinTableContainer[] getMapJoinTables() {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 069d91a..3b92ab6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -56,15 +56,14 @@ public class JoinOperator extends 
CommonJoinOperator<JoinDesc> implements Serial
   private final transient LongWritable skewjoin_followup_jobs = new 
LongWritable(0);
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     if (handleSkewJoin) {
       skewJoinKeyContext = new SkewJoinHandler(this);
       skewJoinKeyContext.initiliaze(hconf);
       skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
     }
     statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), 
skewjoin_followup_jobs);
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
index e1479c0..e866eed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
@@ -56,7 +56,7 @@ public class LateralViewForwardOperator extends 
Operator<LateralViewForwardDesc>
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    return super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
index 15b8387..55bb08f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
@@ -84,8 +84,8 @@ public class LateralViewJoinOperator extends 
Operator<LateralViewJoinDesc> {
   public static final byte UDTF_TAG = 1;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
     ArrayList<String> fieldNames = conf.getOutputInternalColNames();
@@ -107,8 +107,6 @@ public class LateralViewJoinOperator extends 
Operator<LateralViewJoinDesc> {
 
     outputObjInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(fieldNames, ois);
-    return result;
-
   }
 
   // acc is short for accumulator. It's used to build the row before forwarding

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index 86519a6..8fe96be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -39,13 +39,12 @@ public class LimitOperator extends Operator<LimitDesc> 
implements Serializable {
   protected transient boolean isMap;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     limit = conf.getLimit();
     leastRow = conf.getLeastRows();
     currCount = 0;
     isMap = hconf.getBoolean("mapred.task.is.map", true);
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index 87917dc..919e72f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -44,14 +44,13 @@ public class ListSinkOperator extends 
Operator<ListSinkDesc> {
   private transient int numRows;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     try {
       fetcher = initializeFetcher(hconf);
     } catch (Exception e) {
       throw new HiveException(e);
     }
-    return result;
   }
 
   private FetchFormatter initializeFetcher(Configuration conf) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index cb1f44d..2405d41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -122,14 +122,11 @@ public class MapJoinOperator extends 
AbstractMapJoinOperator<MapJoinDesc> implem
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
     unwrapContainer = new UnwrapRowContainer[conf.getTagLength()];
 
-    Collection<Future<?>> result = super.initializeOp(hconf);
-    if (result == null) {
-      result = new HashSet<Future<?>>();
-    }
+    super.initializeOp(hconf);
 
     int tagLen = conf.getTagLength();
 
@@ -175,13 +172,12 @@ public class MapJoinOperator extends 
AbstractMapJoinOperator<MapJoinDesc> implem
                   return loadHashTable(mapContext, mrContext);
                 }
               });
-      result.add(future);
+      asyncInitOperations.add(future);
     } else if (mapContext == null || mapContext.getLocalWork() == null
         || mapContext.getLocalWork().getInputFileChangeSensitive() == false) {
       loadHashTable(mapContext, mrContext);
       hashTblInitedOnce = true;
     }
-    return result;
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 014d2a1..00992d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -420,8 +420,8 @@ public class MapOperator extends Operator<MapWork> 
implements Serializable, Clon
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    return super.initializeOp(hconf);
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
   }
 
   public void initializeMapOperator(Configuration hconf) throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
index eb4dff3..2760a8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
@@ -171,8 +171,8 @@ public class MuxOperator extends Operator<MuxDesc> 
implements Serializable{
   private transient long[] nextCntrs;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     // A MuxOperator should only have a single child
     if (childOperatorsArray.length != 1) {
@@ -208,7 +208,6 @@ public class MuxOperator extends Operator<MuxDesc> 
implements Serializable{
       cntrs[i] = 0;
       nextCntrs[i] = 1;
     }
-    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 3a6c5f1..63eed9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -72,6 +72,7 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
   protected AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
+  protected final transient Collection<Future<?>> asyncInitOperations = new 
HashSet<>();
 
   private static AtomicInteger seqId;
 
@@ -362,39 +363,58 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
     // derived classes can set this to different object if needed
     outputObjInspector = inputObjInspectors[0];
 
-    Collection<Future<?>> asyncInitOperations = initializeOp(hconf);
-
-    // sanity checks
-    if (!rootInitializeCalled
-       || asyncInitOperations == null
-       || childOperatorsArray.length != childOperators.size()) {
-      throw new AssertionError("Internal error during operator 
initialization");
-    }
+    boolean isInitOk = false;
+    try {
+      initializeOp(hconf);
+      // sanity checks
+      if (!rootInitializeCalled
+          || childOperatorsArray.length != childOperators.size()) {
+        throw new AssertionError("Internal error during operator 
initialization");
+      }
+      if (isLogInfoEnabled) {
+        LOG.info("Initialization Done " + id + " " + getName());
+      }
 
-    if (isLogInfoEnabled) {
-      LOG.info("Initialization Done " + id + " " + getName());
+      initializeChildren(hconf);
+      isInitOk = true;
+    } finally {
+      // TODO: ugly hack because Java doesn't have dtors and Tez input hangs 
on shutdown.
+      if (!isInitOk) {
+        cancelAsyncInitOps();
+      }
     }
 
-    initializeChildren(hconf);
-
     // let's wait on the async ops before continuing
     completeInitialization(asyncInitOperations);
   }
 
+  private void cancelAsyncInitOps() {
+    for (Future<?> f : asyncInitOperations) {
+      f.cancel(true);
+    }
+    asyncInitOperations.clear();
+  }
+
   private void completeInitialization(Collection<Future<?>> fs) throws 
HiveException {
     Object[] os = new Object[fs.size()];
     int i = 0;
+    Throwable asyncEx = null;
     for (Future<?> f : fs) {
-      try {
-        if (abortOp.get()) {
-          f.cancel(true);
-        } else {
+      if (abortOp.get() || asyncEx != null) {
+        // We were aborted, interrupted or one of the operations failed; 
terminate all.
+        f.cancel(true);
+      } else {
+        try {
           os[i++] = f.get();
+        } catch (Throwable t) {
+          f.cancel(true);
+          asyncEx = t;
         }
-      } catch (Exception e) {
-        throw new HiveException(e);
       }
     }
+    if (asyncEx != null) {
+      throw new HiveException("Async initialization failed", asyncEx);
+    }
     completeInitializationOp(os);
   }
 
@@ -421,9 +441,8 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
   /**
    * Operator specific initialization.
    */
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  protected void initializeOp(Configuration hconf) throws HiveException {
     rootInitializeCalled = true;
-    return new ArrayList<Future<?>>();
   }
 
   /**
@@ -1363,8 +1382,7 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
     }
 
     @Override
-    protected Collection<Future<?>> initializeOp(Configuration conf) {
-      return childOperators;
+    protected void initializeOp(Configuration conf) {
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index 7d465d2..113ac21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -67,8 +67,8 @@ public class PTFOperator extends Operator<PTFDesc> implements 
Serializable {
    * 4. Create input partition to store rows coming from previous operator
    */
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration jobConf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(jobConf);
+  protected void initializeOp(Configuration jobConf) throws HiveException {
+    super.initializeOp(jobConf);
     hiveConf = jobConf;
     isMapOperator = conf.isMapSide();
 
@@ -86,7 +86,6 @@ public class PTFOperator extends Operator<PTFDesc> implements 
Serializable {
     ptfInvocation = setupChain();
     ptfInvocation.initializeStreaming(jobConf, isMapOperator);
     firstMapRow = true;
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 859a28f..81650ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -156,8 +156,8 @@ public class ReduceSinkOperator extends 
TerminalOperator<ReduceSinkDesc>
   private final transient LongWritable recordCounter = new LongWritable();
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     try {
 
       numRows = 0;
@@ -248,7 +248,6 @@ public class ReduceSinkOperator extends 
TerminalOperator<ReduceSinkDesc>
       LOG.error(msg, e);
       throw new RuntimeException(e);
     }
-    return result;
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 390a12e..b094fd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -93,7 +93,7 @@ public class SMBMapJoinOperator extends 
AbstractMapJoinOperator<SMBJoinDesc> imp
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  protected void initializeOp(Configuration hconf) throws HiveException {
 
     // If there is a sort-merge join followed by a regular join, the 
SMBJoinOperator may not
     // get initialized at all. Consider the following query:
@@ -101,7 +101,7 @@ public class SMBMapJoinOperator extends 
AbstractMapJoinOperator<SMBJoinDesc> imp
     // For the mapper processing C, The SMJ is not initialized, no need to 
close it either.
     initDone = true;
 
-    Collection<Future<?>> result = super.initializeOp(hconf);
+    super.initializeOp(hconf);
 
     closeCalled = false;
 
@@ -156,7 +156,6 @@ public class SMBMapJoinOperator extends 
AbstractMapJoinOperator<SMBJoinDesc> imp
       }
       foundNextKeyGroup[pos] = false;
     }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index f2eed44..35ed67e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -261,8 +261,8 @@ public class ScriptOperator extends Operator<ScriptDesc> 
implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     firstRow = true;
 
     statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), 
deserialize_error_count);
@@ -286,7 +286,6 @@ public class ScriptOperator extends Operator<ScriptDesc> 
implements
     } catch (Exception e) {
       throw new HiveException(ErrorMsg.SCRIPT_INIT_ERROR.getErrorCodedMsg(), 
e);
     }
-    return result;
   }
 
   boolean isBrokenPipeException(IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
index cd7fb92..b1b8459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
@@ -44,12 +44,12 @@ public class SelectOperator extends Operator<SelectDesc> 
implements Serializable
   private transient boolean isSelectStarNoCompute = false;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
       isSelectStarNoCompute = true;
-      return result;
+      return;
     }
     List<ExprNodeDesc> colList = conf.getColList();
     eval = new ExprNodeEvaluator[colList.size()];
@@ -66,7 +66,6 @@ public class SelectOperator extends Operator<SelectDesc> 
implements Serializable
     }
     outputObjInspector = initEvaluatorsAndReturnStruct(eval, 
conf.getOutputColumnNames(),
         inputObjInspectors[0]);
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 94144a2..4a5f0b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -61,14 +61,13 @@ public class SparkHashTableSinkOperator
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
     htsOperator.setConf(conf);
     htsOperator.initialize(hconf, inputOIs);
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index cbf02e9..a586bc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -193,17 +193,17 @@ public class TableScanOperator extends 
Operator<TableScanDesc> implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     inputFileChanged = false;
 
     if (conf == null) {
-      return result;
+      return;
     }
 
     rowLimit = conf.getRowLimit();
     if (!conf.isGatherStats()) {
-      return result;
+      return;
     }
 
     this.hconf = hconf;
@@ -217,11 +217,6 @@ public class TableScanOperator extends 
Operator<TableScanDesc> implements
     defaultPartitionName = HiveConf.getVar(hconf, 
HiveConf.ConfVars.DEFAULTPARTITIONNAME);
     currentStat = null;
     stats = new HashMap<String, Stat>();
-    if (conf.getPartColumns() == null || conf.getPartColumns().size() == 0) {
-      // NON PARTITIONED table
-      return result;
-    }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
index 94cecd0..e64fa7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
@@ -59,8 +59,8 @@ public class UDTFOperator extends Operator<UDTFDesc> 
implements Serializable {
   transient AutoProgressor autoProgressor;
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     genericUDTF = conf.getGenericUDTF();
     collector = new UDTFCollector(this);
 
@@ -93,7 +93,6 @@ public class UDTFOperator extends Operator<UDTFDesc> 
implements Serializable {
               hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, 
TimeUnit.MILLISECONDS));
       autoProgressor.go();
     }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
index 9bbaadd..a49097c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
@@ -56,8 +56,8 @@ public class UnionOperator extends Operator<UnionDesc> 
implements Serializable {
    * needsTransform[].
    */
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     int parents = parentOperators.size();
     parentObjInspectors = new StructObjectInspector[parents];
@@ -119,7 +119,6 @@ public class UnionOperator extends Operator<UnionDesc> 
implements Serializable {
             + "] from " + inputObjInspectors[p] + " to " + outputObjInspector);
       }
     }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
index df82d21..e4ca2cd 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
@@ -63,7 +63,7 @@ public class VectorAppMasterEventOperator extends 
AppMasterEventOperator {
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  public void initializeOp(Configuration hconf) throws HiveException {
 
     // We need a input object inspector that is for the row we will extract 
out of the
     // vectorized row batch, not for example, an original inspector for an ORC 
table, etc.
@@ -71,12 +71,8 @@ public class VectorAppMasterEventOperator extends 
AppMasterEventOperator {
         
VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector)
 inputObjInspectors[0]);
 
     // Call AppMasterEventOperator with new input inspector.
-    Collection<Future<?>> result = super.initializeOp(hconf);
-    assert result.isEmpty();
-
+    super.initializeOp(hconf);
     firstBatch = true;
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index 2ccc9a5..09d4a8e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -59,20 +59,15 @@ public class VectorFileSinkOperator extends 
FileSinkOperator {
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  protected void initializeOp(Configuration hconf) throws HiveException {
 
     // We need a input object inspector that is for the row we will extract 
out of the
     // vectorized row batch, not for example, an original inspector for an ORC 
table, etc.
     inputObjInspectors[0] =
         
VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector)
 inputObjInspectors[0]);
-
-    // Call FileSinkOperator with new input inspector.
-    Collection<Future<?>> result = super.initializeOp(hconf);
-    assert result.isEmpty();
+    super.initializeOp(hconf);
 
     firstBatch = true;
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index d1b8939..ad65914 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -61,8 +61,8 @@ public class VectorFilterOperator extends FilterOperator {
 
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
@@ -77,7 +77,6 @@ public class VectorFilterOperator extends FilterOperator {
         filterMode = -1;
       }
     }
-    return result;
   }
 
   public void setFilterCondition(VectorExpression expr) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 39a83e3..a81d107 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -774,9 +774,8 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
-    assert result.isEmpty();
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -839,7 +838,6 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
       processingMode = this.new ProcessingModeHashAggregate();
     }
     processingMode.initialize(hconf);
-    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index 0baec2c..b9f42dd 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -86,9 +86,8 @@ public class VectorMapJoinBaseOperator extends 
MapJoinOperator implements Vector
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
     vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) 
this.outputObjInspector);
@@ -96,8 +95,6 @@ public class VectorMapJoinBaseOperator extends 
MapJoinOperator implements Vector
     outputBatch = vrbCtx.createVectorizedRowBatch();
 
     outputVectorAssignRowMap = new HashMap<ObjectInspector, 
VectorAssignRowSameBatch>();
-
-    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index e9bd44a..007782d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -99,7 +99,7 @@ public class VectorMapJoinOperator extends 
VectorMapJoinBaseOperator {
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  public void initializeOp(Configuration hconf) throws HiveException {
 
     // Use a final variable to properly parameterize the 
processVectorInspector closure.
     // Using a member variable in the closure will not do the right thing...
@@ -118,7 +118,7 @@ public class VectorMapJoinOperator extends 
VectorMapJoinBaseOperator {
         });
     singleRow = new Object[rowWriters.length];
 
-    Collection<Future<?>> result = super.initializeOp(hconf);
+    super.initializeOp(hconf);
 
     List<ExprNodeDesc> keyDesc = conf.getKeys().get(posBigTable);
     keyOutputWriters = 
VectorExpressionWriterFactory.getExpressionWriters(keyDesc);
@@ -176,8 +176,6 @@ public class VectorMapJoinOperator extends 
VectorMapJoinBaseOperator {
 
     // Filtering is handled in the input batch processing
     filterMaps[posBigTable] = null;
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
index a96816f..b8b1f88 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
@@ -60,7 +60,7 @@ public class VectorMapJoinOuterFilteredOperator extends 
VectorMapJoinBaseOperato
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  public void initializeOp(Configuration hconf) throws HiveException {
 
     final int posBigTable = conf.getPosBigTable();
 
@@ -71,11 +71,9 @@ public class VectorMapJoinOuterFilteredOperator extends 
VectorMapJoinBaseOperato
 
     // Call super VectorMapJoinOuterFilteredOperator, which calls super 
MapJoinOperator with
     // new input inspector.
-    Collection<Future<?>> result = super.initializeOp(hconf);
+    super.initializeOp(hconf);
 
     firstBatch = true;
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index 2c42d1f..41b2874 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -57,7 +57,7 @@ public class VectorReduceSinkOperator extends 
ReduceSinkOperator {
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
+  protected void initializeOp(Configuration hconf) throws HiveException {
 
     // We need a input object inspector that is for the row we will extract 
out of the
     // vectorized row batch, not for example, an original inspector for an ORC 
table, etc.
@@ -65,12 +65,9 @@ public class VectorReduceSinkOperator extends 
ReduceSinkOperator {
         
VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector)
 inputObjInspectors[0]);
 
     // Call ReduceSinkOperator with new input inspector.
-    Collection<Future<?>> result = super.initializeOp(hconf);
-    assert result.isEmpty();
+    super.initializeOp(hconf);
 
     firstBatch = true;
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index a2f8091..323007e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -131,8 +131,8 @@ public class VectorSMBMapJoinOperator extends 
SMBMapJoinOperator implements Vect
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
     vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) 
this.outputObjInspector);
@@ -214,7 +214,6 @@ public class VectorSMBMapJoinOperator extends 
SMBMapJoinOperator implements Vect
     }
     // Now replace the old evaluators with our own
     joinValues[posBigTable] = vectorNodeEvaluators;
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 212aa99..7e95478 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -83,11 +83,11 @@ public class VectorSelectOperator extends 
Operator<SelectDesc> implements
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-      return null;
+      return;
     }
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -106,7 +106,6 @@ public class VectorSelectOperator extends 
Operator<SelectDesc> implements
     for (int i = 0; i < projectedColumns.length; i++) {
       projectedColumns[i] = vExpressions[i].getOutputColumn();
     }
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index af78776..6683e75 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -545,8 +545,8 @@ public abstract class VectorMapJoinCommonOperator extends 
MapJoinOperator implem
   }
 
   @Override
-  protected Collection<Future<?>> initializeOp(Configuration hconf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(hconf);
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
 
     if (LOG.isDebugEnabled()) {
       // Determine the name of our map or reduce task for debug tracing.
@@ -606,8 +606,6 @@ public abstract class VectorMapJoinCommonOperator extends 
MapJoinOperator implem
         i++;
       }
     }
-
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
index 93a6aed..7454b01 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
@@ -74,10 +74,9 @@ public class FakeCaptureOutputOperator extends 
Operator<FakeCaptureOutputDesc>
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration conf) throws 
HiveException {
-    Collection<Future<?>> result = super.initializeOp(conf);
+  public void initializeOp(Configuration conf) throws HiveException {
+    super.initializeOp(conf);
     rows = new ArrayList<Object>();
-    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java
index fe990f8..3bea307 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java
@@ -62,8 +62,8 @@ public class FakeVectorDataSourceOperator extends 
Operator<FakeVectorDataSourceO
   }
 
   @Override
-  public Collection<Future<?>> initializeOp(Configuration conf) throws 
HiveException {
-    return super.initializeOp(conf);
+  public void initializeOp(Configuration conf) throws HiveException {
+    super.initializeOp(conf);
   }
 
   @Override

Reply via email to