HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)

Conflicts:
        ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
        ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/423537a3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/423537a3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/423537a3

Branch: refs/heads/hive-14535
Commit: 423537a390e66667aec070356a55657a241f59b2
Parents: b60bbc2 8a6d818
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Oct 25 14:03:47 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Tue Oct 25 14:03:47 2016 -0700

----------------------------------------------------------------------
 .../positive/accumulo_predicate_pushdown.q.out  |  131 +-
 .../results/positive/accumulo_queries.q.out     |  121 +-
 .../accumulo_single_sourced_multi_insert.q.out  |   39 +-
 .../java/org/apache/hive/beeline/BeeLine.java   |  184 +-
 .../java/org/apache/hive/beeline/Commands.java  |   23 +-
 .../hive/common/metrics/LegacyMetrics.java      |    6 +
 .../hive/common/metrics/common/Metrics.java     |   10 +
 .../common/metrics/common/MetricsConstant.java  |    5 +
 .../metrics/metrics2/CodahaleMetrics.java       |   17 +-
 .../metrics2/MetricVariableRatioGauge.java      |   46 +
 .../hive/common/metrics/MetricsTestUtils.java   |    7 +-
 .../metrics2/TestMetricVariableRatioGauge.java  |  115 +
 .../hive/beeline/TestBeelinePasswordOption.java |  328 +++
 itests/qtest-accumulo/pom.xml                   |    7 +
 .../test/resources/testconfiguration.properties |    4 +
 .../hadoop/hive/accumulo/AccumuloQTestUtil.java |   12 +-
 .../hadoop/hive/accumulo/AccumuloTestSetup.java |   19 +-
 .../hive/cli/control/CoreAccumuloCliDriver.java |    3 +-
 jdbc/pom.xml                                    |    7 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |    2 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   66 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   35 +-
 .../org/apache/hadoop/hive/ql/hooks/Entity.java |   13 +-
 .../apache/hadoop/hive/ql/hooks/ReadEntity.java |    2 +-
 .../hadoop/hive/ql/hooks/WriteEntity.java       |    6 +-
 .../hive/ql/optimizer/SimpleFetchOptimizer.java |  115 +-
 .../ql/optimizer/calcite/HiveCalciteUtil.java   |   73 +
 .../ql/optimizer/calcite/HiveRelFactories.java  |   16 +-
 .../calcite/cost/HiveAlgorithmsUtil.java        |    1 -
 .../calcite/cost/HiveOnTezCostModel.java        |   28 +-
 .../calcite/reloperators/HiveExcept.java        |   43 +
 .../calcite/reloperators/HiveIntersect.java     |   43 +
 .../calcite/rules/HiveExceptRewriteRule.java    |  375 +++
 .../calcite/rules/HiveIntersectMergeRule.java   |   88 +
 .../calcite/rules/HiveIntersectRewriteRule.java |  250 ++
 .../HiveProjectOverIntersectRemoveRule.java     |   67 +
 .../rules/HiveSortLimitPullUpConstantsRule.java |    7 +-
 .../calcite/translator/ASTConverter.java        |   26 +-
 .../stats/annotation/StatsRulesProcFactory.java |   34 +-
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  124 +-
 .../org/apache/hadoop/hive/ql/parse/HiveLexer.g |    4 +-
 .../apache/hadoop/hive/ql/parse/HiveParser.g    |   22 +-
 .../hadoop/hive/ql/parse/IdentifiersParser.g    |   18 +-
 .../org/apache/hadoop/hive/ql/parse/QBExpr.java |    2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   54 +-
 .../TestSQL11ReservedKeyWordsNegative.java      |   23 +-
 .../authorization_alter_drop_ptn.q              |   11 +
 .../clientnegative/authorization_export_ptn.q   |   19 +
 .../clientnegative/authorization_import_ptn.q   |   23 +
 .../clientnegative/authorization_truncate_2.q   |   11 +
 ql/src/test/queries/clientpositive/concat_op.q  |   45 +
 .../queries/clientpositive/double_precision.q   |    2 -
 ql/src/test/queries/clientpositive/except_all.q |   58 +
 .../queries/clientpositive/except_distinct.q    |   58 +
 .../test/queries/clientpositive/intersect_all.q |   42 +
 .../queries/clientpositive/intersect_distinct.q |   42 +
 .../queries/clientpositive/intersect_merge.q    |   27 +
 .../test/queries/clientpositive/orc_ppd_basic.q |    6 +
 .../queries/clientpositive/primitive_types.q    |   11 +
 .../queries/clientpositive/setop_no_distinct.q  |   51 +
 .../authorization_alter_drop_ptn.q.out          |   18 +
 .../authorization_export_ptn.q.out              |   19 +
 .../authorization_import_ptn.q.out              |   34 +
 .../authorization_truncate_2.q.out              |   18 +
 .../test/results/clientpositive/concat_op.q.out |  301 +++
 .../clientpositive/current_date_timestamp.q.out |  376 ---
 .../clientpositive/double_precision.q.out       |   16 -
 .../results/clientpositive/except_all.q.out     |  986 ++++++++
 .../llap/current_date_timestamp.q.out           | 2390 ++++++++++++++++++
 .../clientpositive/llap/except_distinct.q.out   |  894 +++++++
 .../clientpositive/llap/explainuser_4.q.out     |   78 +-
 .../clientpositive/llap/intersect_all.q.out     | 1697 +++++++++++++
 .../llap/intersect_distinct.q.out               | 1292 ++++++++++
 .../clientpositive/llap/intersect_merge.q.out   | 1956 ++++++++++++++
 .../clientpositive/llap/orc_ppd_basic.q.out     |  237 +-
 .../llap/orc_predicate_pushdown.q.out           |   20 +-
 .../llap/parquet_predicate_pushdown.q.out       |   44 +-
 .../llap/tez_dynpart_hashjoin_1.q.out           |  120 +-
 .../llap/tez_vector_dynpart_hashjoin_1.q.out    |  122 +-
 .../llap/vector_between_columns.q.out           |    6 +-
 .../clientpositive/llap/vector_between_in.q.out |   34 +-
 .../results/clientpositive/perf/query12.q.out   |    2 +-
 .../results/clientpositive/perf/query13.q.out   |   24 +-
 .../results/clientpositive/perf/query20.q.out   |    4 +-
 .../results/clientpositive/perf/query21.q.out   |    8 +-
 .../results/clientpositive/perf/query22.q.out   |    4 +-
 .../results/clientpositive/perf/query25.q.out   |    8 +-
 .../results/clientpositive/perf/query28.q.out   |   36 +-
 .../results/clientpositive/perf/query29.q.out   |    4 +-
 .../results/clientpositive/perf/query32.q.out   |    8 +-
 .../results/clientpositive/perf/query34.q.out   |   12 +-
 .../results/clientpositive/perf/query40.q.out   |    8 +-
 .../results/clientpositive/perf/query48.q.out   |   18 +-
 .../results/clientpositive/perf/query51.q.out   |    8 +-
 .../results/clientpositive/perf/query54.q.out   |    4 +-
 .../results/clientpositive/perf/query58.q.out   |   12 +-
 .../results/clientpositive/perf/query64.q.out   |    8 +-
 .../results/clientpositive/perf/query65.q.out   |    8 +-
 .../results/clientpositive/perf/query66.q.out   |    8 +-
 .../results/clientpositive/perf/query67.q.out   |    4 +-
 .../results/clientpositive/perf/query68.q.out   |    4 +-
 .../results/clientpositive/perf/query70.q.out   |    8 +-
 .../results/clientpositive/perf/query73.q.out   |   12 +-
 .../results/clientpositive/perf/query79.q.out   |    4 +-
 .../results/clientpositive/perf/query80.q.out   |   12 +-
 .../results/clientpositive/perf/query82.q.out   |   14 +-
 .../results/clientpositive/perf/query85.q.out   |   42 +-
 .../results/clientpositive/perf/query87.q.out   |   12 +-
 .../results/clientpositive/perf/query90.q.out   |   16 +-
 .../results/clientpositive/perf/query94.q.out   |    4 +-
 .../results/clientpositive/perf/query95.q.out   |    4 +-
 .../results/clientpositive/perf/query97.q.out   |    8 +-
 .../results/clientpositive/perf/query98.q.out   |    4 +-
 .../clientpositive/primitive_types.q.out        |   37 +
 .../clientpositive/setop_no_distinct.q.out      |  237 ++
 .../spark/vector_between_in.q.out               |   34 +-
 .../clientpositive/tez/explainanalyze_4.q.out   |   80 +-
 .../results/clientpositive/udf_between.q.out    |    8 +-
 .../clientpositive/vector_between_columns.q.out |    6 +-
 .../service/cli/session/SessionManager.java     |   66 +
 .../cli/session/TestSessionManagerMetrics.java  |  265 +-
 .../ptest2/conf/deployed/master-mr2.properties  |    2 +-
 .../resources/test-configuration2.properties    |    2 +-
 123 files changed, 13370 insertions(+), 1384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/423537a3/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1f89f27,cfece77..7cf83d8
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@@ -4059,32 -4091,15 +4091,32 @@@ public class DDLTask extends Task<DDLWo
        } else {
          db.createTable(tbl, crtTbl.getIfNotExists());
        }
 -      if ( crtTbl.isCTAS()) {
 +      Long mmWriteId = crtTbl.getInitialMmWriteId();
 +      if (crtTbl.isCTAS() || mmWriteId != null) {
          Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName());
 -        DataContainer dc = new DataContainer(createdTable.getTTable());
 -        SessionState.get().getLineageState().setLineage(
 -                createdTable.getPath(), dc, createdTable.getCols()
 -        );
 +        if (mmWriteId != null) {
 +          // TODO# this would be retrieved via ACID before the query runs; for now we rely on it
 +          //       being zero at start; we can't create a write ID before we create the table here.
 +          long initialWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
 +          if (initialWriteId != mmWriteId) {
 +            throw new HiveException("Initial write ID mismatch - expected " + mmWriteId
 +                + " but got " + initialWriteId);
 +          }
 +          // CTAS creates the table on a directory that already exists; import creates the table
 +          // first (in parallel with copies?), then commits after all the loads.
 +          if (crtTbl.isCTAS()) {
 +            db.commitMmTableWrite(tbl, initialWriteId);
 +          }
 +        }
 +        if (crtTbl.isCTAS()) {
 +          DataContainer dc = new DataContainer(createdTable.getTTable());
 +          SessionState.get().getLineageState().setLineage(
 +                  createdTable.getPath(), dc, createdTable.getCols()
 +          );
 +        }
        }
      }
-     work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+     addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
      return 0;
    }
  

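Both WriteEntity call sites in this hunk switch from work.getOutputs().add(...) to addIfAbsentByName(...). The helper's body is not part of this hunk, so the snippet below is only a minimal sketch of the by-name de-duplication that the rename suggests; the class name WriteEntityDedupSketch and the exact signature are illustrative, not the committed code.

import java.util.Set;

import org.apache.hadoop.hive.ql.hooks.WriteEntity;

// Minimal sketch, assuming the intent of addIfAbsentByName is to de-duplicate
// WriteEntity objects by name; this is not the helper committed in DDLTask.
final class WriteEntityDedupSketch {
  static void addIfAbsentByName(WriteEntity newEntity, Set<WriteEntity> outputs) {
    for (WriteEntity existing : outputs) {
      if (existing.getName().equals(newEntity.getName())) {
        return; // an entity with the same name is already registered; keep the existing one
      }
    }
    outputs.add(newEntity);
  }
}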
http://git-wip-us.apache.org/repos/asf/hive/blob/423537a3/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 99c52fa,8265af4..6e3ba98
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -325,18 -357,13 +325,17 @@@ public class MoveTask extends Task<Move
          DataContainer dc = null;
          if (tbd.getPartitionSpec().size() == 0) {
            dc = new DataContainer(table.getTTable());
 +          Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable().getTableName());
 +          if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
 +            throw new HiveException(
 +                "Only single-partition LoadTableDesc can skip committing write ID");
 +          }
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(),
 -              work.isSrcLocal(), isSkewedStoredAsDirs(tbd),
 -              work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
 -              hasFollowingStatsTask());
 +              work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid, hasFollowingStatsTask(),
 +              tbd.getMmWriteId());
           if (work.getOutputs() != null) {
-             work.getOutputs().add(new WriteEntity(table,
-                 (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                 WriteEntity.WriteType.INSERT)));
+             DDLTask.addIfAbsentByName(new WriteEntity(table,
+               getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
            }
          } else {
            LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
@@@ -383,231 -549,24 +382,249 @@@
        return (1);
      }
    }
 +
 +  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
 +    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec());
 +    db.validatePartitionNameCharacters(partVals);
 +    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
 +        + " into " + tbd.getTable().getTableName());
 +    boolean isCommitMmWrite = tbd.isCommitMmWrite();
 +    db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(), tbd.getReplace(),
 +        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
 +        (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +         work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY),
 +        hasFollowingStatsTask(), tbd.getMmWriteId(), isCommitMmWrite);
 +    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 +
 +    if (ti.bucketCols != null || ti.sortCols != null) {
 +      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
 +          ti.numBuckets, ti.sortCols);
 +    }
 +
 +    DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +    // add this partition to post-execution hook
 +    if (work.getOutputs() != null) {
-       work.getOutputs().add(new WriteEntity(partn,
-           (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
-               : WriteEntity.WriteType.INSERT)));
++      DDLTask.addIfAbsentByName(new WriteEntity(partn,
++        getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
 +    }
 +    return dc;
 +  }
 +
 +  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
 +      IOException, InvalidOperationException {
 +    DataContainer dc;
 +    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
 +
 +    // publish DP columns to its subscribers
 +    if (dps != null && dps.size() > 0) {
 +      pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
 +    }
 +    console.printInfo(System.getProperty("line.separator"));
 +    long startTime = System.currentTimeMillis();
 +    // load the list of DP partitions and return the list of partition specs
 +    // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
 +    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
 +    // After that check the number of DPs created to not exceed the limit and
 +    // iterate over it and call loadPartition() here.
 +    // The reason we don't do this inside HIVE-1361 is that the latter is large and we
 +    // want to isolate any potential issue it may introduce.
 +    if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
 +      throw new HiveException("Only single-partition LoadTableDesc can skip committing write ID");
 +    }
 +    Map<Map<String, String>, Partition> dp =
 +      db.loadDynamicPartitions(
 +        tbd.getSourcePath(),
 +        tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(),
 +        tbd.getReplace(),
 +        dpCtx.getNumDPCols(),
 +        (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
 +        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
 +        SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
 +        work.getLoadTableWork().getWriteType(),
 +        tbd.getMmWriteId());
 +
 +    String loadTime = "\t Time taken to load dynamic partitions: "  +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 +    console.printInfo(loadTime);
 +    LOG.info(loadTime);
 +
 +    if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 +      throw new HiveException("This query creates no partitions." +
 +          " To turn off this error, set hive.error.on.empty.partition=false.");
 +    }
 +
 +    startTime = System.currentTimeMillis();
 +    // for each partition spec, get the partition
 +    // and put it to WriteEntity for post-exec hook
 +    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
 +      Partition partn = entry.getValue();
 +
 +      if (ti.bucketCols != null || ti.sortCols != null) {
 +        updatePartitionBucketSortColumns(
 +            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
 +      }
 +
 +      WriteEntity enty = new WriteEntity(partn,
-           (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-               WriteEntity.WriteType.INSERT));
++        getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 +      if (work.getOutputs() != null) {
-         work.getOutputs().add(enty);
++        DDLTask.addIfAbsentByName(enty, work.getOutputs());
 +      }
 +      // Need to update the queryPlan's output as well so that post-exec hooks get executed.
 +      // This is only needed for dynamic partitioning since for SP the WriteEntity is
 +      // constructed at compile time and the queryPlan already contains that.
 +      // For DP, WriteEntity creation is deferred to this stage so we need to update
 +      // queryPlan here.
 +      if (queryPlan.getOutputs() == null) {
 +        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 +      }
 +      queryPlan.getOutputs().add(enty);
 +
 +      // update columnar lineage for each partition
 +      dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +
 +      // Don't set lineage on delete as we don't have all the columns
 +      if (SessionState.get() != null &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
 +        SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
 +            table.getCols());
 +      }
 +      LOG.info("\tLoading partition " + entry.getKey());
 +    }
 +    console.printInfo("\t Time taken for adding to write entity : " +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 +    dc = null; // reset data container to prevent it being added again.
 +    return dc;
 +  }
 +
 +  private void inferTaskInformation(TaskInformation ti) {
 +    // Find the first ancestor of this MoveTask which is some form of map reduce task
 +    // (Either standard, local, or a merge)
 +    while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
 +      ti.task = (Task)ti.task.getParentTasks().get(0);
 +      // If it was a merge task or a local map reduce task, nothing can be inferred
 +      if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
 +        break;
 +      }
 +
 +      // If it's a standard map reduce task, check what, if anything, it inferred about
 +      // the directory this move task is moving
 +      if (ti.task instanceof MapRedTask) {
 +        MapredWork work = (MapredWork)ti.task.getWork();
 +        MapWork mapWork = work.getMapWork();
 +        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
 +        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
 +        if (work.getReduceWork() != null) {
 +          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
 +        }
 +
 +        if (ti.bucketCols != null || ti.sortCols != null) {
 +          // This must be a final map reduce task (the task containing the file sink
 +          // operator that writes the final output)
 +          assert work.isFinalMapRed();
 +        }
 +        break;
 +      }
 +
 +      // If it's a move task, get the path the files were moved from, this is what any
 +      // preceding map reduce task inferred information about, and moving does not invalidate
 +      // those assumptions
 +      // This can happen when a conditional merge is added before the final MoveTask, but the
 +      // condition for merging is not met, see GenMRFileSink1.
 +      if (ti.task instanceof MoveTask) {
 +        MoveTask mt = (MoveTask)ti.task;
 +        if (mt.getWork().getLoadFileWork() != null) {
 +          ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
 +        }
 +      }
 +    }
 +  }
 +
 +  private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
 +      throws HiveException {
 +    if (work.getCheckFileFormat()) {
 +      // Get all files from the src directory
 +      FileStatus[] dirs;
 +      ArrayList<FileStatus> files;
 +      FileSystem srcFs; // source filesystem
 +      try {
 +        srcFs = tbd.getSourcePath().getFileSystem(conf);
 +        dirs = srcFs.globStatus(tbd.getSourcePath());
 +        files = new ArrayList<FileStatus>();
 +        for (int i = 0; (dirs != null && i < dirs.length); i++) {
 +          files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
 +          // We only check one file, so exit the loop when we have at least
 +          // one.
 +          if (files.size() > 0) {
 +            break;
 +          }
 +        }
 +      } catch (IOException e) {
 +        throw new HiveException(
 +            "addFiles: filesystem error in check phase", e);
 +      }
 +
 +      // handle file format check for table level
 +      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
 +        boolean flag = true;
 +        // work.checkFileFormat is set to true only for Load Task, so assumption here is
 +        // dynamic partition context is null
 +        if (tbd.getDPCtx() == null) {
 +          if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
 +            // Check if the file format of the file matches that of the table.
 +            flag = HiveFileFormatUtils.checkInputFormat(
 +                srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +          } else {
 +            // Check if the file format of the file matches that of the partition
 +            Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
 +            if (oldPart == null) {
 +              // this means we have just created a table and are specifying partition in the
 +              // load statement (without pre-creating the partition), in which case lets use
 +              // table input format class. inheritTableSpecs defaults to true so when a new
 +              // partition is created later it will automatically inherit input format
 +              // from table object
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +            } else {
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, oldPart.getInputFormatClass(), files);
 +            }
 +          }
 +          if (!flag) {
 +            throw new HiveException(
 +                "Wrong file format. Please check the file's format.");
 +          }
 +        } else {
 +          LOG.warn("Skipping file format check as dpCtx is not null");
 +        }
 +      }
 +    }
 +  }
 +
++
+   /**
+    * so to make sure we create WriteEntity with the right WriteType.  This is (at this point) only
+    * for consistency since LockManager (which is the only thing that pays attention to WriteType)
+    * has done its job before the query ran.
+    */
+   WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
+     if(tbd.getReplace()) {
+       return WriteEntity.WriteType.INSERT_OVERWRITE;
+     }
+     switch (operation) {
+       case DELETE:
+         return WriteEntity.WriteType.DELETE;
+       case UPDATE:
+         return WriteEntity.WriteType.UPDATE;
+       default:
+         return WriteEntity.WriteType.INSERT;
+     }
+   }
++
    private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
      return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
          .isSkewedStoredAsDir();

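The getWriteType helper added at the end of this hunk chooses the WriteType for the WriteEntity objects registered by the load paths above. A hedged usage sketch of the mapping it encodes follows; the moveTask and tbd variables are assumed to exist (for example in a test in the same package) and are not part of the commit.

// Hedged illustration of the mapping encoded by getWriteType above; assumes an existing
// MoveTask "moveTask" and a LoadTableDesc "tbd" with tbd.getReplace() == false.
WriteEntity.WriteType del = moveTask.getWriteType(tbd, AcidUtils.Operation.DELETE);   // DELETE
WriteEntity.WriteType upd = moveTask.getWriteType(tbd, AcidUtils.Operation.UPDATE);   // UPDATE
WriteEntity.WriteType ins = moveTask.getWriteType(tbd, AcidUtils.Operation.NOT_ACID); // INSERT
// When tbd.getReplace() returns true, every operation maps to INSERT_OVERWRITE.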
http://git-wip-us.apache.org/repos/asf/hive/blob/423537a3/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 0d83abf,17dfd03..36c9049
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -6608,9 -6660,43 +6624,8 @@@ public class SemanticAnalyzer extends B
          setStatsForNonNativeTable(dest_tab);
        }
  
 -      WriteEntity output = null;
 -
 -      // Here only register the whole table for post-exec hook if no DP present
 -      // in the case of DP, we will register WriteEntity in MoveTask when the
 -      // list of dynamically created partitions are known.
 -      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
 -        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
 -        if (!outputs.add(output)) {
 -          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
 -              .getMsg(dest_tab.getTableName()));
 -        }
 -      }
 -      if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
 -        // No static partition specified
 -        if (dpCtx.getNumSPCols() == 0) {
 -          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
 -          outputs.add(output);
 -        }
 -        // part of the partition specified
 -        // Create a DummyPartition in this case. Since, the metastore does not store partial
 -        // partitions currently, we need to store dummy partitions
 -        else {
 -          try {
 -            String ppath = dpCtx.getSPPath();
 -            ppath = ppath.substring(0, ppath.length() - 1);
 -            DummyPartition p =
 -                new DummyPartition(dest_tab, dest_tab.getDbName()
 -                    + "@" + dest_tab.getTableName() + "@" + ppath,
 -                    partSpec);
 -            output = new WriteEntity(p, getWriteType(), false);
 -            outputs.add(output);
 -          } catch (HiveException e) {
 -            throw new SemanticException(e.getMessage(), e);
 -          }
 -        }
 -      }
 -
 +      WriteEntity output = generateTableWriteEntity(
 +          dest_tab, partSpec, ltd, dpCtx, isNonNativeTable);
- 
        ctx.getLoadTableOutputMap().put(ltd, output);
        break;
      }
@@@ -7007,111 -7037,33 +7022,111 @@@
        SessionState.get().getLineageState()
                .mapDirToFop(tlocation, (FileSinkOperator) output);
      }
 +  }
  
 -    if (LOG.isDebugEnabled()) {
 -      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
 -          + dest_path + " row schema: " + inputRR.toString());
 +  private WriteEntity generateTableWriteEntity(Table dest_tab,
 +      Map<String, String> partSpec, LoadTableDesc ltd,
 +      DynamicPartitionCtx dpCtx, boolean isNonNativeTable)
 +      throws SemanticException {
 +    WriteEntity output = null;
 +
 +    // Here only register the whole table for post-exec hook if no DP present
 +    // in the case of DP, we will register WriteEntity in MoveTask when the
 +    // list of dynamically created partitions are known.
 +    if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
 +      output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
 +      if (!outputs.add(output)) {
 +        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
 +            .getMsg(dest_tab.getTableName()));
 +      }
 +    }
 +    if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
 +      // No static partition specified
 +      if (dpCtx.getNumSPCols() == 0) {
 +        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
 +        outputs.add(output);
 +      }
 +      // part of the partition specified
 +      // Create a DummyPartition in this case. Since, the metastore does not store partial
 +      // partitions currently, we need to store dummy partitions
 +      else {
 +        try {
 +          String ppath = dpCtx.getSPPath();
 +          ppath = ppath.substring(0, ppath.length() - 1);
 +          DummyPartition p =
 +              new DummyPartition(dest_tab, dest_tab.getDbName()
 +                  + "@" + dest_tab.getTableName() + "@" + ppath,
 +                  partSpec);
-           output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
++          output = new WriteEntity(p, getWriteType(), false);
 +          outputs.add(output);
 +        } catch (HiveException e) {
 +          throw new SemanticException(e.getMessage(), e);
 +        }
 +      }
      }
 +    return output;
 +  }
  
 -    FileSinkOperator fso = (FileSinkOperator) output;
 -    fso.getConf().setTable(dest_tab);
 -    fsopToTable.put(fso, dest_tab);
 -    // the following code is used to collect column stats when
 -    // hive.stats.autogather=true
 -    // and it is an insert overwrite or insert into table
 -    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
 -        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
 -        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
 -      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
 -        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
 -            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
 -      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
 -        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
 -            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
 +  private void checkExternalTable(Table dest_tab) throws SemanticException {
 +    if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
 +        (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
 +      throw new SemanticException(
 +          ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
 +    }
 +  }
  
 +  private void checkImmutableTable(QB qb, Table dest_tab, Path dest_path, boolean isPart)
 +      throws SemanticException {
 +    // If the query here is an INSERT_INTO and the target is an immutable table,
 +    // verify that our destination is empty before proceeding
 +    if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable(
 +        dest_tab.getDbName(), dest_tab.getTableName())) {
 +      return;
 +    }
 +    try {
 +      FileSystem fs = dest_path.getFileSystem(conf);
 +      if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){
 +        LOG.warn("Attempted write into an immutable table : "
 +            + dest_tab.getTableName() + " : " + dest_path);
 +        throw new SemanticException(
 +            ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
        }
 +    } catch (IOException ioe) {
 +        LOG.warn("Error while trying to determine if immutable table "
 +            + (isPart ? "partition " : "") + "has any data : " + dest_tab.getTableName()
 +            + " : " + dest_path);
 +      throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
      }
 -    return output;
    }
  
 +  private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab,
 +      Map<String, String> partSpec, String dest) throws SemanticException {
 +    List<FieldSchema> parts = dest_tab.getPartitionKeys();
 +    if (parts == null || parts.isEmpty()) return null; // table is not partitioned
 +    if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
 +      throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
 +          ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
 +    }
 +    DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
 +    if (dpCtx == null) {
 +      dest_tab.validatePartColumnNames(partSpec, false);
 +      dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
 +          conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
 +          conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
 +      qbm.setDPCtx(dest, dpCtx);
 +    }
 +
 +    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
 +      throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
 +          ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
 +    }
 +    if ((dest_tab.getNumBuckets() > 0)) {
 +      dpCtx.setNumBuckets(dest_tab.getNumBuckets());
 +    }
 +    return dpCtx;
 +  }
 +
 +
   private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
       Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
      String tableName = table_desc.getTableName();

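In generateTableWriteEntity above, a partially specified partition spec is registered as a DummyPartition whose name is built from the database name, table name, and the static-partition path with its trailing separator stripped. A hedged illustration follows; the database, table, and partition values are made up for the example and do not come from this commit.

// Hedged illustration of the DummyPartition name construction shown above;
// "default", "target_tbl", and the partition path are example values only.
String ppath = "ds=2016-10-25/";                 // a dpCtx.getSPPath()-style value
ppath = ppath.substring(0, ppath.length() - 1);  // strip the trailing '/'
String dummyName = "default" + "@" + "target_tbl" + "@" + ppath;
// dummyName is now "default@target_tbl@ds=2016-10-25"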