rbalamohan commented on a change in pull request #1485:
URL: https://github.com/apache/hive/pull/1485#discussion_r486786544



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
##########
@@ -216,29 +216,47 @@ public FSPaths(Path specPath, boolean isMmTable, boolean 
isDirectInsert, boolean
     }
 
     public void closeWriters(boolean abort) throws HiveException {
+      Exception exception = null;
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
             updateProgress();
           } catch (IOException e) {
-            throw new HiveException(e);
+            exception = e;
+            LOG.error("Error closing " + outWriters[idx].toString(), e);
+            // continue closing others
           }
         }
       }
-      try {
+      for (int i = 0; i < updaters.length; i++) {
+        if (updaters[i] != null) {
+          SerDeStats stats = updaters[i].getStats();
+          // Ignore 0 row files except in case of insert overwrite
+          if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
+            outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+          }
+          try {
+            updaters[i].close(abort);
+          } catch (IOException e) {
+            exception = e;
+            LOG.error("Error closing " + updaters[i].toString(), e);
+            // continue closing others
+          }
+        }
+      }
+      // Made an attempt to close all writers.
+      if (exception != null) {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
-            SerDeStats stats = updaters[i].getStats();
-            // Ignore 0 row files except in case of insert overwrite
-            if (isDirectInsert && (stats.getRowCount() > 0 || 
isInsertOverwrite)) {
-              outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
+            try {
+              fs.delete(updaters[i].getUpdatedFilePath(), true);
+            } catch (IOException e) {
+              e.printStackTrace();

Review comment:
       Could this use LOG.error(...) instead of e.printStackTrace()?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
##########
@@ -284,6 +285,11 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder, sortNullOrder,
           allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
+      // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
+      // the same level. when there is auto stats gather at the same level as 
another operation then it might
+      // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors

Review comment:
       Please add TEZ-3296 as a reference in this comment, if possible.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to