HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/423537a3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/423537a3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/423537a3
Branch: refs/heads/hive-14535
Commit: 423537a390e66667aec070356a55657a241f59b2
Parents: b60bbc2 8a6d818
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Oct 25 14:03:47 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Tue Oct 25 14:03:47 2016 -0700
----------------------------------------------------------------------
.../positive/accumulo_predicate_pushdown.q.out | 131 +-
.../results/positive/accumulo_queries.q.out | 121 +-
.../accumulo_single_sourced_multi_insert.q.out | 39 +-
.../java/org/apache/hive/beeline/BeeLine.java | 184 +-
.../java/org/apache/hive/beeline/Commands.java | 23 +-
.../hive/common/metrics/LegacyMetrics.java | 6 +
.../hive/common/metrics/common/Metrics.java | 10 +
.../common/metrics/common/MetricsConstant.java | 5 +
.../metrics/metrics2/CodahaleMetrics.java | 17 +-
.../metrics2/MetricVariableRatioGauge.java | 46 +
.../hive/common/metrics/MetricsTestUtils.java | 7 +-
.../metrics2/TestMetricVariableRatioGauge.java | 115 +
.../hive/beeline/TestBeelinePasswordOption.java | 328 +++
itests/qtest-accumulo/pom.xml | 7 +
.../test/resources/testconfiguration.properties | 4 +
.../hadoop/hive/accumulo/AccumuloQTestUtil.java | 12 +-
.../hadoop/hive/accumulo/AccumuloTestSetup.java | 19 +-
.../hive/cli/control/CoreAccumuloCliDriver.java | 3 +-
jdbc/pom.xml | 7 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 66 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 35 +-
.../org/apache/hadoop/hive/ql/hooks/Entity.java | 13 +-
.../apache/hadoop/hive/ql/hooks/ReadEntity.java | 2 +-
.../hadoop/hive/ql/hooks/WriteEntity.java | 6 +-
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 115 +-
.../ql/optimizer/calcite/HiveCalciteUtil.java | 73 +
.../ql/optimizer/calcite/HiveRelFactories.java | 16 +-
.../calcite/cost/HiveAlgorithmsUtil.java | 1 -
.../calcite/cost/HiveOnTezCostModel.java | 28 +-
.../calcite/reloperators/HiveExcept.java | 43 +
.../calcite/reloperators/HiveIntersect.java | 43 +
.../calcite/rules/HiveExceptRewriteRule.java | 375 +++
.../calcite/rules/HiveIntersectMergeRule.java | 88 +
.../calcite/rules/HiveIntersectRewriteRule.java | 250 ++
.../HiveProjectOverIntersectRemoveRule.java | 67 +
.../rules/HiveSortLimitPullUpConstantsRule.java | 7 +-
.../calcite/translator/ASTConverter.java | 26 +-
.../stats/annotation/StatsRulesProcFactory.java | 34 +-
.../hadoop/hive/ql/parse/CalcitePlanner.java | 124 +-
.../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 4 +-
.../apache/hadoop/hive/ql/parse/HiveParser.g | 22 +-
.../hadoop/hive/ql/parse/IdentifiersParser.g | 18 +-
.../org/apache/hadoop/hive/ql/parse/QBExpr.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 54 +-
.../TestSQL11ReservedKeyWordsNegative.java | 23 +-
.../authorization_alter_drop_ptn.q | 11 +
.../clientnegative/authorization_export_ptn.q | 19 +
.../clientnegative/authorization_import_ptn.q | 23 +
.../clientnegative/authorization_truncate_2.q | 11 +
ql/src/test/queries/clientpositive/concat_op.q | 45 +
.../queries/clientpositive/double_precision.q | 2 -
ql/src/test/queries/clientpositive/except_all.q | 58 +
.../queries/clientpositive/except_distinct.q | 58 +
.../test/queries/clientpositive/intersect_all.q | 42 +
.../queries/clientpositive/intersect_distinct.q | 42 +
.../queries/clientpositive/intersect_merge.q | 27 +
.../test/queries/clientpositive/orc_ppd_basic.q | 6 +
.../queries/clientpositive/primitive_types.q | 11 +
.../queries/clientpositive/setop_no_distinct.q | 51 +
.../authorization_alter_drop_ptn.q.out | 18 +
.../authorization_export_ptn.q.out | 19 +
.../authorization_import_ptn.q.out | 34 +
.../authorization_truncate_2.q.out | 18 +
.../test/results/clientpositive/concat_op.q.out | 301 +++
.../clientpositive/current_date_timestamp.q.out | 376 ---
.../clientpositive/double_precision.q.out | 16 -
.../results/clientpositive/except_all.q.out | 986 ++++++++
.../llap/current_date_timestamp.q.out | 2390 ++++++++++++++++++
.../clientpositive/llap/except_distinct.q.out | 894 +++++++
.../clientpositive/llap/explainuser_4.q.out | 78 +-
.../clientpositive/llap/intersect_all.q.out | 1697 +++++++++++++
.../llap/intersect_distinct.q.out | 1292 ++++++++++
.../clientpositive/llap/intersect_merge.q.out | 1956 ++++++++++++++
.../clientpositive/llap/orc_ppd_basic.q.out | 237 +-
.../llap/orc_predicate_pushdown.q.out | 20 +-
.../llap/parquet_predicate_pushdown.q.out | 44 +-
.../llap/tez_dynpart_hashjoin_1.q.out | 120 +-
.../llap/tez_vector_dynpart_hashjoin_1.q.out | 122 +-
.../llap/vector_between_columns.q.out | 6 +-
.../clientpositive/llap/vector_between_in.q.out | 34 +-
.../results/clientpositive/perf/query12.q.out | 2 +-
.../results/clientpositive/perf/query13.q.out | 24 +-
.../results/clientpositive/perf/query20.q.out | 4 +-
.../results/clientpositive/perf/query21.q.out | 8 +-
.../results/clientpositive/perf/query22.q.out | 4 +-
.../results/clientpositive/perf/query25.q.out | 8 +-
.../results/clientpositive/perf/query28.q.out | 36 +-
.../results/clientpositive/perf/query29.q.out | 4 +-
.../results/clientpositive/perf/query32.q.out | 8 +-
.../results/clientpositive/perf/query34.q.out | 12 +-
.../results/clientpositive/perf/query40.q.out | 8 +-
.../results/clientpositive/perf/query48.q.out | 18 +-
.../results/clientpositive/perf/query51.q.out | 8 +-
.../results/clientpositive/perf/query54.q.out | 4 +-
.../results/clientpositive/perf/query58.q.out | 12 +-
.../results/clientpositive/perf/query64.q.out | 8 +-
.../results/clientpositive/perf/query65.q.out | 8 +-
.../results/clientpositive/perf/query66.q.out | 8 +-
.../results/clientpositive/perf/query67.q.out | 4 +-
.../results/clientpositive/perf/query68.q.out | 4 +-
.../results/clientpositive/perf/query70.q.out | 8 +-
.../results/clientpositive/perf/query73.q.out | 12 +-
.../results/clientpositive/perf/query79.q.out | 4 +-
.../results/clientpositive/perf/query80.q.out | 12 +-
.../results/clientpositive/perf/query82.q.out | 14 +-
.../results/clientpositive/perf/query85.q.out | 42 +-
.../results/clientpositive/perf/query87.q.out | 12 +-
.../results/clientpositive/perf/query90.q.out | 16 +-
.../results/clientpositive/perf/query94.q.out | 4 +-
.../results/clientpositive/perf/query95.q.out | 4 +-
.../results/clientpositive/perf/query97.q.out | 8 +-
.../results/clientpositive/perf/query98.q.out | 4 +-
.../clientpositive/primitive_types.q.out | 37 +
.../clientpositive/setop_no_distinct.q.out | 237 ++
.../spark/vector_between_in.q.out | 34 +-
.../clientpositive/tez/explainanalyze_4.q.out | 80 +-
.../results/clientpositive/udf_between.q.out | 8 +-
.../clientpositive/vector_between_columns.q.out | 6 +-
.../service/cli/session/SessionManager.java | 66 +
.../cli/session/TestSessionManagerMetrics.java | 265 +-
.../ptest2/conf/deployed/master-mr2.properties | 2 +-
.../resources/test-configuration2.properties | 2 +-
123 files changed, 13370 insertions(+), 1384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/423537a3/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1f89f27,cfece77..7cf83d8
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@@ -4059,32 -4091,15 +4091,32 @@@ public class DDLTask extends Task<DDLWo
} else {
db.createTable(tbl, crtTbl.getIfNotExists());
}
- if ( crtTbl.isCTAS()) {
+ Long mmWriteId = crtTbl.getInitialMmWriteId();
+ if (crtTbl.isCTAS() || mmWriteId != null) {
Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName());
- DataContainer dc = new DataContainer(createdTable.getTTable());
- SessionState.get().getLineageState().setLineage(
- createdTable.getPath(), dc, createdTable.getCols()
- );
+ if (mmWriteId != null) {
+ // TODO# this would be retrieved via ACID before the query runs; for now we rely on it
+ // being zero at start; we can't create a write ID before we create the table here.
+ long initialWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
+ if (initialWriteId != mmWriteId) {
+ throw new HiveException("Initial write ID mismatch - expected " + mmWriteId
+ + " but got " + initialWriteId);
+ }
+ // CTAS create the table on a directory that already exists; import creates the table
+ // first (in parallel with copies?), then commits after all the loads.
+ if (crtTbl.isCTAS()) {
+ db.commitMmTableWrite(tbl, initialWriteId);
+ }
+ }
+ if (crtTbl.isCTAS()) {
+ DataContainer dc = new DataContainer(createdTable.getTTable());
+ SessionState.get().getLineageState().setLineage(
+ createdTable.getPath(), dc, createdTable.getCols()
+ );
+ }
}
}
- work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+ addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
return 0;
}
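----------------------------------------------------------------------
A minimal standalone sketch of the MM write-ID sanity check in the hunk above: the first write ID allocated right after CREATE TABLE must equal the initial MM write ID expected by the plan, and only then is the CTAS data committed. This uses plain JDK types and a hypothetical in-memory allocator, not Hive's Hive/metastore API; all names below are illustrative.

// Standalone sketch, not Hive code: models the write-ID check from DDLTask above.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class MmWriteIdCheckSketch {
  // Hypothetical per-table write-ID allocator standing in for the metastore call
  // db.getNextTableWriteId(dbName, tableName) used in the hunk above.
  static final Map<String, AtomicLong> NEXT_WRITE_ID = new HashMap<>();

  static long getNextTableWriteId(String dbName, String tableName) {
    return NEXT_WRITE_ID
        .computeIfAbsent(dbName + "." + tableName, k -> new AtomicLong(0))
        .getAndIncrement();
  }

  // Mirrors the check: the ID allocated right after table creation must equal the
  // initial MM write ID recorded at compile time (zero, per the TODO in the hunk).
  static void verifyInitialWriteId(String db, String table, long expectedMmWriteId) {
    long initialWriteId = getNextTableWriteId(db, table);
    if (initialWriteId != expectedMmWriteId) {
      throw new IllegalStateException("Initial write ID mismatch - expected "
          + expectedMmWriteId + " but got " + initialWriteId);
    }
    System.out.println("Write ID " + initialWriteId + " matches; CTAS data can be committed.");
  }

  public static void main(String[] args) {
    verifyInitialWriteId("default", "ctas_target", 0L); // passes: first allocated ID is 0
  }
}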
http://git-wip-us.apache.org/repos/asf/hive/blob/423537a3/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 99c52fa,8265af4..6e3ba98
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -325,18 -357,13 +325,17 @@@ public class MoveTask extends Task<Move
DataContainer dc = null;
if (tbd.getPartitionSpec().size() == 0) {
dc = new DataContainer(table.getTTable());
+ Utilities.LOG14535.info("loadTable called from " +
tbd.getSourcePath() + " into " + tbd.getTable().getTableName());
+ if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
+ throw new HiveException(
+ "Only single-partition LoadTableDesc can skip commiting write
ID");
+ }
db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(),
tbd.getReplace(),
- work.isSrcLocal(), isSkewedStoredAsDirs(tbd),
- work.getLoadTableWork().getWriteType() !=
AcidUtils.Operation.NOT_ACID,
- hasFollowingStatsTask());
+ work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid,
hasFollowingStatsTask(),
+ tbd.getMmWriteId());
if (work.getOutputs() != null) {
- work.getOutputs().add(new WriteEntity(table,
- (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
- WriteEntity.WriteType.INSERT)));
+ DDLTask.addIfAbsentByName(new WriteEntity(table,
+ getWriteType(tbd, work.getLoadTableWork().getWriteType())),
work.getOutputs());
}
} else {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
@@@ -383,231 -549,24 +382,249 @@@
return (1);
}
}
+
+ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
+ TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
+ List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+ db.validatePartitionNameCharacters(partVals);
+ Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable().getTableName());
+ boolean isCommitMmWrite = tbd.isCommitMmWrite();
+ db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(), tbd.getReplace(),
+ tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
+ (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY),
+ hasFollowingStatsTask(), tbd.getMmWriteId(), isCommitMmWrite);
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
+ ti.numBuckets, ti.sortCols);
+ }
+
+ DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
+ // add this partition to post-execution hook
+ if (work.getOutputs() != null) {
- work.getOutputs().add(new WriteEntity(partn,
- (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
- : WriteEntity.WriteType.INSERT)));
++ DDLTask.addIfAbsentByName(new WriteEntity(partn,
++ getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
+ }
+ return dc;
+ }
+
+ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
+ TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
+ IOException, InvalidOperationException {
+ DataContainer dc;
+ List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
+
+ // publish DP columns to its subscribers
+ if (dps != null && dps.size() > 0) {
+ pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
+ }
+ console.printInfo(System.getProperty("line.separator"));
+ long startTime = System.currentTimeMillis();
+ // load the list of DP partitions and return the list of partition specs
+ // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
+ // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
+ // After that check the number of DPs created to not exceed the limit and
+ // iterate over it and call loadPartition() here.
+ // The reason we don't do inside HIVE-1361 is the latter is large and we
+ // want to isolate any potential issue it may introduce.
+ if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
+ throw new HiveException("Only single-partition LoadTableDesc can skip commiting write ID");
+ }
+ Map<Map<String, String>, Partition> dp =
+ db.loadDynamicPartitions(
+ tbd.getSourcePath(),
+ tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(),
+ tbd.getReplace(),
+ dpCtx.getNumDPCols(),
+ (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
+ SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
+ work.getLoadTableWork().getWriteType(),
+ tbd.getMmWriteId());
+
+ String loadTime = "\t Time taken to load dynamic partitions: " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
+ console.printInfo(loadTime);
+ LOG.info(loadTime);
+
+ if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
+ throw new HiveException("This query creates no partitions." +
+ " To turn off this error, set hive.error.on.empty.partition=false.");
+ }
+
+ startTime = System.currentTimeMillis();
+ // for each partition spec, get the partition
+ // and put it to WriteEntity for post-exec hook
+ for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+ Partition partn = entry.getValue();
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ updatePartitionBucketSortColumns(
+ db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
+ }
+
+ WriteEntity enty = new WriteEntity(partn,
- (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
- WriteEntity.WriteType.INSERT));
++ getWriteType(tbd, work.getLoadTableWork().getWriteType()));
+ if (work.getOutputs() != null) {
- work.getOutputs().add(enty);
++ DDLTask.addIfAbsentByName(enty, work.getOutputs());
+ }
+ // Need to update the queryPlan's output as well so that post-exec hook get executed.
+ // This is only needed for dynamic partitioning since for SP the the WriteEntity is
+ // constructed at compile time and the queryPlan already contains that.
+ // For DP, WriteEntity creation is deferred at this stage so we need to update
+ // queryPlan here.
+ if (queryPlan.getOutputs() == null) {
+ queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
+ }
+ queryPlan.getOutputs().add(enty);
+
+ // update columnar lineage for each partition
+ dc = new DataContainer(table.getTTable(), partn.getTPartition());
+
+ // Don't set lineage on delete as we don't have all the columns
+ if (SessionState.get() != null &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
+ SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
+ table.getCols());
+ }
+ LOG.info("\tLoading partition " + entry.getKey());
+ }
+ console.printInfo("\t Time taken for adding to write entity : " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+ dc = null; // reset data container to prevent it being added again.
+ return dc;
+ }
+
+ private void inferTaskInformation(TaskInformation ti) {
+ // Find the first ancestor of this MoveTask which is some form of map reduce task
+ // (Either standard, local, or a merge)
+ while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
+ ti.task = (Task)ti.task.getParentTasks().get(0);
+ // If it was a merge task or a local map reduce task, nothing can be inferred
+ if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
+ break;
+ }
+
+ // If it's a standard map reduce task, check what, if anything, it inferred about
+ // the directory this move task is moving
+ if (ti.task instanceof MapRedTask) {
+ MapredWork work = (MapredWork)ti.task.getWork();
+ MapWork mapWork = work.getMapWork();
+ ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
+ ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
+ if (work.getReduceWork() != null) {
+ ti.numBuckets = work.getReduceWork().getNumReduceTasks();
+ }
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ // This must be a final map reduce task (the task containing the file sink
+ // operator that writes the final output)
+ assert work.isFinalMapRed();
+ }
+ break;
+ }
+
+ // If it's a move task, get the path the files were moved from, this is what any
+ // preceding map reduce task inferred information about, and moving does not invalidate
+ // those assumptions
+ // This can happen when a conditional merge is added before the final MoveTask, but the
+ // condition for merging is not met, see GenMRFileSink1.
+ if (ti.task instanceof MoveTask) {
+ MoveTask mt = (MoveTask)ti.task;
+ if (mt.getWork().getLoadFileWork() != null) {
+ ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
+ }
+ }
+ }
+ }
+
+ private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
+ throws HiveException {
+ if (work.getCheckFileFormat()) {
+ // Get all files from the src directory
+ FileStatus[] dirs;
+ ArrayList<FileStatus> files;
+ FileSystem srcFs; // source filesystem
+ try {
+ srcFs = tbd.getSourcePath().getFileSystem(conf);
+ dirs = srcFs.globStatus(tbd.getSourcePath());
+ files = new ArrayList<FileStatus>();
+ for (int i = 0; (dirs != null && i < dirs.length); i++) {
+ files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
+ // We only check one file, so exit the loop when we have at least
+ // one.
+ if (files.size() > 0) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException(
+ "addFiles: filesystem error in check phase", e);
+ }
+
+ // handle file format check for table level
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
+ boolean flag = true;
+ // work.checkFileFormat is set to true only for Load Task, so assumption here is
+ // dynamic partition context is null
+ if (tbd.getDPCtx() == null) {
+ if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
+ // Check if the file format of the file matches that of the table.
+ flag = HiveFileFormatUtils.checkInputFormat(
+ srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
+ } else {
+ // Check if the file format of the file matches that of the partition
+ Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
+ if (oldPart == null) {
+ // this means we have just created a table and are specifying partition in the
+ // load statement (without pre-creating the partition), in which case lets use
+ // table input format class. inheritTableSpecs defaults to true so when a new
+ // partition is created later it will automatically inherit input format
+ // from table object
+ flag = HiveFileFormatUtils.checkInputFormat(
+ srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
+ } else {
+ flag = HiveFileFormatUtils.checkInputFormat(
+ srcFs, conf, oldPart.getInputFormatClass(), files);
+ }
+ }
+ if (!flag) {
+ throw new HiveException(
+ "Wrong file format. Please check the file's format.");
+ }
+ } else {
+ LOG.warn("Skipping file format check as dpCtx is not null");
+ }
+ }
+ }
+ }
+
++
+ /**
+ * so to make sure we crate WriteEntity with the right WriteType. This is (at this point) only
+ * for consistency since LockManager (which is the only thing that pays attention to WriteType)
+ * has done it's job before the query ran.
+ */
+ WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
+ if(tbd.getReplace()) {
+ return WriteEntity.WriteType.INSERT_OVERWRITE;
+ }
+ switch (operation) {
+ case DELETE:
+ return WriteEntity.WriteType.DELETE;
+ case UPDATE:
+ return WriteEntity.WriteType.UPDATE;
+ default:
+ return WriteEntity.WriteType.INSERT;
+ }
+ }
++
private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
.isSkewedStoredAsDir();
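----------------------------------------------------------------------
A minimal standalone sketch of two ideas in the MoveTask hunk above: the getWriteType precedence (an overwrite load maps to INSERT_OVERWRITE, otherwise the ACID operation decides), and a name-keyed "add if absent" outputs collection. DDLTask.addIfAbsentByName is not shown in this diff, so the de-duplication below is an assumption about its behavior; the enums and names are plain JDK stand-ins, not Hive's WriteEntity/AcidUtils.

// Standalone sketch, not Hive code: illustrative model of the write-type mapping above.
import java.util.LinkedHashMap;
import java.util.Map;

public class WriteTypeSketch {
  enum Operation { NOT_ACID, INSERT_ONLY, INSERT, UPDATE, DELETE }
  enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE }

  // Same precedence as the getWriteType helper in the hunk: replace wins,
  // then the ACID operation, defaulting to INSERT.
  static WriteType getWriteType(boolean isReplace, Operation op) {
    if (isReplace) {
      return WriteType.INSERT_OVERWRITE;
    }
    switch (op) {
      case DELETE: return WriteType.DELETE;
      case UPDATE: return WriteType.UPDATE;
      default:     return WriteType.INSERT;
    }
  }

  // Assumed behavior of addIfAbsentByName: keep only the first entity registered
  // under a given name so the same output is not added twice.
  static final Map<String, WriteType> OUTPUTS = new LinkedHashMap<>();

  static void addIfAbsentByName(String entityName, WriteType type) {
    OUTPUTS.putIfAbsent(entityName, type);
  }

  public static void main(String[] args) {
    addIfAbsentByName("default@t1", getWriteType(true, Operation.NOT_ACID)); // INSERT_OVERWRITE
    addIfAbsentByName("default@t1", getWriteType(false, Operation.UPDATE));  // ignored, already present
    addIfAbsentByName("default@t2", getWriteType(false, Operation.DELETE));  // DELETE
    System.out.println(OUTPUTS); // {default@t1=INSERT_OVERWRITE, default@t2=DELETE}
  }
}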
http://git-wip-us.apache.org/repos/asf/hive/blob/423537a3/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 0d83abf,17dfd03..36c9049
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -6608,9 -6660,43 +6624,8 @@@ public class SemanticAnalyzer extends B
setStatsForNonNativeTable(dest_tab);
}
- WriteEntity output = null;
-
- // Here only register the whole table for post-exec hook if no DP present
- // in the case of DP, we will register WriteEntity in MoveTask when the
- // list of dynamically created partitions are known.
- if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
- if (!outputs.add(output)) {
- throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
- .getMsg(dest_tab.getTableName()));
- }
- }
- if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
- // No static partition specified
- if (dpCtx.getNumSPCols() == 0) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
- outputs.add(output);
- }
- // part of the partition specified
- // Create a DummyPartition in this case. Since, the metastore does not store partial
- // partitions currently, we need to store dummy partitions
- else {
- try {
- String ppath = dpCtx.getSPPath();
- ppath = ppath.substring(0, ppath.length() - 1);
- DummyPartition p =
- new DummyPartition(dest_tab, dest_tab.getDbName()
- + "@" + dest_tab.getTableName() + "@" + ppath,
- partSpec);
- output = new WriteEntity(p, getWriteType(), false);
- outputs.add(output);
- } catch (HiveException e) {
- throw new SemanticException(e.getMessage(), e);
- }
- }
- }
-
+ WriteEntity output = generateTableWriteEntity(
+ dest_tab, partSpec, ltd, dpCtx, isNonNativeTable);
-
ctx.getLoadTableOutputMap().put(ltd, output);
break;
}
@@@ -7007,111 -7037,33 +7022,111 @@@
SessionState.get().getLineageState()
.mapDirToFop(tlocation, (FileSinkOperator) output);
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
- + dest_path + " row schema: " + inputRR.toString());
+ private WriteEntity generateTableWriteEntity(Table dest_tab,
+ Map<String, String> partSpec, LoadTableDesc ltd,
+ DynamicPartitionCtx dpCtx, boolean isNonNativeTable)
+ throws SemanticException {
+ WriteEntity output = null;
+
+ // Here only register the whole table for post-exec hook if no DP present
+ // in the case of DP, we will register WriteEntity in MoveTask when the
+ // list of dynamically created partitions are known.
+ if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
+ if (!outputs.add(output)) {
+ throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+ .getMsg(dest_tab.getTableName()));
+ }
+ }
+ if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
+ // No static partition specified
+ if (dpCtx.getNumSPCols() == 0) {
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
+ outputs.add(output);
+ }
+ // part of the partition specified
+ // Create a DummyPartition in this case. Since, the metastore does not store partial
+ // partitions currently, we need to store dummy partitions
+ else {
+ try {
+ String ppath = dpCtx.getSPPath();
+ ppath = ppath.substring(0, ppath.length() - 1);
+ DummyPartition p =
+ new DummyPartition(dest_tab, dest_tab.getDbName()
+ + "@" + dest_tab.getTableName() + "@" + ppath,
+ partSpec);
- output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
++ output = new WriteEntity(p, getWriteType(), false);
+ outputs.add(output);
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
+ }
}
+ return output;
+ }
- FileSinkOperator fso = (FileSinkOperator) output;
- fso.getConf().setTable(dest_tab);
- fsopToTable.put(fso, dest_tab);
- // the following code is used to collect column stats when
- // hive.stats.autogather=true
- // and it is an insert overwrite or insert into table
- if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
- && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
- && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
- if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
- genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
- .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
- } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
- genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
- .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+ private void checkExternalTable(Table dest_tab) throws SemanticException {
+ if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
+ (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
+ throw new SemanticException(
+ ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
+ }
+ }
+ private void checkImmutableTable(QB qb, Table dest_tab, Path dest_path, boolean isPart)
+ throws SemanticException {
+ // If the query here is an INSERT_INTO and the target is an immutable table,
+ // verify that our destination is empty before proceeding
+ if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable(
+ dest_tab.getDbName(), dest_tab.getTableName())) {
+ return;
+ }
+ try {
+ FileSystem fs = dest_path.getFileSystem(conf);
+ if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){
+ LOG.warn("Attempted write into an immutable table : "
+ + dest_tab.getTableName() + " : " + dest_path);
+ throw new SemanticException(
+ ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
}
+ } catch (IOException ioe) {
+ LOG.warn("Error while trying to determine if immutable table "
+ + (isPart ? "partition " : "") + "has any data : " + dest_tab.getTableName()
+ + " : " + dest_path);
+ throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
}
- return output;
}
+ private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab,
+ Map<String, String> partSpec, String dest) throws SemanticException {
+ List<FieldSchema> parts = dest_tab.getPartitionKeys();
+ if (parts == null || parts.isEmpty()) return null; // table is not partitioned
+ if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
+ throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
+ }
+ DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
+ if (dpCtx == null) {
+ dest_tab.validatePartColumnNames(partSpec, false);
+ dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+ conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+ conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+ qbm.setDPCtx(dest, dpCtx);
+ }
+
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
+ throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
+ }
+ if ((dest_tab.getNumBuckets() > 0)) {
+ dpCtx.setNumBuckets(dest_tab.getNumBuckets());
+ }
+ return dpCtx;
+ }
+
+
private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
Map<String, String> partSpec, Operator curr, boolean isInsertInto)
throws SemanticException {
String tableName = table_desc.getTableName();
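----------------------------------------------------------------------
A minimal standalone sketch of the decision flow in the extracted checkDynPart helper above: unpartitioned tables pass through, a partitioned table without a partition spec is an error, disabled dynamic partitioning is an error, and otherwise the unspecified partition columns are treated as dynamic. It uses plain JDK types instead of Hive's QB/QBMetaData/DynamicPartitionCtx classes; all names and return conventions here are illustrative assumptions.

// Standalone sketch, not Hive code: models checkDynPart's validation logic above.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynPartCheckSketch {
  // Returns the number of dynamic-partition columns, or -1 when the table is not
  // partitioned (the real helper returns null in that case).
  static int checkDynPart(List<String> partitionKeys, Map<String, String> partSpec,
                          boolean dynamicPartitioningEnabled) {
    if (partitionKeys == null || partitionKeys.isEmpty()) {
      return -1; // table is not partitioned, nothing to check
    }
    if (partSpec == null || partSpec.isEmpty()) {
      throw new IllegalArgumentException("partitioned table requires a partition spec");
    }
    if (!dynamicPartitioningEnabled) {
      throw new IllegalStateException("dynamic partitioning is disabled");
    }
    int dynamicCols = 0;
    for (String key : partitionKeys) {
      if (partSpec.get(key) == null) { // unspecified value means a dynamic column
        dynamicCols++;
      }
    }
    return dynamicCols;
  }

  public static void main(String[] args) {
    Map<String, String> spec = new HashMap<String, String>();
    spec.put("ds", "2016-10-25"); // static partition column
    spec.put("hr", null);         // dynamic partition column
    System.out.println(checkDynPart(Arrays.asList("ds", "hr"), spec, true)); // prints 1
  }
}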