Repository: hive
Updated Branches:
  refs/heads/master 91082e5ff -> ed82cfa91


HIVE-14783 : bucketing column should be part of sorting for delete/update 
operation when spdo is on (Ashutosh Chauhan via Prasanth J)
Addendum patch

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed82cfa9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed82cfa9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed82cfa9

Branch: refs/heads/master
Commit: ed82cfa914769cfabfc7460b7b5abbdae71e562a
Parents: 91082e5
Author: Ashutosh Chauhan <hashut...@apache.org>
Authored: Wed Sep 21 15:18:37 2016 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Wed Sep 21 15:19:18 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 10 +++++-----
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java |  9 +++++++++
 .../optimizer/SortedDynPartitionOptimizer.java  |  4 ++--
 .../dynpart_sort_optimization_acid.q.out        | 20 ++++++++++----------
 4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index e386717..eeba6cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -766,19 +766,19 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         if (fpaths.acidLastBucket != bucketNum) {
           fpaths.acidLastBucket = bucketNum;
           // Switch files
-          fpaths.updaters[++fpaths.acidFileOffset] = 
HiveFileFormatUtils.getAcidRecordUpdater(
-              jc, conf.getTableInfo(), bucketNum, conf, 
fpaths.outPaths[fpaths.acidFileOffset],
+          
fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)
 ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+              jc, conf.getTableInfo(), bucketNum, conf, 
fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)
 ? 0 :fpaths.acidFileOffset],
               rowInspector, reporter, 0);
           if (isDebugEnabled) {
             LOG.debug("Created updater for bucket number " + bucketNum + " 
using file " +
-                fpaths.outPaths[fpaths.acidFileOffset]);
+                
fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)
 ? 0 :fpaths.acidFileOffset]);
           }
         }
 
         if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
-          
fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+          
fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)
 ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row);
         } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
-          
fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+          
fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)
 ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
         } else {
           throw new HiveException("Unknown write type " + 
conf.getWriteType().toString());
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index a9885d8..4eea6b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -78,6 +80,7 @@ public class ReduceSinkOperator extends 
TerminalOperator<ReduceSinkDesc>
   private transient ObjectInspector[] partitionObjectInspectors;
   private transient ObjectInspector[] bucketObjectInspectors;
   private transient int buckColIdxInKey;
+  private transient int buckColIdxInKeyForAcid = -1;
   private boolean firstRow;
   private transient int tag;
   private boolean skipTag = false;
@@ -183,6 +186,9 @@ public class ReduceSinkOperator extends 
TerminalOperator<ReduceSinkDesc>
       keyEval = new ExprNodeEvaluator[keys.size()];
       int i = 0;
       for (ExprNodeDesc e : keys) {
+        if (e instanceof ExprNodeConstantDesc && 
("_bucket_number").equals(((ExprNodeConstantDesc)e).getValue())) {
+          buckColIdxInKeyForAcid = i;
+        }
         keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
       }
 
@@ -359,6 +365,9 @@ public class ReduceSinkOperator extends 
TerminalOperator<ReduceSinkDesc>
         // In the non-partitioned case we still want to compute the bucket 
number for updates and
         // deletes.
         bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
+        if (buckColIdxInKeyForAcid != -1) {
+          cachedKeys[0][buckColIdxInKeyForAcid] = new 
Text(String.valueOf(bucketNumber));
+        }
       }
 
       HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 8b4af72..926386b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -247,7 +247,7 @@ public class SortedDynPartitionOptimizer extends Transform {
         }
       }
       RowSchema selRS = new RowSchema(fsParent.getSchema());
-      if (!bucketColumns.isEmpty()) {
+      if (!bucketColumns.isEmpty() || fsOp.getConf().getWriteType() == 
Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
         descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, 
ReduceField.KEY.toString()+".'"+BUCKET_NUMBER_COL_NAME+"'", null, false));
         colNames.add("'"+BUCKET_NUMBER_COL_NAME+"'");
         ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, 
TypeInfoFactory.stringTypeInfo, selRS.getSignature().get(0).getTabAlias(), 
true, true);
@@ -268,7 +268,7 @@ public class SortedDynPartitionOptimizer extends Transform {
 
       // Set if partition sorted or partition bucket sorted
       fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
-      if (bucketColumns.size() > 0) {
+      if (bucketColumns.size() > 0 || fsOp.getConf().getWriteType() == 
Operation.DELETE || fsOp.getConf().getWriteType() == Operation.UPDATE) {
         
fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_BUCKET_SORTED);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed82cfa9/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out 
b/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
index 1838d6a..111ce18 100644
--- a/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
+++ b/ql/src/test/results/clientpositive/dynpart_sort_optimization_acid.q.out
@@ -422,8 +422,8 @@ STAGE PLANS:
                   Statistics: Num rows: 892 Data size: 2676 Basic stats: 
COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 
'bar' (type: string), KEY._col3 (type: string)
-          outputColumnNames: _col0, _col1, _col2, _col3
+          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 
'bar' (type: string), KEY._col3 (type: string), KEY.'_bucket_number' (type: 
string)
+          outputColumnNames: _col0, _col1, _col2, _col3, '_bucket_number'
           Statistics: Num rows: 892 Data size: 2676 Basic stats: COMPLETE 
Column stats: NONE
           File Output Operator
             compressed: false
@@ -1042,8 +1042,8 @@ STAGE PLANS:
                   Statistics: Num rows: 1517 Data size: 4551 Basic stats: 
COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 
'bar' (type: string), '2008-04-08' (type: string), KEY._col4 (type: int)
-          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), 'foo' (type: string), 
'bar' (type: string), '2008-04-08' (type: string), KEY._col4 (type: int), 
KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
'_bucket_number'
           Statistics: Num rows: 1517 Data size: 4551 Basic stats: COMPLETE 
Column stats: NONE
           File Output Operator
             compressed: false
@@ -1152,8 +1152,8 @@ STAGE PLANS:
                   Statistics: Num rows: 2979 Data size: 8937 Basic stats: 
COMPLETE Column stats: NONE
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), KEY._col1 (type: 
string), KEY._col2 (type: int)
-          outputColumnNames: _col0, _col1, _col2
+          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), KEY._col1 (type: 
string), KEY._col2 (type: int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, '_bucket_number'
           Statistics: Num rows: 2979 Data size: 8937 Basic stats: COMPLETE 
Column stats: NONE
           File Output Operator
             compressed: false
@@ -1327,8 +1327,8 @@ STAGE PLANS:
                   value expressions: _col1 (type: string), 'bar' (type: string)
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: 
string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: 
int)
-          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: 
string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: 
int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
'_bucket_number'
           Statistics: Num rows: 23 Data size: 2322 Basic stats: COMPLETE 
Column stats: NONE
           File Output Operator
             compressed: false
@@ -1407,8 +1407,8 @@ STAGE PLANS:
                   value expressions: _col1 (type: string), 'bar' (type: string)
       Reduce Operator Tree:
         Select Operator
-          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: 
string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: 
int)
-          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          expressions: KEY._col0 (type: 
struct<transactionid:bigint,bucketid:int,rowid:bigint>), VALUE._col1 (type: 
string), VALUE._col2 (type: string), KEY._col3 (type: string), KEY._col4 (type: 
int), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
'_bucket_number'
           Statistics: Num rows: 45 Data size: 4550 Basic stats: COMPLETE 
Column stats: NONE
           File Output Operator
             compressed: false

Reply via email to