Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 30990f485 -> 5fa983268


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index 119ef1c..a08f61a 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -28,6 +28,7 @@ import static java.util.stream.Collectors.joining;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import 
org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,16 +44,18 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.DataConversionException;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.data.management.copy.hive.HiveUtils;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.util.AutoReturnableObject;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -62,6 +65,19 @@ import lombok.extern.slf4j.Slf4j;
  */
 public class HiveConverterUtils {
 
+  @AllArgsConstructor
+  @Getter
+  public static enum StorageFormat {
+    TEXT_FILE("TEXTFILE"),
+    SEQUENCE_FILE("SEQUENCEFILE"),
+    ORC("ORC"),
+    PARQUET("PARQUET"),
+    AVRO("AVRO"),
+    RC_FILE("RCFILE");
+
+    private final String hiveName;
+  }
+
   /***
    * Subdirectory within destination table directory to publish data
    */
@@ -136,6 +152,44 @@ public class HiveConverterUtils {
   }
 
   /**
+   * Generates a CTAS statement to dump the contents of a table / partition 
into a new table.
+   * @param outputDbAndTable output db and table where contents should be 
written.
+   * @param sourceEntity source table / partition.
+   * @param partitionDMLInfo map of partition values.
+   * @param storageFormat format of output table.
+   * @param outputTableLocation location where files of output table should be 
written.
+   */
+  public static String 
generateStagingCTASStatementFromSelectStar(HiveDatasetFinder.DbAndTable 
outputDbAndTable,
+      HiveDatasetFinder.DbAndTable sourceEntity, Map<String, String> 
partitionDMLInfo,
+      StorageFormat storageFormat, String outputTableLocation) {
+    StringBuilder sourceQueryBuilder = new StringBuilder("SELECT * FROM 
`").append(sourceEntity.getDb())
+        .append("`.`").append(sourceEntity.getTable()).append("`");
+    if (partitionDMLInfo != null && !partitionDMLInfo.isEmpty()) {
+      sourceQueryBuilder.append(" WHERE ");
+      sourceQueryBuilder.append(partitionDMLInfo.entrySet().stream()
+          .map(e -> "`" + e.getKey() + "`='" + e.getValue() + "'")
+          .collect(joining(" AND ")));
+    }
+    return generateStagingCTASStatement(outputDbAndTable, 
sourceQueryBuilder.toString(), storageFormat, outputTableLocation);
+  }
+
+  /**
+   * Generates a CTAS statement to dump the results of a query into a new 
table.
+   * @param outputDbAndTable output db and table where contents should be 
written.
+   * @param sourceQuery query to materialize.
+   * @param storageFormat format of output table.
+   * @param outputTableLocation location where files of output table should be 
written.
+   */
+  public static String 
generateStagingCTASStatement(HiveDatasetFinder.DbAndTable outputDbAndTable,
+      String sourceQuery, StorageFormat storageFormat, String 
outputTableLocation) {
+    
Preconditions.checkArgument(!Strings.isNullOrEmpty(outputDbAndTable.getDb()) &&
+        !Strings.isNullOrEmpty(outputDbAndTable.getTable()), "Invalid output 
db and table " + outputDbAndTable);
+
+    return String.format("CREATE TEMPORARY TABLE `%s`.`%s` STORED AS %s 
LOCATION '%s' AS %s", outputDbAndTable.getDb(),
+        outputDbAndTable.getTable(), storageFormat.getHiveName(), 
outputTableLocation, sourceQuery);
+  }
+
+  /**
    * Fills data from input table into output table.
    * @param inputTblName input hive table name
    * @param outputTblName output hive table name
@@ -160,8 +214,10 @@ public class HiveConverterUtils {
     // Insert query
     dmlQuery.append(String.format("INSERT OVERWRITE TABLE `%s`.`%s` %n", 
outputDbName, outputTblName));
 
-    // Partition details
-    dmlQuery.append(partitionKeyValues(optionalPartitionDMLInfo));
+    if (optionalPartitionDMLInfo.isPresent() && 
optionalPartitionDMLInfo.get().size() > 0) {
+      // Partition details
+      dmlQuery.append(partitionKeyValues(optionalPartitionDMLInfo));
+    }
 
     dmlQuery.append(String.format("SELECT * FROM `%s`.`%s`", inputDbName, 
inputTblName));
     if (optionalPartitionDMLInfo.isPresent()) {
@@ -192,15 +248,15 @@ public class HiveConverterUtils {
    * @param partitionsDDLInfo partition type information, to be filled by this 
method
    * @param partitionsDMLInfo partition key-value pair, to be filled by this 
method
    */
-  public static void populatePartitionInfo(QueryBasedHiveConversionEntity 
conversionEntity, Map<String, String> partitionsDDLInfo,
+  public static void populatePartitionInfo(HiveProcessingEntity 
conversionEntity, Map<String, String> partitionsDDLInfo,
       Map<String, String> partitionsDMLInfo) {
 
     String partitionsInfoString = null;
     String partitionsTypeString = null;
 
-    if (conversionEntity.getHivePartition().isPresent()) {
-      partitionsInfoString = 
conversionEntity.getHivePartition().get().getName();
-      partitionsTypeString = 
conversionEntity.getHivePartition().get().getSchema().getProperty("partition_columns.types");
+    if (conversionEntity.getPartition().isPresent()) {
+      partitionsInfoString = conversionEntity.getPartition().get().getName();
+      partitionsTypeString = 
conversionEntity.getPartition().get().getSchema().getProperty("partition_columns.types");
     }
 
     if (StringUtils.isNotBlank(partitionsInfoString) || 
StringUtils.isNotBlank(partitionsTypeString)) {
@@ -235,7 +291,7 @@ public class HiveConverterUtils {
    * @param conversionEntity conversion entity used to get source directory 
permissions
    * @param workUnit workunit
    */
-  public static void createStagingDirectory(FileSystem fs, String destination, 
QueryBasedHiveConversionEntity conversionEntity,
+  public static void createStagingDirectory(FileSystem fs, String destination, 
HiveProcessingEntity conversionEntity,
       WorkUnitState workUnit) {
     /*
      * Create staging data location with the same permissions as source data 
location
@@ -250,18 +306,26 @@ public class HiveConverterUtils {
      */
     Path destinationPath = new Path(destination);
     try {
-      FileStatus sourceDataFileStatus = 
fs.getFileStatus(conversionEntity.getHiveTable().getDataLocation());
-      FsPermission sourceDataPermission = sourceDataFileStatus.getPermission();
-      if (!fs.mkdirs(destinationPath, sourceDataPermission)) {
+      FsPermission permission;
+      String group = null;
+      if (conversionEntity.getTable().getDataLocation() != null) {
+        FileStatus sourceDataFileStatus = 
fs.getFileStatus(conversionEntity.getTable().getDataLocation());
+        permission = sourceDataFileStatus.getPermission();
+        group = sourceDataFileStatus.getGroup();
+      } else {
+        permission = FsPermission.getDefault();
+      }
+
+      if (!fs.mkdirs(destinationPath, permission)) {
         throw new RuntimeException(String.format("Failed to create path %s 
with permissions %s",
-            destinationPath, sourceDataPermission));
+            destinationPath, permission));
       } else {
-        fs.setPermission(destinationPath, sourceDataPermission);
+        fs.setPermission(destinationPath, permission);
         // Set the same group as source
-        if (!workUnit.getPropAsBoolean(HIVE_DATASET_DESTINATION_SKIP_SETGROUP, 
DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP)) {
-          fs.setOwner(destinationPath, null, sourceDataFileStatus.getGroup());
+        if (group != null && 
!workUnit.getPropAsBoolean(HIVE_DATASET_DESTINATION_SKIP_SETGROUP, 
DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP)) {
+          fs.setOwner(destinationPath, null, group);
         }
-        log.info(String.format("Created %s with permissions %s and group %s", 
destinationPath, sourceDataPermission, sourceDataFileStatus.getGroup()));
+        log.info(String.format("Created %s with permissions %s and group %s", 
destinationPath, permission, group));
       }
     } catch (IOException e) {
       Throwables.propagate(e);
@@ -275,12 +339,12 @@ public class HiveConverterUtils {
    *                               such as hourly or daily.
    * @return Partition directory name.
    */
-  public static String 
getStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity,
+  public static String getStagingDataPartitionDirName(HiveProcessingEntity 
conversionEntity,
       List<String> sourceDataPathIdentifier) {
 
-    if (conversionEntity.getHivePartition().isPresent()) {
+    if (conversionEntity.getPartition().isPresent()) {
       StringBuilder dirNamePrefix = new StringBuilder();
-      String sourceHivePartitionLocation = 
conversionEntity.getHivePartition().get().getDataLocation().toString();
+      String sourceHivePartitionLocation = 
conversionEntity.getPartition().get().getDataLocation().toString();
       if (null != sourceDataPathIdentifier && null != 
sourceHivePartitionLocation) {
         for (String hint : sourceDataPathIdentifier) {
           if 
(sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) {
@@ -289,7 +353,7 @@ public class HiveConverterUtils {
         }
       }
 
-      return dirNamePrefix + 
conversionEntity.getHivePartition().get().getName();
+      return dirNamePrefix + conversionEntity.getPartition().get().getName();
     } else {
       return StringUtils.EMPTY;
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
deleted file mode 100644
index 2e370c0..0000000
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.conversion.hive.task;
-
-import java.util.List;
-
-import 
org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
-import org.apache.gobblin.runtime.TaskContext;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-
-/**
- * A simple {@link HiveTask} for Hive view materialization.
- */
-public class HiveMaterializer extends HiveTask {
-
-  private final QueryGenerator queryGenerator;
-
-  public HiveMaterializer(TaskContext taskContext) throws Exception {
-    super(taskContext);
-    this.queryGenerator = new 
HiveMaterializerQueryGenerator(this.workUnitState);
-    if (!(workUnit.getHiveDataset() instanceof ConvertibleHiveDataset)) {
-      throw new IllegalStateException("HiveConvertExtractor is only compatible 
with ConvertibleHiveDataset");
-    }
-  }
-
-  @Override
-  public List<String> generateHiveQueries() {
-    return queryGenerator.generateQueries();
-  }
-
-  @Override
-  public QueryBasedHivePublishEntity generatePublishQueries() throws Exception 
{
-    return queryGenerator.generatePublishQueries();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
deleted file mode 100644
index 4eee0e0..0000000
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.conversion.hive.task;
-
-import java.util.Map;
-import java.util.List;
-
-import 
org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
-import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.converter.DataConversionException;
-import 
org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager;
-import 
org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHivePartition;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHiveTable;
-import 
org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
-import 
org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
-import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
-import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
-import org.apache.gobblin.hive.HiveMetastoreClientPool;
-import org.apache.gobblin.util.AutoReturnableObject;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-/**
- * A simple query generator for {@link HiveMaterializer}.
- */
-public class HiveMaterializerQueryGenerator implements QueryGenerator {
-  private final FileSystem fs;
-  private final ConvertibleHiveDataset.ConversionConfig conversionConfig;
-  private final ConvertibleHiveDataset hiveDataset;
-  private final String inputDbName;
-  private final String inputTableName;
-  private final String outputDatabaseName;
-  private final String outputTableName;
-  private final String outputDataLocation;
-  private final String stagingTableName;
-  private final String stagingDataLocation;
-  private final List<String> sourceDataPathIdentifier;
-  private final String stagingDataPartitionDirName;
-  private final String stagingDataPartitionLocation;
-  private final Map<String, String> partitionsDDLInfo;
-  private final Map<String, String> partitionsDMLInfo;
-  private final Optional<Table> destinationTableMeta;
-  private final HiveWorkUnit workUnit;
-  private final HiveMetastoreClientPool pool;
-  private final QueryBasedHiveConversionEntity conversionEntity;
-  private final WorkUnitState workUnitState;
-
-  public HiveMaterializerQueryGenerator(WorkUnitState workUnitState) throws 
Exception {
-    this.workUnitState = workUnitState;
-    this.workUnit = new HiveWorkUnit(workUnitState.getWorkunit());
-    this.hiveDataset = (ConvertibleHiveDataset) workUnit.getHiveDataset();
-    this.inputDbName = hiveDataset.getDbAndTable().getDb();
-    this.inputTableName = hiveDataset.getDbAndTable().getTable();
-    this.fs = HiveSource.getSourceFs(workUnitState);
-    this.conversionConfig = 
hiveDataset.getConversionConfigForFormat("sameAsSource").get();
-    this.outputDatabaseName = conversionConfig.getDestinationDbName();
-    this.outputTableName = conversionConfig.getDestinationTableName();
-    this.outputDataLocation = 
HiveConverterUtils.getOutputDataLocation(conversionConfig.getDestinationDataPath());
-    this.stagingTableName = 
HiveConverterUtils.getStagingTableName(conversionConfig.getDestinationStagingTableName());
-    this.stagingDataLocation = 
HiveConverterUtils.getStagingDataLocation(conversionConfig.getDestinationDataPath(),
 stagingTableName);
-    this.sourceDataPathIdentifier = 
conversionConfig.getSourceDataPathIdentifier();
-    this.pool = 
HiveMetastoreClientPool.get(workUnitState.getJobState().getProperties(),
-        
Optional.fromNullable(workUnitState.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
-    this.conversionEntity = getConversionEntity();
-    this.stagingDataPartitionDirName = 
HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, 
sourceDataPathIdentifier);
-    this.stagingDataPartitionLocation = stagingDataLocation + Path.SEPARATOR + 
stagingDataPartitionDirName;
-    this.partitionsDDLInfo = Maps.newHashMap();
-    this.partitionsDMLInfo = Maps.newHashMap();
-    HiveConverterUtils.populatePartitionInfo(conversionEntity, 
partitionsDDLInfo, partitionsDMLInfo);
-    this.destinationTableMeta = 
HiveConverterUtils.getDestinationTableMeta(outputDatabaseName,
-        outputTableName, workUnitState.getProperties()).getLeft();
-  }
-
-  /**
-   * Returns hive queries to be run as a part of a hive task.
-   * This does not include publish queries.
-   * @return
-   */
-  @Override
-  public List<String> generateQueries() {
-
-    List<String> hiveQueries = Lists.newArrayList();
-
-    Preconditions.checkNotNull(this.workUnit, "Workunit must not be null");
-    EventWorkunitUtils.setBeginDDLBuildTimeMetadata(this.workUnit, 
System.currentTimeMillis());
-
-    HiveConverterUtils.createStagingDirectory(fs, 
conversionConfig.getDestinationDataPath(),
-        conversionEntity, this.workUnitState);
-
-    // Create DDL statement for table
-    String createStagingTableDDL =
-        HiveConverterUtils.generateCreateDuplicateTableDDL(
-            inputDbName,
-            inputTableName,
-            stagingTableName,
-            stagingDataLocation,
-            Optional.of(outputDatabaseName));
-    hiveQueries.add(createStagingTableDDL);
-    log.debug("Create staging table DDL:\n" + createStagingTableDDL);
-
-    /*
-     * Setting partition mode to 'nonstrict' is needed to improve readability 
of the code.
-     * If we do not set dynamic partition mode to nonstrict, we will have to 
write partition values also,
-     * and because hive considers partition as a virtual column, we also have 
to write each of the column
-     * name in the query (in place of *) to match source and target columns.
-     */
-    hiveQueries.add("SET hive.exec.dynamic.partition.mode=nonstrict");
-
-    String insertInStagingTableDML =
-        HiveConverterUtils
-            .generateTableCopy(
-                inputTableName,
-                stagingTableName,
-                conversionEntity.getHiveTable().getDbName(),
-                outputDatabaseName,
-                Optional.of(partitionsDMLInfo));
-    hiveQueries.add(insertInStagingTableDML);
-    log.debug("Conversion staging DML: " + insertInStagingTableDML);
-
-    log.info("Conversion Queries {}\n",  hiveQueries);
-
-    EventWorkunitUtils.setEndDDLBuildTimeMetadata(workUnit, 
System.currentTimeMillis());
-    return hiveQueries;
-  }
-
-  /**
-   * Retuens a QueryBasedHivePublishEntity which includes publish level 
queries and cleanup commands.
-   * @return QueryBasedHivePublishEntity
-   * @throws DataConversionException
-   */
-  public QueryBasedHivePublishEntity generatePublishQueries() throws 
DataConversionException {
-
-    QueryBasedHivePublishEntity publishEntity = new 
QueryBasedHivePublishEntity();
-    List<String> publishQueries = publishEntity.getPublishQueries();
-    Map<String, String> publishDirectories = 
publishEntity.getPublishDirectories();
-    List<String> cleanupQueries = publishEntity.getCleanupQueries();
-    List<String> cleanupDirectories = publishEntity.getCleanupDirectories();
-
-    String createFinalTableDDL =
-        HiveConverterUtils.generateCreateDuplicateTableDDL(inputDbName, 
inputTableName, outputTableName,
-            outputDataLocation, Optional.of(outputDatabaseName));
-    publishQueries.add(createFinalTableDDL);
-    log.debug("Create final table DDL:\n" + createFinalTableDDL);
-
-    if (partitionsDDLInfo.size() == 0) {
-      log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " 
+ outputDataLocation);
-      publishDirectories.put(stagingDataLocation, outputDataLocation);
-
-      String dropStagingTableDDL = 
HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, 
stagingTableName);
-
-      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
-      cleanupQueries.add(dropStagingTableDDL);
-
-      log.debug("Staging table directory to delete: " + stagingDataLocation);
-      cleanupDirectories.add(stagingDataLocation);
-    } else {
-      String finalDataPartitionLocation = outputDataLocation + Path.SEPARATOR 
+ stagingDataPartitionDirName;
-      Optional<Path> destPartitionLocation =
-            
HiveConverterUtils.getDestinationPartitionLocation(destinationTableMeta, 
this.workUnitState,
-                conversionEntity.getHivePartition().get().getName());
-        finalDataPartitionLocation = 
HiveConverterUtils.updatePartitionLocation(finalDataPartitionLocation, 
this.workUnitState,
-            destPartitionLocation);
-
-      log.debug("Partition directory to move: " + stagingDataPartitionLocation 
+ " to: " + finalDataPartitionLocation);
-      publishDirectories.put(stagingDataPartitionLocation, 
finalDataPartitionLocation);
-      List<String> dropPartitionsDDL =
-          
HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, 
outputTableName, partitionsDMLInfo);
-      log.debug("Drop partitions if exist in final table: " + 
dropPartitionsDDL);
-      publishQueries.addAll(dropPartitionsDDL);
-      List<String> createFinalPartitionDDL =
-          
HiveAvroORCQueryGenerator.generateCreatePartitionDDL(outputDatabaseName, 
outputTableName,
-              finalDataPartitionLocation, partitionsDMLInfo, 
Optional.<String>absent());
-
-      log.debug("Create final partition DDL: " + createFinalPartitionDDL);
-      publishQueries.addAll(createFinalPartitionDDL);
-
-      String dropStagingTableDDL =
-          HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, 
stagingTableName);
-
-      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
-      cleanupQueries.add(dropStagingTableDDL);
-
-      log.debug("Staging table directory to delete: " + stagingDataLocation);
-      cleanupDirectories.add(stagingDataLocation);
-    }
-
-    
publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName,
 outputTableName,
-        
AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(conversionEntity)));
-
-    log.info("Publish partition entity: " + publishEntity);
-    return publishEntity;
-  }
-
-  private QueryBasedHiveConversionEntity getConversionEntity() throws 
Exception {
-
-
-    try (AutoReturnableObject<IMetaStoreClient> client = 
this.pool.getClient()) {
-
-      Table table = client.get().getTable(this.inputDbName, 
this.inputTableName);
-
-      SchemaAwareHiveTable schemaAwareHiveTable = new 
SchemaAwareHiveTable(table, 
AvroSchemaManager.getSchemaFromUrl(workUnit.getTableSchemaUrl(), fs));
-
-      SchemaAwareHivePartition schemaAwareHivePartition = null;
-
-      if (workUnit.getPartitionName().isPresent() && 
workUnit.getPartitionSchemaUrl().isPresent()) {
-        org.apache.hadoop.hive.metastore.api.Partition
-            partition = client.get().getPartition(this.inputDbName, 
this.inputTableName, workUnit.getPartitionName().get());
-        schemaAwareHivePartition =
-            new SchemaAwareHivePartition(table, partition, 
AvroSchemaManager.getSchemaFromUrl(workUnit.getPartitionSchemaUrl().get(), fs));
-      }
-      return new QueryBasedHiveConversionEntity(this.hiveDataset, 
schemaAwareHiveTable, Optional.fromNullable(schemaAwareHivePartition));
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
deleted file mode 100644
index d1c9371..0000000
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.conversion.hive.task;
-
-import java.util.List;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.SourceState;
-import 
org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
-import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
-import 
org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
-import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
-import org.apache.gobblin.runtime.task.TaskUtils;
-import org.apache.gobblin.source.workunit.WorkUnit;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-
-/**
- * A simple HiveSource for {@link HiveMaterializer}.
- */
-public class HiveMaterializerSource extends HiveSource {
-
-  @Override
-  public List<WorkUnit> getWorkunits(SourceState state) {
-    if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) {
-      state.setProp(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY, 
ConvertibleHiveDatasetFinder.class.getName());
-    }
-    if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) {
-      state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, 
"hive.conversion.avro");
-    }
-
-    List<WorkUnit> workUnits = super.getWorkunits(state);
-
-    for(WorkUnit workUnit : workUnits) {
-      if 
(Boolean.valueOf(workUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)))
 {
-        log.info("Ignoring Watermark workunit for {}", 
workUnit.getProp(ConfigurationKeys.DATASET_URN_KEY));
-        continue;
-      }
-      TaskUtils.setTaskFactoryClass(workUnit, 
HiveMaterializerTaskFactory.class);
-    }
-    return workUnits;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
deleted file mode 100644
index c05d4bf..0000000
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.data.management.conversion.hive.task;
-
-import org.apache.gobblin.publisher.DataPublisher;
-import org.apache.gobblin.publisher.NoopPublisher;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.TaskContext;
-import org.apache.gobblin.runtime.task.TaskFactory;
-import org.apache.gobblin.runtime.task.TaskIFace;
-
-/**
- * A {@link TaskFactory} that runs a {@link HiveMaterializer} task.
- * This factory is intended to publish data in the task directly, and
- * uses a {@link NoopPublisher}.
- */
-public class HiveMaterializerTaskFactory implements TaskFactory {
-  @Override
-  public TaskIFace createTask(TaskContext taskContext) {
-    try {
-      return new HiveMaterializer(taskContext);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public DataPublisher createDataPublisher(JobState.DatasetState datasetState) 
{
-    return new NoopPublisher(datasetState);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
index aabbb6e..16a2028 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
@@ -25,9 +25,11 @@ import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
 import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
@@ -52,6 +54,15 @@ import lombok.extern.slf4j.Slf4j;
  * which creates extract/write level queries and publish level queries 
respectively.
  */
 public abstract class HiveTask extends BaseAbstractTask {
+  private static final String USE_WATERMARKER_KEY = 
"internal.hiveTask.useWatermarker";
+
+  /**
+   * Disable Hive watermarker. This is necessary when there is no concrete 
source table where watermark can be inferred.
+   */
+  public static void disableHiveWatermarker(State state) {
+    state.setProp(USE_WATERMARKER_KEY, Boolean.toString(false));
+  }
+
   protected final TaskContext taskContext;
   protected final WorkUnitState workUnitState;
   protected final HiveWorkUnit workUnit;
@@ -114,11 +125,8 @@ public abstract class HiveTask extends BaseAbstractTask {
           for (Map.Entry<String, String> publishDir : 
publishDirectories.entrySet()) {
             HadoopUtils.renamePath(fs, new Path(publishDir.getKey()), new 
Path(publishDir.getValue()), true);
           }
-        } catch (RuntimeException re) {
-          throw re;
-        }
-        catch (Exception e) {
-          log.error("error in move dir");
+        } catch (Throwable t) {
+          throw Throwables.propagate(t);
         }
       }
 
@@ -132,11 +140,12 @@ public abstract class HiveTask extends BaseAbstractTask {
 
       wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
 
-      HiveSourceWatermarker watermarker = 
GobblinConstructorUtils.invokeConstructor(
-          HiveSourceWatermarkerFactory.class, 
wus.getProp(HiveSource.HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY,
-              
HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS)).createFromState(wus);
+      if (wus.getPropAsBoolean(USE_WATERMARKER_KEY, true)) {
+        HiveSourceWatermarker watermarker = 
GobblinConstructorUtils.invokeConstructor(HiveSourceWatermarkerFactory.class,
+            wus.getProp(HiveSource.HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY, 
HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS)).createFromState(wus);
 
-      watermarker.setActualHighWatermark(wus);
+        watermarker.setActualHighWatermark(wus);
+      }
     } catch (RuntimeException re) {
       throw re;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
index 1502b06..8c7357b 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
@@ -25,7 +25,7 @@ import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiv
 /**
  * An interface for generating queries.
  */
-interface QueryGenerator {
+public interface QueryGenerator {
 
   /**
    * Generates queries to extract/convert/write data

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
index 94c427c..fa15459 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java
@@ -73,7 +73,7 @@ public class HiveQueryExecutionWriter implements 
DataWriter<QueryBasedHiveConver
    * Method to add properties needed by publisher to preserve partition params
    */
   private void addPropsForPublisher(QueryBasedHiveConversionEntity 
hiveConversionEntity) {
-    if (!hiveConversionEntity.getHivePartition().isPresent()) {
+    if (!hiveConversionEntity.getPartition().isPresent()) {
       return;
     }
     ConvertibleHiveDataset convertibleHiveDataset = 
hiveConversionEntity.getConvertibleHiveDataset();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
index 9d6bd32..eee2005 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
@@ -66,7 +66,7 @@ public class HiveSourceTest {
 
     SourceState testState = getTestState(dbName);
 
-    this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, tableSdLoc, 
Optional.<String> absent());
+    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, 
tableSdLoc, Optional.<String> absent());
 
     List<WorkUnit> workUnits = hiveSource.getWorkunits(testState);
 
@@ -92,7 +92,7 @@ public class HiveSourceTest {
 
     SourceState testState = getTestState(dbName);
 
-    Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, 
tableSdLoc, Optional.of("field"));
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, tableSdLoc, Optional.of("field"));
 
     this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), 
(int) System.currentTimeMillis());
 
@@ -126,8 +126,8 @@ public class HiveSourceTest {
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
-    this.hiveMetastoreTestUtils.createTestTable(dbName, tableName1, 
tableSdLoc1, Optional.<String> absent());
-    this.hiveMetastoreTestUtils.createTestTable(dbName, tableName2, 
tableSdLoc2, Optional.<String> absent(), true);
+    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName1, 
tableSdLoc1, Optional.<String> absent());
+    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName2, 
tableSdLoc2, Optional.<String> absent(), true);
 
     List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
index 32c6af8..5d684b4 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Optional;
@@ -93,12 +94,12 @@ public class LocalHiveMetastoreTestUtils {
     }
   }
 
-  public Table createTestTable(String dbName, String tableName, String 
tableSdLoc, Optional<String> partitionFieldName)
+  public Table createTestAvroTable(String dbName, String tableName, String 
tableSdLoc, Optional<String> partitionFieldName)
       throws Exception {
-    return createTestTable(dbName, tableName, tableSdLoc, partitionFieldName, 
false);
+    return createTestAvroTable(dbName, tableName, tableSdLoc, 
partitionFieldName, false);
   }
 
-  public Table createTestTable(String dbName, String tableName, String 
tableSdLoc,
+  public Table createTestAvroTable(String dbName, String tableName, String 
tableSdLoc,
       Optional<String> partitionFieldName, boolean ignoreDbCreation) throws 
Exception {
     if (!ignoreDbCreation) {
       createTestDb(dbName);
@@ -106,6 +107,7 @@ public class LocalHiveMetastoreTestUtils {
 
     Table tbl = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, 
tableName);
     tbl.getSd().setLocation(tableSdLoc);
+    tbl.getSd().getSerdeInfo().setSerializationLib(AvroSerDe.class.getName());
     
tbl.getSd().getSerdeInfo().setParameters(ImmutableMap.of(HiveAvroSerDeManager.SCHEMA_URL,
 "/tmp/dummy"));
 
     if (partitionFieldName.isPresent()) {
@@ -117,11 +119,11 @@ public class LocalHiveMetastoreTestUtils {
     return tbl;
   }
 
-  public Table createTestTable(String dbName, String tableName, List<String> 
partitionFieldNames) throws Exception {
-    return createTestTable(dbName, tableName, "/tmp/" + tableName, 
partitionFieldNames, true);
+  public Table createTestAvroTable(String dbName, String tableName, 
List<String> partitionFieldNames) throws Exception {
+    return createTestAvroTable(dbName, tableName, "/tmp/" + tableName, 
partitionFieldNames, true);
   }
 
-  public Table createTestTable(String dbName, String tableName, String 
tableSdLoc,
+  public Table createTestAvroTable(String dbName, String tableName, String 
tableSdLoc,
       List<String> partitionFieldNames, boolean ignoreDbCreation)
       throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
index 7e38841..893e13c 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
@@ -68,7 +68,7 @@ public class HiveAvroToOrcConverterTest {
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
-    Table table = this.hiveMetastoreTestUtils.createTestTable(dbName, 
tableName, tableSdLoc, Optional.<String> absent());
+    Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, tableSdLoc, Optional.<String> absent());
     Schema schema = 
ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, 
"recordWithinRecordWithinRecord_nested.json");
     WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName, 
0);
 
@@ -120,7 +120,7 @@ public class HiveAvroToOrcConverterTest {
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
-    Table table = this.hiveMetastoreTestUtils.createTestTable(dbName, 
tableName, tableSdLoc, Optional.<String> absent());
+    Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, tableSdLoc, Optional.<String> absent());
     Schema schema = 
ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, 
"recordWithinRecordWithinRecord_nested.json");
     WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName, 
0);
     wus.getJobState().setProp("orc.table.flatten.schema", "false");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
new file mode 100644
index 0000000..d237a7b
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import 
org.apache.gobblin.data.management.conversion.hive.LocalHiveMetastoreTestUtils;
+import 
org.apache.gobblin.data.management.conversion.hive.entities.TableLikeStageableTableMetadata;
+import 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveTask;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.AutoReturnableObject;
+import org.apache.gobblin.util.HiveJdbcConnector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+
+public class HiveMaterializerTest {
+
+  private final LocalHiveMetastoreTestUtils localHiveMetastore = 
LocalHiveMetastoreTestUtils.getInstance();
+  private final String dbName = HiveMaterializerTest.class.getSimpleName();
+  private final String sourceTableName = "source";
+  private final String partitionColumn = "part";
+  private File dataFile;
+  private HiveJdbcConnector jdbcConnector;
+  private HiveDataset dataset;
+  private HiveMetastoreClientPool pool;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    this.jdbcConnector = HiveJdbcConnector.newEmbeddedConnector(2);
+    this.dataFile = new 
File(getClass().getClassLoader().getResource("hiveMaterializerTest/source/").toURI());
+    this.localHiveMetastore.dropDatabaseIfExists(this.dbName);
+    this.localHiveMetastore.createTestDb(this.dbName);
+    this.jdbcConnector.executeStatements(
+        String.format("CREATE EXTERNAL TABLE %s.%s (id STRING, name String) 
PARTITIONED BY (%s String) "
+                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS 
TEXTFILE",
+            this.dbName, this.sourceTableName, this.partitionColumn),
+        String.format("ALTER TABLE %s.%s ADD PARTITION (part = 'part1') 
LOCATION '%s'",
+            this.dbName, this.sourceTableName, this.dataFile.getAbsolutePath() 
+ "/part1"),
+        String.format("ALTER TABLE %s.%s ADD PARTITION (part = 'part2') 
LOCATION '%s'",
+            this.dbName, this.sourceTableName, this.dataFile.getAbsolutePath() 
+ "/part2"));
+
+    List<List<String>> allTable = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s", this.dbName, 
this.sourceTableName), 3);
+    Assert.assertEquals(allTable.size(), 8);
+    List<List<String>> part1 = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s WHERE %s='part1'", this.dbName, 
this.sourceTableName, this.partitionColumn), 3);
+    Assert.assertEquals(part1.size(), 4);
+
+    this.pool = HiveMetastoreClientPool.get(new Properties(), 
Optional.absent());
+    Table table;
+    try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
+      table = new Table(client.get().getTable(this.dbName, 
this.sourceTableName));
+    }
+    this.dataset = new HiveDataset(FileSystem.getLocal(new Configuration()), 
pool, table, new Properties());
+  }
+
+  @AfterClass
+  public void teardown() throws Exception {
+    if (this.jdbcConnector != null) {
+      this.jdbcConnector.close();
+    }
+  }
+
+  @Test
+  public void testCopyTable() throws Exception {
+    String destinationTable = "copyTable";
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    WorkUnit workUnit = HiveMaterializer.tableCopyWorkUnit(this.dataset, new 
TableLikeStageableTableMetadata(this.dataset.getTable(),
+        this.dbName, destinationTable, tmpDir.getAbsolutePath()), 
String.format("%s=part1", this.partitionColumn));
+
+    HiveMaterializer hiveMaterializer = new 
HiveMaterializer(getTaskContextForRun(workUnit));
+    hiveMaterializer.run();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+    hiveMaterializer.commit();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+
+    List<List<String>> allTable = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 
3);
+    Assert.assertEquals(allTable.size(), 4);
+    Assert.assertEquals(allTable.stream().map(l -> 
l.get(0)).collect(Collectors.toList()), Lists.newArrayList("101", "102", "103", 
"104"));
+  }
+
+  @Test
+  public void testMaterializeTable() throws Exception {
+    String destinationTable = "materializeTable";
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    WorkUnit workUnit = 
HiveMaterializer.viewMaterializationWorkUnit(this.dataset, 
HiveConverterUtils.StorageFormat.AVRO,
+        new TableLikeStageableTableMetadata(this.dataset.getTable(), 
this.dbName, destinationTable, tmpDir.getAbsolutePath()), null);
+
+    HiveMaterializer hiveMaterializer = new 
HiveMaterializer(getTaskContextForRun(workUnit));
+    hiveMaterializer.run();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+    hiveMaterializer.commit();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+
+    List<List<String>> allTable = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 
3);
+    Assert.assertEquals(allTable.size(), 8);
+    Assert.assertEquals(allTable.stream().map(l -> 
l.get(0)).collect(Collectors.toList()),
+        Lists.newArrayList("101", "102", "103", "104", "201", "202", "203", 
"204"));
+  }
+
+  @Test
+  public void testMaterializeTablePartition() throws Exception {
+    String destinationTable = "materializeTablePartition";
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    WorkUnit workUnit = 
HiveMaterializer.viewMaterializationWorkUnit(this.dataset, 
HiveConverterUtils.StorageFormat.AVRO,
+        new TableLikeStageableTableMetadata(this.dataset.getTable(), 
this.dbName, destinationTable, tmpDir.getAbsolutePath()),
+        String.format("%s=part1", this.partitionColumn));
+
+    HiveMaterializer hiveMaterializer = new 
HiveMaterializer(getTaskContextForRun(workUnit));
+    hiveMaterializer.run();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+    hiveMaterializer.commit();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+
+    List<List<String>> allTable = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 
3);
+    Assert.assertEquals(allTable.size(), 4);
+    Assert.assertEquals(allTable.stream().map(l -> 
l.get(0)).collect(Collectors.toList()),
+        Lists.newArrayList("101", "102", "103", "104"));
+  }
+
+  @Test
+  public void testMaterializeView() throws Exception {
+    String destinationTable = "materializeView";
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    String viewName = "myView";
+
+    this.jdbcConnector.executeStatements(String.format("CREATE VIEW %s.%s AS 
SELECT * FROM %s.%s WHERE name = 'foo'",
+        this.dbName, viewName, this.dbName, this.sourceTableName));
+
+    Table view;
+    try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
+      view = new Table(client.get().getTable(this.dbName, viewName));
+    }
+    HiveDataset viewDataset = new HiveDataset(FileSystem.getLocal(new 
Configuration()), pool, view, new Properties());
+
+    WorkUnit workUnit = 
HiveMaterializer.viewMaterializationWorkUnit(viewDataset, 
HiveConverterUtils.StorageFormat.AVRO,
+        new TableLikeStageableTableMetadata(viewDataset.getTable(), 
this.dbName, destinationTable, tmpDir.getAbsolutePath()),
+        null);
+
+    HiveMaterializer hiveMaterializer = new 
HiveMaterializer(getTaskContextForRun(workUnit));
+    hiveMaterializer.run();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+    hiveMaterializer.commit();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+
+    List<List<String>> allTable = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 
3);
+    Assert.assertEquals(allTable.size(), 4);
+    Assert.assertEquals(allTable.stream().map(l -> 
l.get(0)).collect(Collectors.toList()),
+        Lists.newArrayList("101", "103", "201", "203"));
+  }
+
+  @Test
+  public void testMaterializeQuery() throws Exception {
+    String destinationTable = "materializeQuery";
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    WorkUnit workUnit = HiveMaterializer.queryResultMaterializationWorkUnit(
+        String.format("SELECT * FROM %s.%s WHERE name = 'foo'", this.dbName, 
this.sourceTableName),
+        HiveConverterUtils.StorageFormat.AVRO,
+        new TableLikeStageableTableMetadata(this.dataset.getTable(), 
this.dbName, destinationTable, tmpDir.getAbsolutePath()));
+
+    HiveMaterializer hiveMaterializer = new 
HiveMaterializer(getTaskContextForRun(workUnit));
+    hiveMaterializer.run();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+    hiveMaterializer.commit();
+    Assert.assertEquals(hiveMaterializer.getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+
+    List<List<String>> allTable = 
executeStatementAndGetResults(this.jdbcConnector,
+        String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 
3);
+    Assert.assertEquals(allTable.size(), 4);
+    Assert.assertEquals(allTable.stream().map(l -> 
l.get(0)).collect(Collectors.toList()),
+        Lists.newArrayList("101", "103", "201", "203"));
+  }
+
+  private TaskContext getTaskContextForRun(WorkUnit workUnit) {
+    workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "job123");
+    workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task123");
+    
workUnit.setProp(HiveConverterUtils.HIVE_DATASET_DESTINATION_SKIP_SETGROUP, 
Boolean.toString(true));
+    HiveTask.disableHiveWatermarker(workUnit);
+    JobState jobState = new JobState("job", "job123");
+    return new TaskContext(new WorkUnitState(workUnit, jobState));
+  }
+
+  private List<List<String>> executeStatementAndGetResults(HiveJdbcConnector 
connector, String query, int columns) throws SQLException {
+    Connection conn = connector.getConnection();
+    List<List<String>> result = new ArrayList<>();
+
+    try (Statement stmt = conn.createStatement()) {
+      stmt.execute(query);
+      ResultSet rs = stmt.getResultSet();
+      while (rs.next()) {
+        List<String> thisResult = new ArrayList<>();
+        for (int i = 0; i < columns; i++) {
+          thisResult.add(rs.getString(i + 1));
+        }
+        result.add(thisResult);
+      }
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
index 281ee62..8a7d37e 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.tools.ant.taskdefs.Local;
 import org.joda.time.DateTime;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -423,7 +422,7 @@ public class PartitionLevelWatermarkerTest {
     File tableSdFile = Files.createTempDir();
     tableSdFile.deleteOnExit();
     return new Table(LocalHiveMetastoreTestUtils.getInstance()
-        .createTestTable(dbName, name, tableSdFile.getAbsolutePath(),
+        .createTestAvroTable(dbName, name, tableSdFile.getAbsolutePath(),
             partitioned ? Optional.of("part") : Optional.<String>absent()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java
index 2db01c3..fdd5f44 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java
@@ -74,14 +74,14 @@ public class HiveRetentionTest {
       // Setup db, table to purge. Creating 4 partitions. 2 will be deleted 
and 2 will be retained
       String purgedTableSdLoc = new Path(testTempPath, purgedDbName + 
purgedTableName).toString();
       this.hiveMetastoreTestUtils.dropDatabaseIfExists(purgedDbName);
-      final Table purgedTbl = 
this.hiveMetastoreTestUtils.createTestTable(purgedDbName, purgedTableName, 
purgedTableSdLoc, ImmutableList.of("datepartition"), false);
+      final Table purgedTbl = 
this.hiveMetastoreTestUtils.createTestAvroTable(purgedDbName, purgedTableName, 
purgedTableSdLoc, ImmutableList.of("datepartition"), false);
 
       // Setup db, table and partitions to act as replacement partitions source
       String replacementSourceTableSdLoc = new Path(testTempPath, purgedDbName 
+ purgedTableName + "_source").toString();
       String replacementDbName = purgedDbName + "_source";
       String replacementTableName = purgedTableName + "_source";
       this.hiveMetastoreTestUtils.dropDatabaseIfExists(replacementDbName);
-      final Table replacementTbl = 
this.hiveMetastoreTestUtils.createTestTable(replacementDbName, 
replacementTableName, replacementSourceTableSdLoc, 
ImmutableList.of("datepartition"), false);
+      final Table replacementTbl = 
this.hiveMetastoreTestUtils.createTestAvroTable(replacementDbName, 
replacementTableName, replacementSourceTableSdLoc, 
ImmutableList.of("datepartition"), false);
 
       String deleted1 = "2016-01-01-00";
       String deleted2 = "2016-01-02-02";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java
index e028c34..d6285a9 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java
@@ -75,7 +75,7 @@ public class DatePartitionedHiveVersionFinderTest {
     DatePartitionHiveVersionFinder versionFinder = new 
DatePartitionHiveVersionFinder(this.fs, ConfigFactory.empty());
     String tableName = "VfTb1";
 
-    Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, 
ImmutableList.of("datepartition"));
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, ImmutableList.of("datepartition"));
     org.apache.hadoop.hive.metastore.api.Partition tp =
         this.hiveMetastoreTestUtils.addTestPartition(tbl, 
ImmutableList.of("2016-01-01-20"), (int) System.currentTimeMillis());
     Partition partition = new Partition(new 
org.apache.hadoop.hive.ql.metadata.Table(tbl), tp);
@@ -95,7 +95,7 @@ public class DatePartitionedHiveVersionFinderTest {
 
     DatePartitionHiveVersionFinder versionFinder = new 
DatePartitionHiveVersionFinder(this.fs, conf);
 
-    Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, 
ImmutableList.of("field1"));
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, ImmutableList.of("field1"));
     org.apache.hadoop.hive.metastore.api.Partition tp =
         this.hiveMetastoreTestUtils.addTestPartition(tbl, 
ImmutableList.of("2016/01/01/20"), (int) System.currentTimeMillis());
     Partition partition = new Partition(new 
org.apache.hadoop.hive.ql.metadata.Table(tbl), tp);
@@ -109,7 +109,7 @@ public class DatePartitionedHiveVersionFinderTest {
     DatePartitionHiveVersionFinder versionFinder = new 
DatePartitionHiveVersionFinder(this.fs, ConfigFactory.empty());
     String tableName = "VfTb3";
 
-    Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, 
ImmutableList.of("datepartition", "field1"));
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, ImmutableList.of("datepartition", "field1"));
     org.apache.hadoop.hive.metastore.api.Partition tp =
         this.hiveMetastoreTestUtils.addTestPartition(tbl, 
ImmutableList.of("2016-01-01-20", "f1"), (int) System.currentTimeMillis());
     Partition partition = new Partition(new 
org.apache.hadoop.hive.ql.metadata.Table(tbl), tp);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt
 
b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt
new file mode 100644
index 0000000..e195725
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt
@@ -0,0 +1,4 @@
+101,foo
+102,bar
+103,foo
+104,bar

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt
 
b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt
new file mode 100644
index 0000000..08e1734
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt
@@ -0,0 +1,4 @@
+201,foo
+202,bar
+203,foo
+204,bar

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-example/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-example/build.gradle b/gobblin-example/build.gradle
index 0c77bce..d3cec0a 100644
--- a/gobblin-example/build.gradle
+++ b/gobblin-example/build.gradle
@@ -38,6 +38,7 @@ dependencies {
     compile externalDependency.httpcore
     compile externalDependency.httpclient
     compile externalDependency.commonsCli
+    compile externalDependency.hiveJdbc
 
     testCompile externalDependency.testng
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java
 
b/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java
new file mode 100644
index 0000000..f7d9767
--- /dev/null
+++ 
b/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.example.hivematerializer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import 
org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata;
+import 
org.apache.gobblin.data.management.conversion.hive.materializer.HiveMaterializer;
+import 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveTask;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.AutoReturnableObject;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.thrift.TException;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import static org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.*;
+
+
+/**
+ * A sample source showing how to create work units for Hive materialization. This source allows copying of tables,
+ * materialization of views, and materialization of queries.
+ */
+@Slf4j
+
+public class HiveMaterializerSource implements Source<Object, Object> {
+
+  private static final String HIVE_MATERIALIZER_SOURCE_PREFIX = 
"gobblin.hiveMaterializerSource";
+  public static final String COPY_TABLE_KEY = HIVE_MATERIALIZER_SOURCE_PREFIX 
+ ".copyTable";
+  public static final String MATERIALIZE_VIEW = 
HIVE_MATERIALIZER_SOURCE_PREFIX + ".materializeView";
+  public static final String MATERIALIZE_QUERY = 
HIVE_MATERIALIZER_SOURCE_PREFIX + ".materializeQuery";
+  public static final String OUTPUT_STORAGE_FORMAT = 
HIVE_MATERIALIZER_SOURCE_PREFIX + ".outputStorageFormat";
+
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    try {
+      FileSystem fs = HadoopUtils.getSourceFileSystem(state);
+      Config config = ConfigUtils.propertiesToConfig(state.getProperties());
+
+      if (state.contains(COPY_TABLE_KEY)) {
+        HiveDataset dataset = getHiveDataset(state.getProp(COPY_TABLE_KEY), 
fs, state);
+        WorkUnit workUnit = HiveMaterializer.tableCopyWorkUnit(dataset,
+            new 
StageableTableMetadata(config.getConfig(HIVE_MATERIALIZER_SOURCE_PREFIX), 
dataset.getTable()), null);
+        HiveTask.disableHiveWatermarker(workUnit);
+        return Lists.newArrayList(workUnit);
+      } else if (state.contains(MATERIALIZE_VIEW)) {
+        HiveDataset dataset = getHiveDataset(state.getProp(MATERIALIZE_VIEW), 
fs, state);
+        WorkUnit workUnit = 
HiveMaterializer.viewMaterializationWorkUnit(dataset, 
getOutputStorageFormat(state),
+            new 
StageableTableMetadata(config.getConfig(HIVE_MATERIALIZER_SOURCE_PREFIX), 
dataset.getTable()), null);
+        HiveTask.disableHiveWatermarker(workUnit);
+        return Lists.newArrayList(workUnit);
+      } else if (state.contains(MATERIALIZE_QUERY)) {
+        String query = state.getProp(MATERIALIZE_QUERY);
+        WorkUnit workUnit = 
HiveMaterializer.queryResultMaterializationWorkUnit(query, 
getOutputStorageFormat(state),
+            new 
StageableTableMetadata(config.getConfig(HIVE_MATERIALIZER_SOURCE_PREFIX), 
null));
+        HiveTask.disableHiveWatermarker(workUnit);
+        return Lists.newArrayList(workUnit);
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+    throw new RuntimeException(String.format("Must specify either %s, %s, or 
%s.", COPY_TABLE_KEY, MATERIALIZE_QUERY,
+        MATERIALIZE_VIEW));
+  }
+
+  private HiveDataset getHiveDataset(String tableString, FileSystem fs, State 
state) throws IOException {
+    try {
+      HiveMetastoreClientPool pool = 
HiveMetastoreClientPool.get(state.getProperties(),
+          Optional.fromNullable(state.getProp(HIVE_METASTORE_URI_KEY)));
+
+      List<String> tokens = Splitter.on(".").splitToList(tableString);
+      DbAndTable sourceDbAndTable = new DbAndTable(tokens.get(0), 
tokens.get(1));
+
+      try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
+        Table sourceTable = new 
Table(client.get().getTable(sourceDbAndTable.getDb(), 
sourceDbAndTable.getTable()));
+        return new HiveDataset(fs, pool, sourceTable, 
ConfigUtils.propertiesToConfig(state.getProperties()));
+      }
+    } catch (TException exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
+  private HiveConverterUtils.StorageFormat getOutputStorageFormat(State state) 
{
+    return 
HiveConverterUtils.StorageFormat.valueOf(state.getProp(OUTPUT_STORAGE_FORMAT,
+        HiveConverterUtils.StorageFormat.TEXT_FILE.name()));
+  }
+
+  @Override
+  public Extractor<Object, Object> getExtractor(WorkUnitState state) throws 
IOException {
+    return null;
+  }
+
+  @Override
+  public void shutdown(SourceState state) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-example/src/main/resources/hive-materializer.conf
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/hive-materializer.conf 
b/gobblin-example/src/main/resources/hive-materializer.conf
new file mode 100644
index 0000000..636caf5
--- /dev/null
+++ b/gobblin-example/src/main/resources/hive-materializer.conf
@@ -0,0 +1,19 @@
+# A job to materialize Hive tables / views / queries
+
+# =================
+# Enable exactly one of the modes below
+# =================
+#gobblin.hiveMaterializerSource.copyTable="myDb.myTable"
+#gobblin.hiveMaterializerSource.materializeView="myDb.myView"
+#gobblin.hiveMaterializerSource.materializeQuery="SELECT * from myDb.myTable 
where name='foo'"
+
+source.class= 
org.apache.gobblin.example.hivematerializer.HiveMaterializerSource
+
+gobblin.hiveMaterializerSource.destination.dataPath="/tmp/hiveMaterializer/myTableMaterialized"
+gobblin.hiveMaterializerSource.destination.tableName="myTable_materialized"
+gobblin.hiveMaterializerSource.destination.dbName=myDb
+# Must match a HiveConverterUtils.StorageFormat enum constant (the code default is TEXT_FILE)
+gobblin.hiveMaterializerSource.outputStorageFormat=AVRO
+
+hive.dataset.hive.metastore.uri="thrift://localhost:60083"
+hiveserver.connection.string="jdbc:hive2://"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java
index d301e29..1dd448b 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java
@@ -67,9 +67,6 @@ public class HiveJdbcConnector implements Closeable {
   // Connection to the Hive DB
   private Connection conn;
 
-  // Re-usable Statement
-  private Statement stmt;
-
   private int hiveServerVersion;
 
   private boolean isSimulate;
@@ -160,7 +157,6 @@ public class HiveJdbcConnector implements Closeable {
    */
   private HiveJdbcConnector withHiveConnectionFromUrl(String hiveServerUrl) 
throws SQLException {
     this.conn = DriverManager.getConnection(hiveServerUrl);
-    this.stmt = this.conn.createStatement();
     return this;
   }
 
@@ -175,7 +171,6 @@ public class HiveJdbcConnector implements Closeable {
   private HiveJdbcConnector withHiveConnectionFromUrlUserPassword(String 
hiveServerUrl, String hiveServerUser,
       String hiveServerPassword) throws SQLException {
     this.conn = DriverManager.getConnection(hiveServerUrl, hiveServerUser, 
hiveServerPassword);
-    this.stmt = this.conn.createStatement();
     return this;
   }
 
@@ -190,7 +185,6 @@ public class HiveJdbcConnector implements Closeable {
     } else {
       this.conn = 
DriverManager.getConnection(HIVE2_EMBEDDED_CONNECTION_STRING);
     }
-    this.stmt = this.conn.createStatement();
     return this;
   }
 
@@ -250,7 +244,14 @@ public class HiveJdbcConnector implements Closeable {
         LOG.info("[SIMULATE MODE] STATEMENT NOT RUN: " + 
choppedStatement(statement));
       } else {
         LOG.info("RUNNING STATEMENT: " + choppedStatement(statement));
-        this.stmt.execute(statement);
+        try (Statement stmt = this.conn.createStatement()) {
+          try {
+            stmt.execute(statement);
+          } catch (SQLException sqe) {
+            LOG.error("Failed statement: " + statement);
+            throw sqe;
+          }
+        }
       }
     }
   }
@@ -269,13 +270,6 @@ public class HiveJdbcConnector implements Closeable {
 
   @Override
   public void close() throws IOException {
-    if (this.stmt != null) {
-      try {
-        this.stmt.close();
-      } catch (SQLException e) {
-        LOG.error("Failed to close JDBC statement object", e);
-      }
-    }
 
     if (this.conn != null) {
       try {

Reply via email to