HIVE-14993 make WriteEntity distinguish writeType (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/394fc47d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/394fc47d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/394fc47d

Branch: refs/heads/hive-14535
Commit: 394fc47daf1c9e6c2b5406b8f0a57163a2678315
Parents: 6cca991
Author: Eugene Koifman <[email protected]>
Authored: Sat Oct 22 11:49:41 2016 -0700
Committer: Eugene Koifman <[email protected]>
Committed: Sat Oct 22 11:49:41 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 66 +++++++++++++++-----
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 34 +++++++---
 .../org/apache/hadoop/hive/ql/hooks/Entity.java | 13 ++--
 .../apache/hadoop/hive/ql/hooks/ReadEntity.java |  2 +-
 .../hadoop/hive/ql/hooks/WriteEntity.java       |  6 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 15 +++--
 6 files changed, 93 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acf570f..cfece77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -928,7 +928,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       );
       if (HiveUtils.getIndexHandler(conf, crtIndex.getIndexTypeHandlerClass()).usesIndexTable()) {
         Table indexTable = db.getTable(indexTableName);
-        work.getOutputs().add(new WriteEntity(indexTable, WriteEntity.WriteType.DDL_NO_LOCK));
+        addIfAbsentByName(new WriteEntity(indexTable, WriteEntity.WriteType.DDL_NO_LOCK));
       }
       return 0;
     }
@@ -1024,7 +1024,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException {
     List<Partition> parts = db.createPartitions(addPartitionDesc);
     for (Partition part : parts) {
-      work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
+      addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.INSERT));
     }
     return 0;
   }
@@ -1058,7 +1058,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         .getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false);
     work.getInputs().add(new ReadEntity(oldPart));
     // We've already obtained a lock on the table, don't lock the partition too
-    work.getOutputs().add(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -1150,7 +1150,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     work.getInputs().add(new ReadEntity(tbl));
     // We've already locked the table as the input, don't relock it as the output.
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -1176,7 +1176,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         throw new HiveException("Uable to update table");
       }
       work.getInputs().add(new ReadEntity(tbl));
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     } else {
       Partition part = db.getPartition(tbl, touchDesc.getPartSpec(), false);
       if (part == null) {
@@ -1188,7 +1188,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         throw new HiveException(e);
       }
       work.getInputs().add(new ReadEntity(part));
-      work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
@@ -3388,14 +3388,46 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     if (allPartitions != null ) {
       for (Partition tmpPart: allPartitions) {
         work.getInputs().add(new ReadEntity(tmpPart));
-        work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
+        addIfAbsentByName(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
       }
     } else {
       work.getInputs().add(new ReadEntity(oldTbl));
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
+  /**
+   * There are many places where "duplicate" Read/WriteEntity objects are added. The way this was
+   * initially implemented, the duplicate just replaced the previous object.
+   * (work.getOutputs() is a Set and WriteEntity#equals() relies on name.)
+   * This may be benign for ReadEntity and perhaps was benign for WriteEntity before WriteType was
+   * added. Now that WriteEntity has a WriteType, the duplicate replaces the original with one that
+   * has a possibly different {@link org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType}. It's
+   * hard to imagine how this is desirable.
+   *
+   * As of HIVE-14993, WriteEntity objects with different WriteTypes must be considered different.
+   * So a WriteEntity created in DDLTask causes extra output in golden files, but only because
+   * DDLTask sets a different WriteType for the same Entity.
+   *
+   * In the spirit of bug-for-bug compatibility, this method ensures we only add a new
+   * WriteEntity if it's really new.
+   *
+   * @return {@code true} if the item was added
+   */
+  static boolean addIfAbsentByName(WriteEntity newWriteEntity, Set<WriteEntity> outputs) {
+    for(WriteEntity writeEntity : outputs) {
+      if(writeEntity.getName().equalsIgnoreCase(newWriteEntity.getName())) {
+        LOG.debug("Ignoring request to add " + newWriteEntity.toStringDetail() + " because " +
+          writeEntity.toStringDetail() + " is present");
+        return false;
+      }
+    }
+    outputs.add(newWriteEntity);
+    return true;
+  }
+  private boolean addIfAbsentByName(WriteEntity newWriteEntity) {
+    return addIfAbsentByName(newWriteEntity, work.getOutputs());
+  }

   private boolean isSchemaEvolutionEnabled(Table tbl) {
     boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata());
@@ -3807,7 +3839,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     for (Partition partition : droppedParts) {
       console.printInfo("Dropped the partition " + partition.getName());
       // We have already locked the table, don't lock the partitions.
-      work.getOutputs().add(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
     };
   }
@@ -3900,7 +3932,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       db.dropTable(dropTbl.getTableName(), dropTbl.getIfPurge());
       if (tbl != null) {
         // We have already locked the table in DDLSemanticAnalyzer, don't do it again here
-        work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+        addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
       }
     }
@@ -4067,7 +4099,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         );
       }
     }
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -4215,7 +4247,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     // create the table
     db.createTable(tbl, crtTbl.getIfNotExists());
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
@@ -4258,10 +4290,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       } catch (InvalidOperationException e) {
        throw new HiveException(e);
      }
-      work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(oldview, WriteEntity.WriteType.DDL_NO_LOCK));
     } else {
       // This is a replace, so we need an exclusive lock
-      work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL_EXCLUSIVE));
+      addIfAbsentByName(new WriteEntity(oldview, WriteEntity.WriteType.DDL_EXCLUSIVE));
     }
   } else {
     // create new view
@@ -4310,7 +4342,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     }
     db.createTable(tbl, crtView.getIfNotExists());
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
   }
   return 0;
 }
@@ -4385,10 +4417,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     // Reuse the partition specs from dest partition since they should be the same
     work.getInputs().add(new ReadEntity(new Partition(sourceTable, partition.getSpec(), null)));
-    work.getOutputs().add(new WriteEntity(new Partition(sourceTable, partition.getSpec(), null),
+    addIfAbsentByName(new WriteEntity(new Partition(sourceTable, partition.getSpec(), null),
         WriteEntity.WriteType.DELETE));
-    work.getOutputs().add(new WriteEntity(new Partition(destTable, partition.getSpec(), null),
+    addIfAbsentByName(new WriteEntity(new Partition(destTable, partition.getSpec(), null),
         WriteEntity.WriteType.INSERT));
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index ec21cd6..8265af4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -362,9 +362,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
             hasFollowingStatsTask());
         if (work.getOutputs() != null) {
-          work.getOutputs().add(new WriteEntity(table,
-              (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                  WriteEntity.WriteType.INSERT)));
+          DDLTask.addIfAbsentByName(new WriteEntity(table,
+              getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
         }
       } else {
         LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
@@ -467,10 +466,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           }
           WriteEntity enty = new WriteEntity(partn,
-              (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                  WriteEntity.WriteType.INSERT));
+              getWriteType(tbd, work.getLoadTableWork().getWriteType()));
           if (work.getOutputs() != null) {
-            work.getOutputs().add(enty);
+            DDLTask.addIfAbsentByName(enty, work.getOutputs());
           }
           // Need to update the queryPlan's output as well so that post-exec hook get executed.
           // This is only needed for dynamic partitioning since for SP the the WriteEntity is
@@ -515,9 +513,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           dc = new DataContainer(table.getTTable(), partn.getTPartition());
           // add this partition to post-execution hook
           if (work.getOutputs() != null) {
-            work.getOutputs().add(new WriteEntity(partn,
-                (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
-                    : WriteEntity.WriteType.INSERT)));
+            DDLTask.addIfAbsentByName(new WriteEntity(partn,
+                getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
           }
         }
       }
@@ -552,7 +549,24 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       return (1);
     }
   }
-
+  /**
+   * Make sure we create the WriteEntity with the right WriteType. This is (at this point) only
+   * for consistency, since the LockManager (which is the only thing that pays attention to
+   * WriteType) has done its job before the query ran.
+   */
+  WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
+    if(tbd.getReplace()) {
+      return WriteEntity.WriteType.INSERT_OVERWRITE;
+    }
+    switch (operation) {
+      case DELETE:
+        return WriteEntity.WriteType.DELETE;
+      case UPDATE:
+        return WriteEntity.WriteType.UPDATE;
+      default:
+        return WriteEntity.WriteType.INSERT;
+    }
+  }
   private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
     return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
         .isSkewedStoredAsDir();


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
index 174b5a8..0842066 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
@@ -79,7 +79,7 @@ public class Entity implements Serializable {
    * This is derived from t and p, but we need to serialize this field to make
    * sure Entity.hashCode() does not need to recursively read into t and p.
    */
-  private String name;
+  private final String name;

   /**
    * Whether the output is complete or not. For eg, for dynamic partitions, the
@@ -99,10 +99,6 @@ public class Entity implements Serializable {
     return name;
   }

-  public void setName(String name) {
-    this.name = name;
-  }
-
   public Database getDatabase() {
     return database;
   }
@@ -162,6 +158,7 @@ public class Entity implements Serializable {
    * Only used by serialization.
    */
   public Entity() {
+    name = null;
   }

   /**
@@ -326,7 +323,7 @@ public class Entity implements Serializable {
    */
   @Override
   public String toString() {
-    return name;
+    return getName();
   }

   private String computeName() {
@@ -360,7 +357,7 @@ public class Entity implements Serializable {
     if (o instanceof Entity) {
       Entity ore = (Entity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName()));
     } else {
       return false;
     }
@@ -371,7 +368,7 @@ public class Entity implements Serializable {
    */
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return getName().hashCode();
   }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index fccb243..3d7de69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -148,7 +148,7 @@ public class ReadEntity extends Entity implements Serializable {
     if (o instanceof ReadEntity) {
       ReadEntity ore = (ReadEntity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName()));
     } else {
       return false;
     }


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 2194a6d..9e18638 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -168,12 +168,16 @@ public class WriteEntity extends Entity implements Serializable {
     if (o instanceof WriteEntity) {
       WriteEntity ore = (WriteEntity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName())) && this.writeType == ore.writeType;
     } else {
       return false;
     }
   }

+  public String toStringDetail() {
+    return "WriteEntity(" + toString() + ") Type=" + getType() + " WriteType=" + getWriteType();
+  }
+
   public boolean isTempURI() {
     return isTempURI;
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9d58193..9db8a22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6673,7 +6673,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
               new DummyPartition(dest_tab, dest_tab.getDbName()
                   + "@" + dest_tab.getTableName() + "@" + ppath,
                   partSpec);
-          output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
+          output = new WriteEntity(p, getWriteType(), false);
           outputs.add(output);
         } catch (HiveException e) {
           throw new SemanticException(e.getMessage(), e);
         }
@@ -6746,9 +6746,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       ltd.setLbCtx(lbCtx);
       loadTableWork.add(ltd);
-      if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ?
-          WriteEntity.WriteType.INSERT_OVERWRITE :
-          WriteEntity.WriteType.INSERT)))) {
+
+      if (!outputs.add(new WriteEntity(dest_part,
+          determineWriteType(ltd, dest_tab.isNonNative())))) {
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
       }
@@ -13034,8 +13034,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // and don't have a rational way to guess, so assume the most
     // conservative case.
     if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-        WriteEntity.WriteType.INSERT);
+    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
+  }
+  private WriteEntity.WriteType getWriteType() {
+    return updating() ? WriteEntity.WriteType.UPDATE :
+        (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
   }

   private boolean isAcidOutputFormat(Class<? extends OutputFormat> of) {
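----------------------------------------------------------------------

For readers tracing the behaviour change, a minimal, self-contained sketch follows. It is not part of the patch: the class names WriteEntitySketch and SketchWriteEntity and the trimmed WriteType enum are invented stand-ins for the real Hive classes, used only to show how the two pieces above interact (WriteEntity.equals() now distinguishes WriteType, while DDLTask.addIfAbsentByName() still dedupes by name alone).

// Illustrative stand-alone sketch, not part of the commit.
import java.util.HashSet;
import java.util.Set;

public class WriteEntitySketch {
  enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE, DDL_NO_LOCK, DDL_EXCLUSIVE }

  static final class SketchWriteEntity {
    final String name;
    final WriteType writeType;
    SketchWriteEntity(String name, WriteType writeType) {
      this.name = name;
      this.writeType = writeType;
    }
    @Override public boolean equals(Object o) {
      if (!(o instanceof SketchWriteEntity)) {
        return false;
      }
      SketchWriteEntity other = (SketchWriteEntity) o;
      // Mirrors the new WriteEntity.equals(): name AND writeType must match.
      return name.equalsIgnoreCase(other.name) && writeType == other.writeType;
    }
    @Override public int hashCode() {
      // Hash by name only, matching Entity.hashCode() using getName().
      return name.toLowerCase().hashCode();
    }
  }

  // Mirrors DDLTask.addIfAbsentByName(): ignore the new entity if one with the
  // same name (regardless of WriteType) is already present.
  static boolean addIfAbsentByName(SketchWriteEntity e, Set<SketchWriteEntity> outputs) {
    for (SketchWriteEntity existing : outputs) {
      if (existing.name.equalsIgnoreCase(e.name)) {
        return false;
      }
    }
    return outputs.add(e);
  }

  public static void main(String[] args) {
    Set<SketchWriteEntity> outputs = new HashSet<>();
    outputs.add(new SketchWriteEntity("default@t1", WriteType.INSERT));

    // A plain Set.add() now keeps both entries, because equals() sees
    // different WriteTypes for the same table name...
    outputs.add(new SketchWriteEntity("default@t1", WriteType.DDL_NO_LOCK));
    System.out.println("Set.add: " + outputs.size() + " entries");          // 2 entries

    // ...whereas addIfAbsentByName() keeps only the first entity per name.
    Set<SketchWriteEntity> deduped = new HashSet<>();
    addIfAbsentByName(new SketchWriteEntity("default@t1", WriteType.INSERT), deduped);
    addIfAbsentByName(new SketchWriteEntity("default@t1", WriteType.DDL_NO_LOCK), deduped);
    System.out.println("addIfAbsentByName: " + deduped.size() + " entry");  // 1 entry
  }
}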

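Similarly, a small stand-alone sketch (again not part of the patch; the enums below are trimmed stand-ins for AcidUtils.Operation and WriteEntity.WriteType) of the precedence implemented by MoveTask.getWriteType(), and mirrored by SemanticAnalyzer's updating()/deleting() checks: an INSERT OVERWRITE (replace) wins, otherwise the ACID operation decides, and everything else maps to INSERT.

// Illustrative stand-alone sketch, not part of the commit.
public class WriteTypeMappingSketch {
  enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }           // stand-in for AcidUtils.Operation
  enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE }   // stand-in for WriteEntity.WriteType

  static WriteType getWriteType(boolean replace, Operation operation) {
    if (replace) {
      return WriteType.INSERT_OVERWRITE;   // tbd.getReplace() in the patch
    }
    switch (operation) {
      case DELETE:
        return WriteType.DELETE;
      case UPDATE:
        return WriteType.UPDATE;
      default:
        return WriteType.INSERT;           // NOT_ACID and plain INSERT both map here
    }
  }

  public static void main(String[] args) {
    System.out.println(getWriteType(true,  Operation.NOT_ACID)); // INSERT_OVERWRITE
    System.out.println(getWriteType(false, Operation.DELETE));   // DELETE
    System.out.println(getWriteType(false, Operation.UPDATE));   // UPDATE
    System.out.println(getWriteType(false, Operation.NOT_ACID)); // INSERT
  }
}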