Repository: hive
Updated Branches:
  refs/heads/master 844ec3431 -> 92f764e05


http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
new file mode 100644
index 0000000..e9b8711
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -0,0 +1,73 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes;
+
+public class LoadFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LoadFunction.class);
+  private Context context;
+  private final FunctionEvent event;
+  private final String dbNameToLoadIn;
+  private final TaskTracker tracker;
+
+  public LoadFunction(Context context, FunctionEvent event, String 
dbNameToLoadIn,
+      TaskTracker existingTracker) {
+    this.context = context;
+    this.event = event;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.tracker = new TaskTracker(existingTracker);
+  }
+
+  public TaskTracker tasks() throws IOException, SemanticException {
+    URI fromURI = EximUtil
+        .getValidatedURI(context.hiveConf, 
stripQuotes(event.rootDir().toUri().toString()));
+    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), 
fromURI.getPath());
+
+    try {
+      CreateFunctionHandler handler = new CreateFunctionHandler();
+      List<Task<? extends Serializable>> tasks = handler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, null, 
context.hiveConf,
+              context.hiveDb, null, LOG)
+      );
+      tasks.forEach(tracker::addTask);
+      return tracker;
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+  }
+
+}
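
A usage sketch, not part of this patch: how LoadFunction is expected to be driven by the bootstrap
load. The method name and the variables passed in are illustrative; imports match the file above.

  // Sketch only: run the function load under a task budget and fold the result back.
  TaskTracker loadFunctions(Context context, FunctionEvent functionEvent, String dbName,
      TaskTracker loadLimiter) throws IOException, SemanticException {
    LoadFunction loadFunction = new LoadFunction(context, functionEvent, dbName, loadLimiter);
    TaskTracker functionTracker = loadFunction.tasks();   // root tasks created for this function event
    loadLimiter.update(functionTracker);                  // account for them against the parent budget
    return loadLimiter;
  }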

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
new file mode 100644
index 0000000..48ffa77
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
@@ -0,0 +1,58 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+
+import java.io.Serializable;
+
+public class ReplicationState implements Serializable {
+
+  public static class PartitionState {
+    final String tableName;
+    public final AddPartitionDesc lastReplicatedPartition;
+
+    public PartitionState(String tableName, AddPartitionDesc 
lastReplicatedPartition) {
+      this.tableName = tableName;
+      this.lastReplicatedPartition = lastReplicatedPartition;
+    }
+  }
+
+  // null for a non-partitioned table.
+  public final PartitionState partitionState;
+  // for a non-partitioned table this is the last tableName replicated; otherwise it is the name of the
+  // current partitioned table, with the last replicated partition denoted by "lastReplicatedPartition"
+  public final String lastTableReplicated;
+  // name of the last function replicated; null if function replication was in progress when this state was created.
+  public final String functionName;
+
+  public ReplicationState(PartitionState partitionState) {
+    this.partitionState = partitionState;
+    this.functionName = null;
+    this.lastTableReplicated = null;
+  }
+
+  @Override
+  public String toString() {
+    return "ReplicationState{" +
+        ", partitionState=" + partitionState +
+        ", lastTableReplicated='" + lastTableReplicated + '\'' +
+        ", functionName='" + functionName + '\'' +
+        '}';
+  }
+}
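
A short sketch, not part of this patch, of how this checkpoint is meant to be produced; the helper
name is hypothetical, imports match the file above.

  // Hypothetical helper: capture where partition replication stopped so a later
  // REPL LOAD execution can resume after this partition.
  ReplicationState checkpoint(String tableName, AddPartitionDesc lastReplicatedPartition) {
    return new ReplicationState(
        new ReplicationState.PartitionState(tableName, lastReplicatedPartition));
  }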

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
new file mode 100644
index 0000000..f246b8a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
@@ -0,0 +1,113 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is responsible for tracking how many tasks have been created and for organizing
+ * them such that, once the tasks for the next execution have been created, we create a
+ * dependency collection task (DCT) -> another bootstrap task,
+ * and then add the DCT as a dependent of all existing tasks so that the cycle can continue.
+ */
+public class TaskTracker {
+  private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
+  /**
+   * Identifies the list of tasks at root level for a given level such as table / db / partition.
+   * This does not include the task dependency notion of "table tasks <- partition tasks".
+   */
+  private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
+  private ReplicationState replicationState = null;
+  // Since tasks themselves can be graphs, we want to limit the number of created
+  // tasks, including all of their dependencies.
+  private int numberOfTasks = 0;
+  private final int maxTasksAllowed;
+
+  public TaskTracker(int defaultMaxTasks) {
+    maxTasksAllowed = defaultMaxTasks;
+  }
+
+  public TaskTracker(TaskTracker existing) {
+    maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
+  }
+
+  /**
+   * This method is used to identify all the tasks in a graph.
+   * The graph, however, might get created in a disjoint fashion, in which case we can just update
+   * the number of tasks using the "update" method.
+   */
+  public void addTask(Task<? extends Serializable> task) {
+    tasks.add(task);
+    updateTaskCount(task);
+  }
+
+  private void updateTaskCount(Task<? extends Serializable> task) {
+    numberOfTasks += 1;
+    if (task.getChildTasks() != null) {
+      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+        updateTaskCount(childTask);
+      }
+    }
+  }
+
+  public boolean canAddMoreTasks() {
+    return numberOfTasks < maxTasksAllowed;
+  }
+
+  public boolean hasTasks() {
+    return numberOfTasks != 0;
+  }
+
+  public void update(TaskTracker withAnother) {
+    numberOfTasks += withAnother.numberOfTasks;
+    if (withAnother.hasReplicationState()) {
+      this.replicationState = withAnother.replicationState;
+    }
+  }
+
+  public void setReplicationState(ReplicationState state) {
+    this.replicationState = state;
+  }
+
+  public boolean hasReplicationState() {
+    return replicationState != null;
+  }
+
+  public ReplicationState replicationState() {
+    return replicationState;
+  }
+
+  public List<Task<? extends Serializable>> tasks() {
+    return tasks;
+  }
+
+  public void debugLog(String forEventType) {
+    LOG.debug("{} event with total / root number of tasks:{}/{}", 
forEventType, numberOfTasks,
+        tasks.size());
+  }
+
+  public int numberOfTasks() {
+    return numberOfTasks;
+  }
+}
\ No newline at end of file
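
A sketch, not part of this patch, of the budget pattern the class javadoc above describes; the
method and variable names are illustrative.

  // Illustrative only: add tasks until the budget is exhausted, then record the resume point.
  void addUntilFull(TaskTracker tracker, Iterator<Task<? extends Serializable>> taskIterator,
      ReplicationState resumePoint) {
    while (taskIterator.hasNext() && tracker.canAddMoreTasks()) {
      tracker.addTask(taskIterator.next());
    }
    if (taskIterator.hasNext()) {
      // more work remains: remember where the next execution should pick up
      tracker.setReplicationState(resumePoint);
    }
  }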

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
new file mode 100644
index 0000000..f088ba9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -0,0 +1,283 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
+import static 
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
+import static 
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString;
+
+public class LoadPartitions {
+  private static Logger LOG = LoggerFactory.getLogger(LoadPartitions.class);
+
+  private final Context context;
+  private final TableContext tableContext;
+  private final TableEvent event;
+  private final TaskTracker tracker;
+  private final AddPartitionDesc lastReplicatedPartition;
+
+  private final ImportTableDesc tableDesc;
+  private Table table;
+
+  public LoadPartitions(Context context, TaskTracker tableTracker, TableEvent 
event,
+      String dbNameToLoadIn, TableContext tableContext) throws HiveException, 
IOException {
+    this(context, tableContext, tableTracker, event, dbNameToLoadIn, null);
+  }
+
+  public LoadPartitions(Context context, TableContext tableContext, 
TaskTracker limiter,
+      TableEvent event, String dbNameToLoadIn, AddPartitionDesc 
lastReplicatedPartition)
+      throws HiveException, IOException {
+    this.tracker = new TaskTracker(limiter);
+    this.event = event;
+    this.context = context;
+    this.lastReplicatedPartition = lastReplicatedPartition;
+    this.tableContext = tableContext;
+
+    this.tableDesc = 
tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn));
+    this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, 
context.hiveDb);
+  }
+
+  private String location() throws MetaException, HiveException {
+    Database parentDb = 
context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+    if (!tableContext.waitOnPrecursor()) {
+      return context.warehouse.getDefaultTablePath(parentDb, 
tableDesc.getTableName()).toString();
+    } else {
+      Path tablePath = new Path(
+          
context.warehouse.getDefaultDatabasePath(tableDesc.getDatabaseName()),
+          
MetaStoreUtils.encodeTableName(tableDesc.getTableName().toLowerCase())
+      );
+      return context.warehouse.getDnsPath(tablePath).toString();
+    }
+  }
+
+  public TaskTracker tasks() throws SemanticException {
+    try {
+      /*
+      We are doing this both in load table and load partitions
+       */
+      if (tableDesc.getLocation() == null) {
+        tableDesc.setLocation(location());
+      }
+
+      if (table == null) {
+        //new table
+
+        table = new Table(tableDesc.getDatabaseName(), 
tableDesc.getTableName());
+        if (isPartitioned(tableDesc)) {
+          updateReplicationState(initialReplicationState());
+          return forNewTable();
+        }
+      } else {
+        // existing
+
+        if (table.isPartitioned()) {
+          List<AddPartitionDesc> partitionDescs = 
event.partitionDescriptions(tableDesc);
+          if (!event.replicationSpec().isMetadataOnly() && 
!partitionDescs.isEmpty()) {
+            updateReplicationState(initialReplicationState());
+            return forExistingTable(lastReplicatedPartition);
+          }
+        }
+      }
+      return tracker;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private void updateReplicationState(ReplicationState replicationState) 
throws SemanticException {
+    if (!tracker.canAddMoreTasks()) {
+      tracker.setReplicationState(replicationState);
+    }
+  }
+
+  private ReplicationState initialReplicationState() throws SemanticException {
+    return new ReplicationState(
+        new PartitionState(tableDesc.getTableName(), lastReplicatedPartition)
+    );
+  }
+
+  private TaskTracker forNewTable() throws Exception {
+    Iterator<AddPartitionDesc> iterator = 
event.partitionDescriptions(tableDesc).iterator();
+    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
+      AddPartitionDesc addPartitionDesc = iterator.next();
+      tracker.addTask(addSinglePartition(table, addPartitionDesc));
+      ReplicationState currentReplicationState =
+          new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc));
+      updateReplicationState(currentReplicationState);
+    }
+    return tracker;
+  }
+
+  /**
+   * returns the root task for adding a partition
+   */
+  private Task<? extends Serializable> addSinglePartition(Table table,
+      AddPartitionDesc addPartitionDesc) throws MetaException, IOException, 
HiveException {
+    AddPartitionDesc.OnePartitionDesc partSpec = 
addPartitionDesc.getPartition(0);
+    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
+    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, 
partSpec);
+    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+        + partSpec.getLocation());
+    Path tmpPath = 
context.utils.getExternalTmpPath(replicaWarehousePartitionLocation);
+
+    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+        event.replicationSpec(),
+        sourceWarehousePartitionLocation,
+        tmpPath,
+        context.hiveConf
+    );
+
+    Task<?> addPartTask = TaskFactory.get(
+        new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
+        context.hiveConf
+    );
+
+    Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
+
+    copyTask.addDependentTask(addPartTask);
+    addPartTask.addDependentTask(movePartitionTask);
+    return copyTask;
+  }
+
+  /**
+   * This will create the move of partition data from temp path to actual path
+   */
+  private Task<?> movePartitionTask(Table table, 
AddPartitionDesc.OnePartitionDesc partSpec,
+      Path tmpPath) {
+    LoadTableDesc loadTableWork = new LoadTableDesc(
+        tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
+        event.replicationSpec().isReplace()
+    );
+    loadTableWork.setInheritTableSpecs(false);
+    MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), 
loadTableWork, null, false);
+    return TaskFactory.get(work, context.hiveConf);
+  }
+
+  private Path locationOnReplicaWarehouse(Table table, 
AddPartitionDesc.OnePartitionDesc partSpec)
+      throws MetaException, HiveException, IOException {
+    String child = Warehouse.makePartPath(partSpec.getPartSpec());
+    if (tableDesc.getLocation() == null) {
+      if (table.getDataLocation() == null) {
+        Database parentDb = 
context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+        return new Path(
+            context.warehouse.getDefaultTablePath(parentDb, 
tableDesc.getTableName()), child);
+      } else {
+        return new Path(table.getDataLocation().toString(), child);
+      }
+    } else {
+      return new Path(tableDesc.getLocation(), child);
+    }
+  }
+
+  private Task<? extends Serializable> alterSinglePartition(AddPartitionDesc 
desc,
+      ReplicationSpec replicationSpec, Partition ptn) {
+    desc.setReplaceMode(true);
+    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) 
{
+      desc.setReplicationSpec(replicationSpec);
+    }
+    desc.getPartition(0).setLocation(ptn.getLocation()); // use existing 
location
+    return TaskFactory.get(
+        new DDLWork(new HashSet<>(), new HashSet<>(), desc),
+        context.hiveConf
+    );
+  }
+
+  private TaskTracker forExistingTable(AddPartitionDesc 
lastPartitionReplicated) throws Exception {
+    boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated 
== null);
+    ReplicationSpec replicationSpec = event.replicationSpec();
+    LOG.debug("table partitioned");
+    for (AddPartitionDesc addPartitionDesc : 
event.partitionDescriptions(tableDesc)) {
+      /*
+      encounteredTheLastReplicatedPartition is set when we break creation of partition tasks
+      for a table because we have reached the limit on the number of tasks to create for execution.
+      In that case, on the next run we have to iterate over the partition descriptions until we reach
+      the last replicated partition, so that we can start replicating the partitions after it.
+       */
+      if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) {
+        Map<String, String> partSpec = 
addPartitionDesc.getPartition(0).getPartSpec();
+        Partition ptn;
+
+        if ((ptn = context.hiveDb.getPartition(table, partSpec, false)) == 
null) {
+          if (!replicationSpec.isMetadataOnly()) {
+            forNewTable();
+          }
+        } else {
+          // If replicating, then the partition already existing means we need 
to replace, maybe, if
+          // the destination ptn's repl.last.id is older than the 
replacement's.
+          if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
+            if (replicationSpec.isMetadataOnly()) {
+              tracker.addTask(alterSinglePartition(addPartitionDesc, 
replicationSpec, ptn));
+              if (!tracker.canAddMoreTasks()) {
+                tracker.setReplicationState(
+                    new ReplicationState(new 
PartitionState(table.getTableName(), addPartitionDesc)
+                    )
+                );
+              }
+            } else {
+              forNewTable();
+            }
+          } else {
+            // ignore this ptn, do nothing, not an error.
+          }
+        }
+      } else {
+        Map<String, String> currentSpec = 
addPartitionDesc.getPartition(0).getPartSpec();
+        Map<String, String> lastReplicatedPartSpec =
+            lastPartitionReplicated.getPartition(0).getPartSpec();
+        encounteredTheLastReplicatedPartition = 
lastReplicatedPartSpec.equals(currentSpec);
+      }
+    }
+    return tracker;
+  }
+}
+
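
A sketch, not part of this patch, of the resume path that forExistingTable() enables; the helper
name is hypothetical and the saved state is assumed to come from an earlier execution.

  // Hypothetical: rebuild LoadPartitions from the saved ReplicationState so that partition task
  // creation restarts after the last replicated partition.
  TaskTracker resumePartitions(Context context, TableContext tableContext, TaskTracker limiter,
      TableEvent event, String dbName, ReplicationState saved) throws Exception {
    AddPartitionDesc lastReplicated = saved.partitionState.lastReplicatedPartition;
    return new LoadPartitions(context, tableContext, limiter, event, dbName, lastReplicated).tasks();
  }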

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
new file mode 100644
index 0000000..cbb964a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -0,0 +1,216 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.TreeMap;
+
+import static 
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
+
+public class LoadTable {
+  private final static Logger LOG = LoggerFactory.getLogger(LoadTable.class);
+  //  private final Helper helper;
+  private final Context context;
+  private final TableContext tableContext;
+  private final TaskTracker tracker;
+  private final TableEvent event;
+
+  public LoadTable(TableEvent event, Context context, TableContext 
tableContext, TaskTracker limiter)
+      throws SemanticException, IOException {
+    this.event = event;
+    this.context = context;
+    this.tableContext = tableContext;
+    this.tracker = new TaskTracker(limiter);
+  }
+
+  public TaskTracker tasks() throws SemanticException {
+    // The path being passed to us is a table dump location. We go ahead and load it in as needed.
+    // If tblName is null, then we default to the table name specified in _metadata, which is good;
+    // if both are specified, that is the name we are intended to create the new table as.
+    try {
+      if (event.shouldNotReplicate()) {
+        return tracker;
+      }
+      String dbName = tableContext.dbNameToLoadIn; //this can never be null or 
empty;
+      // Create table associated with the import
+      // Executed if relevant, and used to contain all the other details about 
the table if not.
+      ImportTableDesc tableDesc = 
tableContext.overrideProperties(event.tableDesc(dbName));
+      Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, 
context.hiveDb);
+      ReplicationSpec replicationSpec = event.replicationSpec();
+
+      // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+      // is an error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+      // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+      // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+      // defaults and do not error out in that case.
+      // The above changes now that we split the replication load into multiple execution tasks:
+      // the database could have been created earlier, in which case waitOnPrecursor will be false,
+      // and if the db is not found we should error out.
+      Database parentDb = 
context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+      if (parentDb == null) {
+        if (!tableContext.waitOnPrecursor()) {
+          throw new SemanticException(
+              
ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
+        }
+      }
+
+      if (table == null) {
+        // If table doesn't exist, allow creating a new one only if the 
database state is older than the update.
+        if ((parentDb != null) && (!replicationSpec
+            .allowReplacementInto(parentDb.getParameters()))) {
+          // If the target table exists and is newer or same as current update 
based on repl.last.id, then just noop it.
+          return tracker;
+        }
+      } else {
+        if (!replicationSpec.allowReplacementInto(table.getParameters())) {
+          // If the target table exists and is newer or same as current update 
based on repl.last.id, then just noop it.
+          return tracker;
+        }
+      }
+
+      if (tableDesc.getLocation() == null) {
+        tableDesc.setLocation(location(tableDesc, parentDb));
+      }
+
+
+  /* Note: In the following section, Metadata-only import handling logic is
+     interleaved with regular repl-import logic. The rule of thumb being
+     followed here is that MD-only imports are essentially ALTERs. They do
+     not load data, and should not be "creating" any metadata - they should
+     be replacing instead. The only place it makes sense for an MD-only import
+     to create is in the case of a table that's been dropped and recreated,
+     or in the case of an unpartitioned table. In all other cases, it should
+     behave like a noop or a pure MD alter.
+  */
+      if (table == null) {
+        return newTableTasks(tableDesc);
+      } else {
+        return existingTableTasks(tableDesc, table, replicationSpec);
+      }
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private TaskTracker existingTableTasks(ImportTableDesc tblDesc, Table table,
+      ReplicationSpec replicationSpec) {
+    if (!table.isPartitioned()) {
+
+      LOG.debug("table non-partitioned");
+      if (!replicationSpec.allowReplacementInto(table.getParameters())) {
+        return tracker; // silently return, table is newer than our 
replacement.
+      }
+
+      Task<? extends Serializable> alterTableTask = alterTableTask(tblDesc, 
replicationSpec);
+      if (replicationSpec.isMetadataOnly()) {
+        tracker.addTask(alterTableTask);
+      } else {
+        Task<?> loadTableTask =
+            loadTableTask(table, replicationSpec, event.metadataPath(), 
event.metadataPath());
+        alterTableTask.addDependentTask(loadTableTask);
+        tracker.addTask(alterTableTask);
+      }
+    }
+    return tracker;
+  }
+
+  private TaskTracker newTableTasks(ImportTableDesc tblDesc) throws 
SemanticException {
+    Table table;
+    table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
+    // Either we're dropping and re-creating, or the table didn't exist, and 
we're creating.
+    Task<?> createTableTask =
+        tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), 
context.hiveConf);
+    if (event.replicationSpec().isMetadataOnly()) {
+      tracker.addTask(createTableTask);
+      return tracker;
+    }
+    if (!isPartitioned(tblDesc)) {
+      LOG.debug("adding dependent CopyWork/MoveWork for table");
+      Task<?> loadTableTask =
+          loadTableTask(table, event.replicationSpec(), new 
Path(tblDesc.getLocation()),
+              event.metadataPath());
+      createTableTask.addDependentTask(loadTableTask);
+    }
+    tracker.addTask(createTableTask);
+    return tracker;
+  }
+
+  private String location(ImportTableDesc tblDesc, Database parentDb)
+      throws MetaException, SemanticException {
+    if (!tableContext.waitOnPrecursor()) {
+      return context.warehouse.getDefaultTablePath(parentDb, 
tblDesc.getTableName()).toString();
+    } else {
+      Path tablePath = new Path(
+          context.warehouse.getDefaultDatabasePath(tblDesc.getDatabaseName()),
+          MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase())
+      );
+      return context.warehouse.getDnsPath(tablePath).toString();
+    }
+  }
+
+  private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, 
Path tgtPath,
+      Path fromURI) {
+    Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME);
+    Path tmpPath = context.utils.getExternalTmpPath(tgtPath);
+    Task<?> copyTask =
+        ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, 
context.hiveConf);
+
+    LoadTableDesc loadTableWork = new LoadTableDesc(
+        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), 
replicationSpec.isReplace()
+    );
+    MoveWork moveWork =
+        new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, 
false);
+    Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
+    copyTask.addDependentTask(loadTableTask);
+    return copyTask;
+  }
+
+  private Task<? extends Serializable> alterTableTask(ImportTableDesc 
tableDesc,
+      ReplicationSpec replicationSpec) {
+    tableDesc.setReplaceMode(true);
+    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) 
{
+      tableDesc.setReplicationSpec(replicationSpec);
+    }
+    return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), 
context.hiveConf);
+  }
+}
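
A sketch, not part of this patch, of how LoadTable and LoadPartitions might be sequenced under one
limiter; the ordering is an assumption based on the constructors above, not taken from this commit.

  // Illustrative only: load the table first, then, budget permitting, its partitions.
  TaskTracker loadTableThenPartitions(TableEvent event, Context context, TableContext tableContext,
      TaskTracker limiter, String dbName) throws Exception {
    limiter.update(new LoadTable(event, context, tableContext, limiter).tasks());
    if (limiter.canAddMoreTasks()) {
      limiter.update(new LoadPartitions(context, limiter, event, dbName, tableContext).tasks());
    }
    return limiter;
  }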

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
new file mode 100644
index 0000000..39986f5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
@@ -0,0 +1,49 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+
+public class TableContext {
+  final String dbNameToLoadIn;
+  private final TaskTracker parentTracker;
+  // this is only available when we are doing a table-only load as part of replication, not otherwise
+  private final String tableNameToLoadIn;
+
+  public TableContext(TaskTracker parentTracker, String dbNameToLoadIn,
+      String tableNameToLoadIn) {
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.parentTracker = parentTracker;
+    this.tableNameToLoadIn = tableNameToLoadIn;
+  }
+
+  boolean waitOnPrecursor() {
+    return parentTracker.hasTasks();
+  }
+
+  ImportTableDesc overrideProperties(ImportTableDesc importTableDesc)
+      throws SemanticException {
+    if (StringUtils.isNotBlank(tableNameToLoadIn)) {
+      importTableDesc.setTableName(tableNameToLoadIn);
+    }
+    return importTableDesc;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
new file mode 100644
index 0000000..2a7cca1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -0,0 +1,37 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+
+public class Context {
+  public final HiveConf hiveConf;
+  public final Hive hiveDb;
+  public final Warehouse warehouse;
+  public final PathUtils utils;
+
+  public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException {
+    this.hiveConf = hiveConf;
+    this.hiveDb = hiveDb;
+    this.warehouse = new Warehouse(hiveConf);
+    this.utils = new PathUtils(hiveConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java
new file mode 100644
index 0000000..d0b7bda
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java
@@ -0,0 +1,105 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.Context.EXT_PREFIX;
+import static org.apache.hadoop.hive.ql.Context.generateExecutionId;
+
+public class PathUtils {
+  private static int pathId = 10000;
+  private static Logger LOG = LoggerFactory.getLogger(PathUtils.class);
+
+  private final Map<String, Path> fsScratchDirs = new HashMap<>();
+  private final String stagingDir;
+  private final HiveConf hiveConf;
+
+  PathUtils(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    stagingDir = HiveConf.getVar(hiveConf, HiveConf.ConfVars.STAGINGDIR);
+  }
+
+  public synchronized Path getExternalTmpPath(Path path) {
+    URI extURI = path.toUri();
+    if (extURI.getScheme().equals("viewfs")) {
+      // If we are on viewfs we don't want to use /tmp as the tmp dir, since a rename from /tmp/..
+      // to the final /user/hive/warehouse/ will fail later; so instead pick a tmp dir
+      // on the same namespace as the tbl dir.
+      return new Path(getStagingDir(path.getParent()),
+          EXT_PREFIX + Integer.toString(++pathId));
+    }
+    Path fullyQualifiedPath = new Path(extURI.getScheme(), 
extURI.getAuthority(), extURI.getPath());
+    return new Path(getStagingDir(fullyQualifiedPath), EXT_PREFIX + 
Integer.toString(++pathId));
+  }
+
+  private Path getStagingDir(Path inputPath) {
+    final URI inputPathUri = inputPath.toUri();
+    final String inputPathName = inputPathUri.getPath();
+    final String fileSystemAsString = inputPathUri.getScheme() + ":" + 
inputPathUri.getAuthority();
+
+    String stagingPathName;
+    if (!inputPathName.contains(stagingDir)) {
+      stagingPathName = new Path(inputPathName, stagingDir).toString();
+    } else {
+      stagingPathName =
+          inputPathName.substring(0, inputPathName.indexOf(stagingDir) + 
stagingDir.length());
+    }
+
+    final String key =
+        fileSystemAsString + "-" + stagingPathName + "-" + 
TaskRunner.getTaskRunnerID();
+
+    Path dir = fsScratchDirs.get(key);
+    try {
+      FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
+      if (dir == null) {
+        // Append task specific info to stagingPathName, instead of creating a 
sub-directory.
+        // This way we don't have to worry about deleting the stagingPathName 
separately at
+        // end of query execution.
+        Path path = new Path(
+            stagingPathName + "_" + generateExecutionId() + "-" + 
TaskRunner.getTaskRunnerID());
+        dir = fileSystem.makeQualified(path);
+
+        LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
+
+        if (!FileUtils.mkdir(fileSystem, dir, hiveConf)) {
+          throw new IllegalStateException(
+              "Cannot create staging directory  '" + dir.toString() + "'");
+        }
+        fileSystem.deleteOnExit(dir);
+      }
+      fsScratchDirs.put(key, dir);
+      return dir;
+
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString() + "': " + 
e.getMessage(), e);
+    }
+  }
+}
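
A sketch, not part of this patch, of the behaviour the staging-dir comments above describe; the
path is a made-up example.

  // Illustrative only: the returned tmp path is a generated staging directory beside the target
  // location, later promoted into place by a MoveWork.
  Path exampleTmpPath(Context context) {
    return context.utils.getExternalTmpPath(new Path("hdfs://nn/user/hive/warehouse/t/p=1"));
  }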

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index f3f206b..37edd5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.slf4j.Logger;
 
 /**
  * ImportSemanticAnalyzer.
@@ -353,7 +354,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   }
 
   private static Task<?> createTableTask(ImportTableDesc tableDesc, 
EximUtil.SemanticAnalyzerWrapperContext x){
-    return tableDesc.getCreateTableTask(x);
+    return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), 
x.getConf());
   }
 
   private static Task<?> dropTableTask(Table table, 
EximUtil.SemanticAnalyzerWrapperContext x){
@@ -370,7 +371,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
       tableDesc.setReplicationSpec(replicationSpec);
     }
-    return tableDesc.getCreateTableTask(x);
+    return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), 
x.getConf());
   }
 
   private static Task<? extends Serializable> alterSinglePartition(
@@ -452,29 +453,29 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           Warehouse.makePartPath(partSpec.getPartSpec()));
     }
     FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
-    checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
+    checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
     partSpec.setLocation(tgtPath.toString());
   }
 
-  private static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, 
ReplicationSpec replicationSpec,
-                                        
EximUtil.SemanticAnalyzerWrapperContext x)
+  public static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, 
ReplicationSpec replicationSpec,
+      Logger logger)
       throws IOException, SemanticException {
     if (replicationSpec.isInReplicationScope()){
       // replication scope allows replacement, and does not require empty 
directories
       return;
     }
-    x.getLOG().debug("checking emptiness of " + targetPath.toString());
+    logger.debug("checking emptiness of " + targetPath.toString());
     if (fs.exists(targetPath)) {
       FileStatus[] status = fs.listStatus(targetPath, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
       if (status.length > 0) {
-        x.getLOG().debug("Files inc. " + status[0].getPath().toString()
+        logger.debug("Files inc. " + status[0].getPath().toString()
             + " found in path : " + targetPath.toString());
         throw new SemanticException(ErrorMsg.TABLE_DATA_EXISTS.getMsg());
       }
     }
   }
 
-  private static String partSpecToString(Map<String, String> partSpec) {
+  public static String partSpecToString(Map<String, String> partSpec) {
     StringBuilder sb = new StringBuilder();
     boolean firstTime = true;
     for (Map.Entry<String, String> entry : partSpec.entrySet()) {
@@ -489,7 +490,8 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     return sb.toString();
   }
 
-  private static void checkTable(Table table, ImportTableDesc tableDesc, 
ReplicationSpec replicationSpec, HiveConf conf)
+  public static void checkTable(Table table, ImportTableDesc tableDesc,
+      ReplicationSpec replicationSpec, HiveConf conf)
       throws SemanticException, URISyntaxException {
     // This method gets called only in the scope that a destination table 
already exists, so
     // we're validating if the table is an appropriate destination to import 
into
@@ -739,7 +741,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         // ensure if destination is not empty only for regular import
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
-        checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
+        checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
         loadTable(fromURI, table, false, tgtPath, replicationSpec,x);
       }
       // Set this to read because we can't overwrite any existing partitions
@@ -774,7 +776,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
             tablePath = wh.getDefaultTablePath(parentDb, 
tblDesc.getTableName());
           }
           FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
-          checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x);
+          checkTargetLocationEmpty(tgtFs, tablePath, 
replicationSpec,x.getLOG());
           t.addDependentTask(loadTable(fromURI, table, false, tablePath, 
replicationSpec, x));
         }
       }
@@ -935,7 +937,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   }
 
-  private static boolean isPartitioned(ImportTableDesc tblDesc) {
+  public static boolean isPartitioned(ImportTableDesc tblDesc) {
     return !(tblDesc.getPartCols() == null || tblDesc.getPartCols().isEmpty());
   }
 
@@ -943,7 +945,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
    * Utility method that returns a table if one corresponding to the 
destination
    * tblDesc is found. Returns null if no such table is found.
    */
-   private static Table tableIfExists(ImportTableDesc tblDesc, Hive db) throws 
HiveException {
+  public static Table tableIfExists(ImportTableDesc tblDesc, Hive db) throws 
HiveException {
     try {
       return db.getTable(tblDesc.getDatabaseName(),tblDesc.getTableName());
     } catch (InvalidTableException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index b41facd..d4fc340 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,35 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
-import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
-import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -53,14 +44,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -85,7 +72,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   private static String testInjectDumpDir = null; // unit tests can overwrite 
this to affect default dump behaviour
   private static final String dumpSchema = 
"dump_dir,last_repl_id#string,string";
 
-  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   private final static Logger REPL_STATE_LOG = 
LoggerFactory.getLogger("ReplState");
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
@@ -294,8 +281,9 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
 
       if ((!evDump) && (tblNameOrPattern != null) && 
!(tblNameOrPattern.isEmpty())) {
-        // not an event dump, and table name pattern specified, this has to be 
a tbl-level dump
-        rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, 
path, null, null, null));
+        ReplLoadWork replLoadWork =
+            new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, 
tblNameOrPattern);
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
         return;
       }
 
@@ -324,9 +312,12 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
                   + " does not correspond to REPL LOAD expecting to load to a 
singular destination point.");
         }
 
-        for (FileStatus dir : dirsInLoadPath) {
-          analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
-        }
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, 
loadPath.toString(), dbNameOrPattern);
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
+        //
+        //        for (FileStatus dir : dirsInLoadPath) {
+        //          analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+        //        }
       } else {
         // Event dump, each sub-dir is an individual event dump.
         // We need to guarantee that the directory listing we got is in order 
of evid.
@@ -439,7 +430,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
         for (String tableName : tablesUpdated.keySet()){
           // weird - AlterTableDesc requires a HashMap to update props instead 
of a Map.
-          HashMap<String,String> mapProp = new HashMap<String,String>();
+          HashMap<String, String> mapProp = new HashMap<>();
           String eventId = tablesUpdated.get(tableName).toString();
 
           mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
@@ -453,7 +444,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           taskChainTail = updateReplIdTask;
         }
         for (String dbName : dbsUpdated.keySet()){
-          Map<String,String> mapProp = new HashMap<String,String>();
+          Map<String, String> mapProp = new HashMap<>();
           String eventId = dbsUpdated.get(dbName).toString();
 
           mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
@@ -500,187 +491,6 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     return tasks;
   }
 
-  private boolean existEmptyDb(String dbName) throws 
InvalidOperationException, HiveException {
-    Hive hiveDb = Hive.get();
-    Database db = hiveDb.getDatabase(dbName);
-    if (null != db) {
-      List<String> allTables = hiveDb.getAllTables(dbName);
-      List<String> allFunctions = hiveDb.getFunctions(dbName, "*");
-      if (!allTables.isEmpty()) {
-        throw new InvalidOperationException(
-                "Database " + db.getName() + " is not empty. One or more 
tables exist.");
-      }
-      if (!allFunctions.isEmpty()) {
-        throw new InvalidOperationException(
-                "Database " + db.getName() + " is not empty. One or more 
functions exist.");
-      }
-
-      return true;
-    }
-
-    return false;
-  }
-
-  private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus 
dir)
-      throws SemanticException {
-    try {
-      // Path being passed to us is a db dump location. We go ahead and load 
as needed.
-      // dbName might be null or empty, in which case we keep the original db 
name for the new
-      // database creation
-
-      // Two steps here - first, we read the _metadata file here, and create a 
CreateDatabaseDesc
-      // associated with that
-      // Then, we iterate over all subdirs, and create table imports for each.
-
-      MetaData rv = new MetaData();
-      try {
-        rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), 
EximUtil.METADATA_NAME));
-      } catch (IOException e) {
-        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
-      }
-
-      Database dbObj = rv.getDatabase();
-
-      if (dbObj == null) {
-        throw new IllegalArgumentException(
-            "_metadata file read did not contain a db object - invalid dump.");
-      }
-
-      if ((dbName == null) || (dbName.isEmpty())) {
-        // We use dbName specified as long as it is not null/empty. If so, 
then we use the original
-        // name
-        // recorded in the thrift object.
-        dbName = dbObj.getName();
-      }
-
-      REPL_STATE_LOG.info("Repl Load: Started analyzing Repl Load for DB: {} 
from Dump Dir: {}, Dump Type: BOOTSTRAP",
-                          dbName, dir.getPath().toUri().toString());
-
-      Task<? extends Serializable> dbRootTask = null;
-      if (existEmptyDb(dbName)) {
-        AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, 
dbObj.getParameters(), null);
-        dbRootTask = TaskFactory.get(new DDLWork(inputs, outputs, 
alterDbDesc), conf);
-      } else {
-        CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
-        createDbDesc.setName(dbName);
-        createDbDesc.setComment(dbObj.getDescription());
-        createDbDesc.setDatabaseProperties(dbObj.getParameters());
-        // note that we do not set location - for repl load, we want that 
auto-created.
-
-        createDbDesc.setIfNotExists(false);
-        // If it exists, we want this to be an error condition. Repl Load is 
not intended to replace a
-        // db.
-        // TODO: we might revisit this in create-drop-recreate cases, needs 
some thinking on.
-        dbRootTask = TaskFactory.get(new DDLWork(inputs, outputs, 
createDbDesc), conf);
-      }
-
-      rootTasks.add(dbRootTask);
-      FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), 
EximUtil.getDirectoryFilter(fs));
-
-      for (FileStatus tableDir : 
Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) {
-        analyzeTableLoad(
-            dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, 
null, null);
-        REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load 
from path {}",
-                            tableDir.getPath().toUri().toString());
-      }
-
-      //Function load
-      Path functionMetaDataRoot = new Path(dir.getPath(), 
FUNCTIONS_ROOT_DIR_NAME);
-      if (fs.exists(functionMetaDataRoot)) {
-        List<FileStatus> functionDirectories =
-            Arrays.asList(fs.listStatus(functionMetaDataRoot, 
EximUtil.getDirectoryFilter(fs)));
-        for (FileStatus functionDir : functionDirectories) {
-          analyzeFunctionLoad(dbName, functionDir, dbRootTask);
-          REPL_STATE_LOG.info("Repl Load: Analyzed function load from path {}",
-                              functionDir.getPath().toUri().toString());
-        }
-      }
-
-      REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl Load for DB: {} 
and created import (DDL/COPY/MOVE) tasks",
-              dbName);
-    } catch (Exception e) {
-      throw new SemanticException(e);
-    }
-  }
-
-  private static class TableDirPredicate implements Predicate<FileStatus> {
-    @Override
-    public boolean apply(FileStatus fileStatus) {
-      return !fileStatus.getPath().getName().contains(FUNCTIONS_ROOT_DIR_NAME);
-    }
-  }
-
-  private void analyzeFunctionLoad(String dbName, FileStatus functionDir,
-      Task<? extends Serializable> createDbTask) throws IOException, 
SemanticException {
-    URI fromURI = EximUtil
-        .getValidatedURI(conf, 
stripQuotes(functionDir.getPath().toUri().toString()));
-    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), 
fromURI.getPath());
-
-    try {
-      CreateFunctionHandler handler = new CreateFunctionHandler();
-      List<Task<? extends Serializable>> tasksList = handler.handle(
-          new MessageHandler.Context(
-              dbName, null, fromPath.toString(), createDbTask, null, conf, db,
-              null, LOG)
-      );
-
-      tasksList.forEach(task -> {
-        createDbTask.addDependentTask(task);
-        LOG.debug("Added {}:{} as a precursor of {}:{}",
-            createDbTask.getClass(), createDbTask.getId(), task.getClass(),
-            task.getId());
-
-      });
-      inputs.addAll(handler.readEntities());
-    } catch (Exception e) {
-      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
-    }
-  }
-
-  private List<Task<? extends Serializable>> analyzeTableLoad(
-      String dbName, String tblName, String locn,
-      Task<? extends Serializable> precursor,
-      Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated) throws 
SemanticException {
-    // Path being passed to us is a table dump location. We go ahead and load 
it in as needed.
-    // If tblName is null, then we default to the table name specified in 
_metadata, which is good.
-    // or are both specified, in which case, that's what we are intended to 
create the new table as.
-    if (dbName == null || dbName.isEmpty()) {
-      throw new SemanticException("Database name cannot be null for a table 
load");
-    }
-    try {
-      // no location set on repl loads
-      boolean isLocationSet = false;
-      // all repl imports are non-external
-      boolean isExternalSet = false;
-      // bootstrap loads are not partition level
-      boolean isPartSpecSet = false;
-      // repl loads are not partition level
-      LinkedHashMap<String, String> parsedPartSpec = null;
-      // no location for repl imports
-      String parsedLocation = null;
-      List<Task<? extends Serializable>> importTasks = new ArrayList<Task<? 
extends Serializable>>();
-
-      EximUtil.SemanticAnalyzerWrapperContext x =
-          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, 
outputs, importTasks, LOG,
-              ctx);
-      ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, 
isPartSpecSet,
-          (precursor != null), parsedLocation, tblName, dbName, 
parsedPartSpec, locn, x,
-          dbsUpdated, tablesUpdated);
-
-      if (precursor != null) {
-        for (Task<? extends Serializable> t : importTasks) {
-          precursor.addDependentTask(t);
-          LOG.debug("Added {}:{} as a precursor of {}:{}",
-              precursor.getClass(), precursor.getId(), t.getClass(), 
t.getId());
-        }
-      }
-
-      return importTasks;
-    } catch (Exception e) {
-      throw new SemanticException(e);
-    }
-  }
-
   // REPL STATUS
   private void initReplStatus(ASTNode ast) {
     int numChildren = ast.getChildCount();
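
For reference, the hunks above replace the per-directory analyzeDatabaseLoad()/analyzeTableLoad()
walk with a single ReplLoadWork root task. Below is a minimal sketch of that scheduling, assuming
only the two ReplLoadWork constructors and the TaskFactory.get(work, conf) overload used above;
the helper class name, the import path assumed for ReplLoadWork, and the parameter names are
illustrative placeholders, not part of this patch.

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
// Assumed package for ReplLoadWork; adjust to wherever this patch defines it.
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;

class ReplLoadScheduling {
  // Schedules the whole bootstrap dump as one root task; a non-empty table name
  // narrows the load to a single table, mirroring the two constructor calls above.
  static void scheduleBootstrapLoad(HiveConf conf, String dumpDir, String dbNameOrPattern,
      String tblNameOrPattern, List<Task<? extends Serializable>> rootTasks) {
    ReplLoadWork replLoadWork = (tblNameOrPattern == null || tblNameOrPattern.isEmpty())
        ? new ReplLoadWork(conf, dumpDir, dbNameOrPattern)
        : new ReplLoadWork(conf, dumpDir, dbNameOrPattern, tblNameOrPattern);
    rootTasks.add(TaskFactory.get(replLoadWork, conf));
  }
}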

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index bb02c26..e574a47 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.io.Serializable;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
@@ -304,12 +309,13 @@ public class ImportTableDesc {
     return dbName;
   }
 
-  public Task <?> getCreateTableTask(EximUtil.SemanticAnalyzerWrapperContext 
x) {
+  public Task<? extends Serializable> getCreateTableTask(HashSet<ReadEntity> 
inputs, HashSet<WriteEntity> outputs,
+      HiveConf conf) {
     switch (getTableType()) {
       case TABLE:
-        return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), 
createTblDesc), x.getConf());
+        return TaskFactory.get(new DDLWork(inputs, outputs, createTblDesc), 
conf);
       case VIEW:
-        return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), 
createViewDesc), x.getConf());
+        return TaskFactory.get(new DDLWork(inputs, outputs, createViewDesc), 
conf);
     }
     return null;
   }
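
A short usage sketch for the revised getCreateTableTask signature above, assuming a caller that
already owns its ReadEntity/WriteEntity sets and HiveConf rather than going through
EximUtil.SemanticAnalyzerWrapperContext; the helper class and the empty entity sets here are
placeholders only.

import java.io.Serializable;
import java.util.HashSet;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;

class CreateTableTaskUsage {
  // Builds the DDL task for the table or view described by tableDesc; the caller
  // supplies the entity sets and configuration directly.
  static Task<? extends Serializable> createTableTask(ImportTableDesc tableDesc, HiveConf conf) {
    HashSet<ReadEntity> inputs = new HashSet<>();    // populated by the caller as needed
    HashSet<WriteEntity> outputs = new HashSet<>();  // populated by the caller as needed
    return tableDesc.getCreateTableTask(inputs, outputs, conf);
  }
}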

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java
new file mode 100644
index 0000000..aa29a5a
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java
@@ -0,0 +1,29 @@
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+public class TaskTrackerTest {
+  @Mock
+  private Task<? extends Serializable> task;
+
+  @Test
+  public void taskTrackerCompositionInitializesTheMaxTasksCorrectly() {
+    TaskTracker taskTracker = new TaskTracker(1);
+    assertTrue(taskTracker.canAddMoreTasks());
+    taskTracker.addTask(task);
+    assertFalse(taskTracker.canAddMoreTasks());
+
+    TaskTracker taskTracker2 = new TaskTracker(taskTracker);
+    assertFalse(taskTracker2.canAddMoreTasks());
+  }
+}
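
Beyond the composition check in this test, here is a hedged sketch of how a caller might use
TaskTracker's capacity check to cap how many tasks are scheduled in one pass; the fill() helper,
its work list, and the maxTasks limit are hypothetical and not part of this patch.

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;

class TaskTrackerUsage {
  // Adds tasks until the tracker reports it is full; anything left over would be
  // picked up by a later pass that starts from a tracker composed of this one.
  static TaskTracker fill(List<Task<? extends Serializable>> work, int maxTasks) {
    TaskTracker tracker = new TaskTracker(maxTasks);
    for (Task<? extends Serializable> t : work) {
      if (!tracker.canAddMoreTasks()) {
        break; // per-pass budget exhausted
      }
      tracker.addTask(t);
    }
    return tracker;
  }
}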

http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out 
b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
index 16cd9b7..01b57a7 100644
--- a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
+++ b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
@@ -70,11 +70,7 @@ POSTHOOK: Input: database:test_replload_adminpriv_src
 #### A masked pattern was here ####
 PREHOOK: type: REPLLOAD
 #### A masked pattern was here ####
-PREHOOK: Output: test_replload_adminpriv_tgt1@dummy_tbl
-#### A masked pattern was here ####
 POSTHOOK: type: REPLLOAD
-#### A masked pattern was here ####
-POSTHOOK: Output: test_replload_adminpriv_tgt1@dummy_tbl
 PREHOOK: query: show tables test_replload_adminpriv_tgt1
 PREHOOK: type: SHOWTABLES
 PREHOOK: Input: database:default
