Repository: incubator-gobblin Updated Branches: refs/heads/master a5fe06210 -> 8a374f207
[GOBBLIN-181] Create HiveTask using customized Gobblin Task Closes #2062 from arjun4084346/materializer Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8a374f20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8a374f20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8a374f20 Branch: refs/heads/master Commit: 8a374f207bc7838bcfb144d644b6243c78dc122d Parents: a5fe062 Author: Arjun <[email protected]> Authored: Tue Sep 5 11:23:29 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Tue Sep 5 11:23:29 2017 -0700 ---------------------------------------------------------------------- .../converter/AbstractAvroToOrcConverter.java | 125 +----- .../conversion/hive/source/HiveSource.java | 2 +- .../hive/task/HiveConverterUtils.java | 387 +++++++++++++++++++ .../conversion/hive/task/HiveMaterializer.java | 54 +++ .../task/HiveMaterializerQueryGenerator.java | 244 ++++++++++++ .../hive/task/HiveMaterializerSource.java | 60 +++ .../hive/task/HiveMaterializerTaskFactory.java | 46 +++ .../conversion/hive/task/HiveTask.java | 178 +++++++++ .../conversion/hive/task/QueryGenerator.java | 42 ++ .../hive/task/HiveConverterUtilsTest.java | 49 +++ .../hive/validation/ValidationJob.java | 33 +- .../org/apache/gobblin/util/HadoopUtils.java | 30 +- .../apache/gobblin/util/HadoopUtilsTest.java | 21 + 13 files changed, 1117 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java index 9c4a5ec..b8495a9 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java @@ -32,6 +32,7 @@ import org.apache.avro.Schema; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -137,13 +138,6 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem public static final String HIVE_DATASET_DESTINATION_SKIP_SETGROUP = "hive.dataset.destination.skip.setGroup"; public static final boolean DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP = false; - /** - * If the property is set to true then partition dir is overwritten, - * else a new time-stamped partition dir is created to avoid breaking in-flight queries - * Check org.apache.gobblin.data.management.retention.Avro2OrcStaleDatasetCleaner to clean stale directories - */ - public static final String HIVE_DATASET_PARTITION_OVERWRITE = "hive.dataset.partition.overwrite"; - public static final boolean 
DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE = true; /** * If set to true, a set format DDL will be separate from add partition DDL @@ -224,8 +218,8 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem String orcDataLocation = getOrcDataLocation(); String orcStagingDataLocation = getOrcStagingDataLocation(orcStagingTableName); boolean isEvolutionEnabled = getConversionConfig().isEvolutionEnabled(); - Pair<Optional<Table>, Optional<List<Partition>>> destinationMeta = getDestinationTableMeta(orcTableDatabase, - orcTableName, workUnit); + Pair<Optional<Table>, Optional<List<Partition>>> destinationMeta = HiveConverterUtils.getDestinationTableMeta(orcTableDatabase, + orcTableName, workUnit.getProperties()); Optional<Table> destinationTableMeta = destinationMeta.getLeft(); // Optional @@ -263,7 +257,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem // Populate optional partition info Map<String, String> partitionsDDLInfo = Maps.newHashMap(); Map<String, String> partitionsDMLInfo = Maps.newHashMap(); - populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo); + HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo); /* * Create ORC data location with the same permissions as Avro data @@ -334,7 +328,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem log.debug("Create staging table DDL: " + createStagingTableDDL); // Create DDL statement for partition - String orcStagingDataPartitionDirName = getOrcStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier); + String orcStagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier); String orcStagingDataPartitionLocation = orcStagingDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName; if (partitionsDMLInfo.size() > 0) { List<String> createStagingPartitionDDL = @@ -476,7 +470,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem Optional<Path> destPartitionLocation = getDestinationPartitionLocation(destinationTableMeta, workUnit, conversionEntity.getHivePartition().get().getName()); orcFinalDataPartitionLocation = - updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation); + HiveConverterUtils.updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation); log.info( "Partition directory to move: " + orcStagingDataPartitionLocation + " to: " + orcFinalDataPartitionLocation); publishDirectories.put(orcStagingDataPartitionLocation, orcFinalDataPartitionLocation); @@ -607,32 +601,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier; } - /*** - * Get the ORC partition directory name of the format: [hourly_][daily_]<partitionSpec1>[partitionSpec ..] - * @param conversionEntity Conversion entity. - * @param sourceDataPathIdentifier Hints to look in source partition location to prefix the partition dir name - * such as hourly or daily. - * @return Partition directory name. 
- */ - private String getOrcStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity, - List<String> sourceDataPathIdentifier) { - - if (conversionEntity.getHivePartition().isPresent()) { - StringBuilder dirNamePrefix = new StringBuilder(); - String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString(); - if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) { - for (String hint : sourceDataPathIdentifier) { - if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) { - dirNamePrefix.append(hint.toLowerCase()).append("_"); - } - } - } - return dirNamePrefix + conversionEntity.getHivePartition().get().getName(); - } else { - return StringUtils.EMPTY; - } - } /*** * Get the ORC final table location of format: <ORC final table location>/final @@ -699,88 +668,6 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem return replacedPartitionsDDLInfo; } - private void populatePartitionInfo(QueryBasedHiveConversionEntity conversionEntity, Map<String, String> partitionsDDLInfo, - Map<String, String> partitionsDMLInfo) { - String partitionsInfoString = null; - String partitionsTypeString = null; - - if (conversionEntity.getHivePartition().isPresent()) { - partitionsInfoString = conversionEntity.getHivePartition().get().getName(); - partitionsTypeString = conversionEntity.getHivePartition().get().getSchema().getProperty("partition_columns.types"); - } - - if (StringUtils.isNotBlank(partitionsInfoString) || StringUtils.isNotBlank(partitionsTypeString)) { - if (StringUtils.isBlank(partitionsInfoString) || StringUtils.isBlank(partitionsTypeString)) { - throw new IllegalArgumentException("Both partitions info and partitions must be present, if one is specified"); - } - List<String> pInfo = Splitter.on(HIVE_PARTITIONS_INFO).omitEmptyStrings().trimResults().splitToList(partitionsInfoString); - List<String> pType = Splitter.on(HIVE_PARTITIONS_TYPE).omitEmptyStrings().trimResults().splitToList(partitionsTypeString); - log.debug("PartitionsInfoString: " + partitionsInfoString); - log.debug("PartitionsTypeString: " + partitionsTypeString); - - if (pInfo.size() != pType.size()) { - throw new IllegalArgumentException("partitions info and partitions type list should of same size"); - } - for (int i = 0; i < pInfo.size(); i++) { - List<String> partitionInfoParts = Splitter.on("=").omitEmptyStrings().trimResults().splitToList(pInfo.get(i)); - String partitionType = pType.get(i); - if (partitionInfoParts.size() != 2) { - throw new IllegalArgumentException( - String.format("Partition details should be of the format partitionName=partitionValue. 
Recieved: %s", pInfo.get(i))); - } - partitionsDDLInfo.put(partitionInfoParts.get(0), partitionType); - partitionsDMLInfo.put(partitionInfoParts.get(0), partitionInfoParts.get(1)); - } - } - } - - private Pair<Optional<Table>, Optional<List<Partition>>> getDestinationTableMeta(String dbName, - String tableName, WorkUnitState state) - throws DataConversionException { - - Optional<Table> table = Optional.<Table>absent(); - Optional<List<Partition>> partitions = Optional.<List<Partition>>absent(); - - try { - HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(state.getJobState().getProperties(), - Optional.fromNullable(state.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY))); - try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { - table = Optional.of(client.get().getTable(dbName, tableName)); - if (table.isPresent()) { - org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get()); - if (HiveUtils.isPartitioned(qlTable)) { - partitions = Optional.of(HiveUtils.getPartitions(client.get(), qlTable, Optional.<String>absent())); - } - } - } - } catch (NoSuchObjectException e) { - return ImmutablePair.of(table, partitions); - } catch (IOException | TException e) { - throw new DataConversionException("Could not fetch destination table metadata", e); - } - - return ImmutablePair.of(table, partitions); - } - - /** - * If partition already exists then new partition location will be a separate time stamp dir - * If partition location is /a/b/c/<oldTimeStamp> then new partition location is /a/b/c/<currentTimeStamp> - * If partition location is /a/b/c/ then new partition location is /a/b/c/<currentTimeStamp> - **/ - private String updatePartitionLocation(String orcDataPartitionLocation, WorkUnitState workUnitState, - Optional<Path> destPartitionLocation) - throws DataConversionException { - - if (workUnitState.getPropAsBoolean(HIVE_DATASET_PARTITION_OVERWRITE, DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE)) { - return orcDataPartitionLocation; - } - if (!destPartitionLocation.isPresent()) { - return orcDataPartitionLocation; - } - long timeStamp = System.currentTimeMillis(); - return StringUtils.join(Arrays.asList(orcDataPartitionLocation, timeStamp), '/'); - } - private Optional<Path> getDestinationPartitionLocation(Optional<Table> table, WorkUnitState state, String partitionName) throws DataConversionException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java index 21d8ab2..74a8b3b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java @@ -427,7 +427,7 @@ public class HiveSource implements Source { public void shutdown(SourceState state) { } - private static FileSystem getSourceFs(State state) throws IOException { + public static FileSystem getSourceFs(State state) throws IOException { if (state.contains(HIVE_SOURCE_FS_URI)) { return FileSystem.get(URI.create(state.getProp(HIVE_SOURCE_FS_URI)), 
HadoopUtils.getConfFromState(state)); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java new file mode 100644 index 0000000..119ef1c --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import static java.util.stream.Collectors.joining; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.thrift.TException; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; +import org.apache.gobblin.data.management.copy.hive.HiveUtils; +import org.apache.gobblin.hive.HiveMetastoreClientPool; +import org.apache.gobblin.util.AutoReturnableObject; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j + +/** + * A utility class for converting hive data from one dataset to another. 
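As a quick illustration of the staging/output path helpers defined in this class (the table prefix and destination data path below are made-up values, not part of the commit):

    import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;

    public class StagingPathSketch {
      public static void main(String[] args) {
        // Made-up destination data path for a hypothetical ORC copy of tracking.events
        String destinationDataPath = "/data/tracking_orc/events";
        // e.g. "events_150463220900042" (prefix + current millis + random suffix)
        String stagingTableName = HiveConverterUtils.getStagingTableName("events");
        // /data/tracking_orc/events/<stagingTableName> -- where the staging table is written
        String stagingLocation = HiveConverterUtils.getStagingDataLocation(destinationDataPath, stagingTableName);
        // /data/tracking_orc/events/final -- where published data ends up
        String outputLocation = HiveConverterUtils.getOutputDataLocation(destinationDataPath);
        System.out.println(stagingLocation + " -> " + outputLocation);
      }
    }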
+ */ +public class HiveConverterUtils { + + /*** + * Subdirectory within destination table directory to publish data + */ + private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final"; + + /*** + * Separators used by Hive + */ + private static final String HIVE_PARTITIONS_INFO = "/"; + private static final String HIVE_PARTITIONS_TYPE = ":"; + + /** + * If the property is set to true then partition dir is overwritten, + * else a new time-stamped partition dir is created to avoid breaking in-flight queries + * Check org.apache.gobblin.data.management.retention.Avro2OrcStaleDatasetCleaner to clean stale directories + */ + public static final String HIVE_DATASET_PARTITION_OVERWRITE = "hive.dataset.partition.overwrite"; + public static final boolean DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE = true; + + /** + * If the property is set to true then in the destination dir permissions, group won't be explicitly set. + */ + public static final String HIVE_DATASET_DESTINATION_SKIP_SETGROUP = "hive.dataset.destination.skip.setGroup"; + public static final boolean DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP = false; + + public static String getStagingTableName(String stagingTableNamePrefix) { + int randomNumber = new Random().nextInt(100); + String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber); + + return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier; + } + + /*** + * Get the final table location of format: <final table location>/final + * @return final table location. + */ + public static String getOutputDataLocation(String outputDataLocation) { + return outputDataLocation + Path.SEPARATOR + PUBLISHED_TABLE_SUBDIRECTORY; + } + + /*** + * Get the staging table location of format: <final table location>/<staging table name> + * @param outputDataLocation output table data location. + * @return staging table location. + */ + public static String getStagingDataLocation(String outputDataLocation, String stagingTableName) { + return outputDataLocation + Path.SEPARATOR + stagingTableName; + } + + /*** + * Generate DDL query to create a duplicate Hive table + * @param inputDbName source DB name + * @param inputTblName source table name + * @param tblName New Hive table name + * @param tblLocation New hive table location + * @param optionalDbName Optional DB name, if not specified it defaults to 'default' + */ + public static String generateCreateDuplicateTableDDL( + String inputDbName, + String inputTblName, + String tblName, + String tblLocation, + Optional<String> optionalDbName) { + + Preconditions.checkArgument(StringUtils.isNotBlank(tblName)); + Preconditions.checkArgument(StringUtils.isNotBlank(tblLocation)); + + String dbName = optionalDbName.isPresent() ? optionalDbName.get() : "default"; + + return String.format("CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` LIKE `%s`.`%s` LOCATION %n '%s' %n", + dbName, tblName, inputDbName, inputTblName, tblLocation); + } + + /** + * Fills data from input table into output table. 
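For example, a sketch with made-up database, table, and partition names; the DML this method builds for a single-partition copy looks roughly like the commented query below:

    import java.util.Map;
    import com.google.common.base.Optional;
    import com.google.common.collect.ImmutableMap;
    import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;

    public class TableCopySketch {
      public static void main(String[] args) {
        Map<String, String> partitionDMLInfo = ImmutableMap.of("datepartition", "2017-09-05-00");
        String dml = HiveConverterUtils.generateTableCopy(
            "events",                  // input table (made up)
            "events_150463220900042",  // staging (output) table (made up)
            "tracking",                // input db (made up)
            "tracking_orc",            // output db (made up)
            Optional.of(partitionDMLInfo));
        // Roughly:
        //   INSERT OVERWRITE TABLE `tracking_orc`.`events_150463220900042`
        //   PARTITION (`datepartition`)
        //   SELECT * FROM `tracking`.`events` WHERE `datepartition`='2017-09-05-00'
        System.out.println(dml);
      }
    }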
+ * @param inputTblName input hive table name + * @param outputTblName output hive table name + * @param inputDbName input hive database name + * @param outputDbName output hive database name + * @param optionalPartitionDMLInfo input hive table's partition's name and value + * @return Hive query string + */ + public static String generateTableCopy( + String inputTblName, + String outputTblName, + String inputDbName, + String outputDbName, + Optional<Map<String, String>> optionalPartitionDMLInfo) { + Preconditions.checkArgument(StringUtils.isNotBlank(inputTblName)); + Preconditions.checkArgument(StringUtils.isNotBlank(outputTblName)); + Preconditions.checkArgument(StringUtils.isNotBlank(inputDbName)); + Preconditions.checkArgument(StringUtils.isNotBlank(outputDbName)); + + StringBuilder dmlQuery = new StringBuilder(); + + // Insert query + dmlQuery.append(String.format("INSERT OVERWRITE TABLE `%s`.`%s` %n", outputDbName, outputTblName)); + + // Partition details + dmlQuery.append(partitionKeyValues(optionalPartitionDMLInfo)); + + dmlQuery.append(String.format("SELECT * FROM `%s`.`%s`", inputDbName, inputTblName)); + if (optionalPartitionDMLInfo.isPresent()) { + if (optionalPartitionDMLInfo.get().size() > 0) { + dmlQuery.append(" WHERE "); + String partitionsAndValues = optionalPartitionDMLInfo.get().entrySet().stream() + .map(e -> "`" + e.getKey() + "`='" + e.getValue() + "'") + .collect(joining(" AND ")); + dmlQuery.append(partitionsAndValues); + } + } + + return dmlQuery.toString(); + } + + protected static StringBuilder partitionKeyValues(Optional<Map<String, String>> optionalPartitionDMLInfo) { + if (!optionalPartitionDMLInfo.isPresent()) { + return new StringBuilder(); + } else { + return new StringBuilder("PARTITION (").append(Joiner.on(", ") + .join(optionalPartitionDMLInfo.get().entrySet().stream().map(e -> "`" + e.getKey() + "`").iterator())).append(") \n"); + } + } + + /** + * It fills partitionsDDLInfo and partitionsDMLInfo with the partition information + * @param conversionEntity conversion entity to + * @param partitionsDDLInfo partition type information, to be filled by this method + * @param partitionsDMLInfo partition key-value pair, to be filled by this method + */ + public static void populatePartitionInfo(QueryBasedHiveConversionEntity conversionEntity, Map<String, String> partitionsDDLInfo, + Map<String, String> partitionsDMLInfo) { + + String partitionsInfoString = null; + String partitionsTypeString = null; + + if (conversionEntity.getHivePartition().isPresent()) { + partitionsInfoString = conversionEntity.getHivePartition().get().getName(); + partitionsTypeString = conversionEntity.getHivePartition().get().getSchema().getProperty("partition_columns.types"); + } + + if (StringUtils.isNotBlank(partitionsInfoString) || StringUtils.isNotBlank(partitionsTypeString)) { + if (StringUtils.isBlank(partitionsInfoString) || StringUtils.isBlank(partitionsTypeString)) { + throw new IllegalArgumentException("Both partitions info and partitions must be present, if one is specified"); + } + List<String> pInfo = Splitter.on(HIVE_PARTITIONS_INFO).omitEmptyStrings().trimResults().splitToList(partitionsInfoString); + List<String> pType = Splitter.on(HIVE_PARTITIONS_TYPE).omitEmptyStrings().trimResults().splitToList(partitionsTypeString); + log.debug("PartitionsInfoString: " + partitionsInfoString); + log.debug("PartitionsTypeString: " + partitionsTypeString); + + if (pInfo.size() != pType.size()) { + throw new IllegalArgumentException("partitions info and partitions type list should of 
same size"); + } + for (int i = 0; i < pInfo.size(); i++) { + List<String> partitionInfoParts = Splitter.on("=").omitEmptyStrings().trimResults().splitToList(pInfo.get(i)); + String partitionType = pType.get(i); + if (partitionInfoParts.size() != 2) { + throw new IllegalArgumentException( + String.format("Partition details should be of the format partitionName=partitionValue. Recieved: %s", pInfo.get(i))); + } + partitionsDDLInfo.put(partitionInfoParts.get(0), partitionType); + partitionsDMLInfo.put(partitionInfoParts.get(0), partitionInfoParts.get(1)); + } + } + } + + /** + * Creates a staging directory with the permission as in source directory. + * @param fs filesystem object + * @param destination staging directory location + * @param conversionEntity conversion entity used to get source directory permissions + * @param workUnit workunit + */ + public static void createStagingDirectory(FileSystem fs, String destination, QueryBasedHiveConversionEntity conversionEntity, + WorkUnitState workUnit) { + /* + * Create staging data location with the same permissions as source data location + * + * Note that hive can also automatically create the non-existing directories but it does not + * seem to create it with the desired permissions. + * According to hive docs permissions for newly created directories/files can be controlled using uMask like, + * + * SET hive.warehouse.subdir.inherit.perms=false; + * SET fs.permissions.umask-mode=022; + * Upon testing, this did not work + */ + Path destinationPath = new Path(destination); + try { + FileStatus sourceDataFileStatus = fs.getFileStatus(conversionEntity.getHiveTable().getDataLocation()); + FsPermission sourceDataPermission = sourceDataFileStatus.getPermission(); + if (!fs.mkdirs(destinationPath, sourceDataPermission)) { + throw new RuntimeException(String.format("Failed to create path %s with permissions %s", + destinationPath, sourceDataPermission)); + } else { + fs.setPermission(destinationPath, sourceDataPermission); + // Set the same group as source + if (!workUnit.getPropAsBoolean(HIVE_DATASET_DESTINATION_SKIP_SETGROUP, DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP)) { + fs.setOwner(destinationPath, null, sourceDataFileStatus.getGroup()); + } + log.info(String.format("Created %s with permissions %s and group %s", destinationPath, sourceDataPermission, sourceDataFileStatus.getGroup())); + } + } catch (IOException e) { + Throwables.propagate(e); + } + } + + /*** + * Get the partition directory name of the format: [hourly_][daily_]<partitionSpec1>[partitionSpec ..] + * @param conversionEntity Conversion entity. + * @param sourceDataPathIdentifier Hints to look in source partition location to prefix the partition dir name + * such as hourly or daily. + * @return Partition directory name. 
+ */ + public static String getStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity, + List<String> sourceDataPathIdentifier) { + + if (conversionEntity.getHivePartition().isPresent()) { + StringBuilder dirNamePrefix = new StringBuilder(); + String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString(); + if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) { + for (String hint : sourceDataPathIdentifier) { + if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) { + dirNamePrefix.append(hint.toLowerCase()).append("_"); + } + } + } + + return dirNamePrefix + conversionEntity.getHivePartition().get().getName(); + } else { + return StringUtils.EMPTY; + } + } + + /** + * Returns the partition data location of a given table and partition + * @param table Hive table + * @param state workunit state + * @param partitionName partition name + * @return partition data location + * @throws DataConversionException + */ + public static Optional<Path> getDestinationPartitionLocation(Optional<Table> table, WorkUnitState state, + String partitionName) + throws DataConversionException { + Optional<org.apache.hadoop.hive.metastore.api.Partition> partitionOptional; + if (!table.isPresent()) { + return Optional.absent(); + } + try { + HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(state.getJobState().getProperties(), + Optional.fromNullable(state.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY))); + try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { + partitionOptional = + Optional.of(client.get().getPartition(table.get().getDbName(), table.get().getTableName(), partitionName)); + } catch (NoSuchObjectException e) { + return Optional.absent(); + } + if (partitionOptional.isPresent()) { + org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get()); + Partition qlPartition = + new Partition(qlTable, partitionOptional.get()); + return Optional.of(qlPartition.getDataLocation()); + } + } catch (IOException | TException | HiveException e) { + throw new DataConversionException("Could not fetch destination table metadata", e); + } + return Optional.absent(); + } + + /** + * If partition already exists then new partition location will be a separate time stamp dir + * If partition location is /a/b/c/<oldTimeStamp> then new partition location is /a/b/c/<currentTimeStamp> + * If partition location is /a/b/c/ then new partition location is /a/b/c/<currentTimeStamp> + **/ + public static String updatePartitionLocation(String outputDataPartitionLocation, WorkUnitState workUnitState, + Optional<Path> destPartitionLocation) + throws DataConversionException { + + if (workUnitState.getPropAsBoolean(HIVE_DATASET_PARTITION_OVERWRITE, DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE)) { + return outputDataPartitionLocation; + } + if (!destPartitionLocation.isPresent()) { + return outputDataPartitionLocation; + } + long timeStamp = System.currentTimeMillis(); + return StringUtils.join(Arrays.asList(outputDataPartitionLocation, timeStamp), '/'); + } + + /** + * Returns a pair of Hive table and its partitions + * @param dbName db name + * @param tableName table name + * @param props properties + * @return a pair of Hive table and its partitions + * @throws DataConversionException + */ + public static Pair<Optional<Table>, Optional<List<Partition>>> getDestinationTableMeta(String dbName, + String tableName, Properties props) { + + 
Optional<Table> table = Optional.<Table>absent(); + Optional<List<Partition>> partitions = Optional.<List<Partition>>absent(); + + try { + HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(props, + Optional.fromNullable(props.getProperty(HiveDatasetFinder.HIVE_METASTORE_URI_KEY))); + try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { + table = Optional.of(client.get().getTable(dbName, tableName)); + if (table.isPresent()) { + org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get()); + if (HiveUtils.isPartitioned(qlTable)) { + partitions = Optional.of(HiveUtils.getPartitions(client.get(), qlTable, Optional.<String>absent())); + } + } + } + } catch (NoSuchObjectException e) { + return ImmutablePair.of(table, partitions); + } catch (IOException | TException e) { + throw new RuntimeException("Could not fetch destination table metadata", e); + } + + return ImmutablePair.of(table, partitions); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java new file mode 100644 index 0000000..2e370c0 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.util.List; + +import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; +import org.apache.gobblin.runtime.TaskContext; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j + +/** + * A simple {@link HiveTask} for Hive view materialization. 
+ */ +public class HiveMaterializer extends HiveTask { + + private final QueryGenerator queryGenerator; + + public HiveMaterializer(TaskContext taskContext) throws Exception { + super(taskContext); + this.queryGenerator = new HiveMaterializerQueryGenerator(this.workUnitState); + if (!(workUnit.getHiveDataset() instanceof ConvertibleHiveDataset)) { + throw new IllegalStateException("HiveConvertExtractor is only compatible with ConvertibleHiveDataset"); + } + } + + @Override + public List<String> generateHiveQueries() { + return queryGenerator.generateQueries(); + } + + @Override + public QueryBasedHivePublishEntity generatePublishQueries() throws Exception { + return queryGenerator.generatePublishQueries(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java new file mode 100644 index 0000000..4eee0e0 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.util.Map; +import java.util.List; + +import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter; +import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager; +import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; +import org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHivePartition; +import org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHiveTable; +import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils; +import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator; +import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; +import org.apache.gobblin.hive.HiveMetastoreClientPool; +import org.apache.gobblin.util.AutoReturnableObject; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +/** + * A simple query generator for {@link HiveMaterializer}. 
+ */ +public class HiveMaterializerQueryGenerator implements QueryGenerator { + private final FileSystem fs; + private final ConvertibleHiveDataset.ConversionConfig conversionConfig; + private final ConvertibleHiveDataset hiveDataset; + private final String inputDbName; + private final String inputTableName; + private final String outputDatabaseName; + private final String outputTableName; + private final String outputDataLocation; + private final String stagingTableName; + private final String stagingDataLocation; + private final List<String> sourceDataPathIdentifier; + private final String stagingDataPartitionDirName; + private final String stagingDataPartitionLocation; + private final Map<String, String> partitionsDDLInfo; + private final Map<String, String> partitionsDMLInfo; + private final Optional<Table> destinationTableMeta; + private final HiveWorkUnit workUnit; + private final HiveMetastoreClientPool pool; + private final QueryBasedHiveConversionEntity conversionEntity; + private final WorkUnitState workUnitState; + + public HiveMaterializerQueryGenerator(WorkUnitState workUnitState) throws Exception { + this.workUnitState = workUnitState; + this.workUnit = new HiveWorkUnit(workUnitState.getWorkunit()); + this.hiveDataset = (ConvertibleHiveDataset) workUnit.getHiveDataset(); + this.inputDbName = hiveDataset.getDbAndTable().getDb(); + this.inputTableName = hiveDataset.getDbAndTable().getTable(); + this.fs = HiveSource.getSourceFs(workUnitState); + this.conversionConfig = hiveDataset.getConversionConfigForFormat("sameAsSource").get(); + this.outputDatabaseName = conversionConfig.getDestinationDbName(); + this.outputTableName = conversionConfig.getDestinationTableName(); + this.outputDataLocation = HiveConverterUtils.getOutputDataLocation(conversionConfig.getDestinationDataPath()); + this.stagingTableName = HiveConverterUtils.getStagingTableName(conversionConfig.getDestinationStagingTableName()); + this.stagingDataLocation = HiveConverterUtils.getStagingDataLocation(conversionConfig.getDestinationDataPath(), stagingTableName); + this.sourceDataPathIdentifier = conversionConfig.getSourceDataPathIdentifier(); + this.pool = HiveMetastoreClientPool.get(workUnitState.getJobState().getProperties(), + Optional.fromNullable(workUnitState.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY))); + this.conversionEntity = getConversionEntity(); + this.stagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier); + this.stagingDataPartitionLocation = stagingDataLocation + Path.SEPARATOR + stagingDataPartitionDirName; + this.partitionsDDLInfo = Maps.newHashMap(); + this.partitionsDMLInfo = Maps.newHashMap(); + HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo); + this.destinationTableMeta = HiveConverterUtils.getDestinationTableMeta(outputDatabaseName, + outputTableName, workUnitState.getProperties()).getLeft(); + } + + /** + * Returns hive queries to be run as a part of a hive task. + * This does not include publish queries. 
+ * @return + */ + @Override + public List<String> generateQueries() { + + List<String> hiveQueries = Lists.newArrayList(); + + Preconditions.checkNotNull(this.workUnit, "Workunit must not be null"); + EventWorkunitUtils.setBeginDDLBuildTimeMetadata(this.workUnit, System.currentTimeMillis()); + + HiveConverterUtils.createStagingDirectory(fs, conversionConfig.getDestinationDataPath(), + conversionEntity, this.workUnitState); + + // Create DDL statement for table + String createStagingTableDDL = + HiveConverterUtils.generateCreateDuplicateTableDDL( + inputDbName, + inputTableName, + stagingTableName, + stagingDataLocation, + Optional.of(outputDatabaseName)); + hiveQueries.add(createStagingTableDDL); + log.debug("Create staging table DDL:\n" + createStagingTableDDL); + + /* + * Setting partition mode to 'nonstrict' is needed to improve readability of the code. + * If we do not set dynamic partition mode to nonstrict, we will have to write partition values also, + * and because hive considers partition as a virtual column, we also have to write each of the column + * name in the query (in place of *) to match source and target columns. + */ + hiveQueries.add("SET hive.exec.dynamic.partition.mode=nonstrict"); + + String insertInStagingTableDML = + HiveConverterUtils + .generateTableCopy( + inputTableName, + stagingTableName, + conversionEntity.getHiveTable().getDbName(), + outputDatabaseName, + Optional.of(partitionsDMLInfo)); + hiveQueries.add(insertInStagingTableDML); + log.debug("Conversion staging DML: " + insertInStagingTableDML); + + log.info("Conversion Queries {}\n", hiveQueries); + + EventWorkunitUtils.setEndDDLBuildTimeMetadata(workUnit, System.currentTimeMillis()); + return hiveQueries; + } + + /** + * Returns a QueryBasedHivePublishEntity which includes publish level queries and cleanup commands. 
+ * @return QueryBasedHivePublishEntity + * @throws DataConversionException + */ + public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException { + + QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity(); + List<String> publishQueries = publishEntity.getPublishQueries(); + Map<String, String> publishDirectories = publishEntity.getPublishDirectories(); + List<String> cleanupQueries = publishEntity.getCleanupQueries(); + List<String> cleanupDirectories = publishEntity.getCleanupDirectories(); + + String createFinalTableDDL = + HiveConverterUtils.generateCreateDuplicateTableDDL(inputDbName, inputTableName, outputTableName, + outputDataLocation, Optional.of(outputDatabaseName)); + publishQueries.add(createFinalTableDDL); + log.debug("Create final table DDL:\n" + createFinalTableDDL); + + if (partitionsDDLInfo.size() == 0) { + log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation); + publishDirectories.put(stagingDataLocation, outputDataLocation); + + String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName); + + log.debug("Drop staging table DDL: " + dropStagingTableDDL); + cleanupQueries.add(dropStagingTableDDL); + + log.debug("Staging table directory to delete: " + stagingDataLocation); + cleanupDirectories.add(stagingDataLocation); + } else { + String finalDataPartitionLocation = outputDataLocation + Path.SEPARATOR + stagingDataPartitionDirName; + Optional<Path> destPartitionLocation = + HiveConverterUtils.getDestinationPartitionLocation(destinationTableMeta, this.workUnitState, + conversionEntity.getHivePartition().get().getName()); + finalDataPartitionLocation = HiveConverterUtils.updatePartitionLocation(finalDataPartitionLocation, this.workUnitState, + destPartitionLocation); + + log.debug("Partition directory to move: " + stagingDataPartitionLocation + " to: " + finalDataPartitionLocation); + publishDirectories.put(stagingDataPartitionLocation, finalDataPartitionLocation); + List<String> dropPartitionsDDL = + HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, partitionsDMLInfo); + log.debug("Drop partitions if exist in final table: " + dropPartitionsDDL); + publishQueries.addAll(dropPartitionsDDL); + List<String> createFinalPartitionDDL = + HiveAvroORCQueryGenerator.generateCreatePartitionDDL(outputDatabaseName, outputTableName, + finalDataPartitionLocation, partitionsDMLInfo, Optional.<String>absent()); + + log.debug("Create final partition DDL: " + createFinalPartitionDDL); + publishQueries.addAll(createFinalPartitionDDL); + + String dropStagingTableDDL = + HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName); + + log.debug("Drop staging table DDL: " + dropStagingTableDDL); + cleanupQueries.add(dropStagingTableDDL); + + log.debug("Staging table directory to delete: " + stagingDataLocation); + cleanupDirectories.add(stagingDataLocation); + } + + publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, + AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(conversionEntity))); + + log.info("Publish partition entity: " + publishEntity); + return publishEntity; + } + + private QueryBasedHiveConversionEntity getConversionEntity() throws Exception { + + + try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) { + + Table table = client.get().getTable(this.inputDbName, this.inputTableName); + + 
SchemaAwareHiveTable schemaAwareHiveTable = new SchemaAwareHiveTable(table, AvroSchemaManager.getSchemaFromUrl(workUnit.getTableSchemaUrl(), fs)); + + SchemaAwareHivePartition schemaAwareHivePartition = null; + + if (workUnit.getPartitionName().isPresent() && workUnit.getPartitionSchemaUrl().isPresent()) { + org.apache.hadoop.hive.metastore.api.Partition + partition = client.get().getPartition(this.inputDbName, this.inputTableName, workUnit.getPartitionName().get()); + schemaAwareHivePartition = + new SchemaAwareHivePartition(table, partition, AvroSchemaManager.getSchemaFromUrl(workUnit.getPartitionSchemaUrl().get(), fs)); + } + return new QueryBasedHiveConversionEntity(this.hiveDataset, schemaAwareHiveTable, Optional.fromNullable(schemaAwareHivePartition)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java new file mode 100644 index 0000000..d1c9371 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.util.List; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder; +import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; +import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; +import org.apache.gobblin.runtime.task.TaskUtils; +import org.apache.gobblin.source.workunit.WorkUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j + +/** + * A simple HiveSource for {@link HiveMaterializer}. 
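A sketch of what driving this source looks like from the caller's side; the empty SourceState is purely illustrative, and a real job would supply the usual HiveSource/HiveDatasetFinder properties (whitelist, metastore URI, etc.):

    import java.util.List;
    import org.apache.gobblin.configuration.SourceState;
    import org.apache.gobblin.data.management.conversion.hive.task.HiveMaterializerSource;
    import org.apache.gobblin.source.workunit.WorkUnit;

    public class MaterializerSourceSketch {
      public static void main(String[] args) {
        SourceState state = new SourceState(); // illustrative only; real jobs pass the job's SourceState
        // Dataset finder class and dataset config prefix default to ConvertibleHiveDatasetFinder
        // and "hive.conversion.avro" when not set explicitly.
        List<WorkUnit> workUnits = new HiveMaterializerSource().getWorkunits(state);
        // Every non-watermark workunit is now tagged with HiveMaterializerTaskFactory, so the
        // runtime executes it as a HiveMaterializer task instead of the extract/convert/write flow.
        System.out.println(workUnits.size());
      }
    }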
+ */ +public class HiveMaterializerSource extends HiveSource { + + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) { + state.setProp(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY, ConvertibleHiveDatasetFinder.class.getName()); + } + if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) { + state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, "hive.conversion.avro"); + } + + List<WorkUnit> workUnits = super.getWorkunits(state); + + for(WorkUnit workUnit : workUnits) { + if (Boolean.valueOf(workUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY))) { + log.info("Ignoring Watermark workunit for {}", workUnit.getProp(ConfigurationKeys.DATASET_URN_KEY)); + continue; + } + TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class); + } + return workUnits; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java new file mode 100644 index 0000000..c05d4bf --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.NoopPublisher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.TaskFactory; +import org.apache.gobblin.runtime.task.TaskIFace; + +/** + * A {@link TaskFactory} that runs a {@link HiveMaterializer} task. + * This factory is intended to publish data in the task directly, and + * uses a {@link NoopPublisher}. 
+ */ +public class HiveMaterializerTaskFactory implements TaskFactory { + @Override + public TaskIFace createTask(TaskContext taskContext) { + try { + return new HiveMaterializer(taskContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public DataPublisher createDataPublisher(JobState.DatasetState datasetState) { + return new NoopPublisher(datasetState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java new file mode 100644 index 0000000..aabbb6e --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; +import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; +import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; +import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker; +import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.BaseAbstractTask; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.HiveJdbcConnector; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j + +/** + * An abstract Task that runs a hive job. + * it runs hive queries. + * Implementation classes should implement abstract methods generateHiveQueries() and generatePublishQueries() + * which creates extract/write level queries and publish level queries respectively. 
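For instance, a custom task could be sketched as below; the class name, table names, and queries are invented for illustration (HiveMaterializer is the concrete implementation added by this change):

    import java.util.List;
    import com.google.common.collect.Lists;
    import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
    import org.apache.gobblin.data.management.conversion.hive.task.HiveTask;
    import org.apache.gobblin.runtime.TaskContext;

    public class SnapshotCopyHiveTask extends HiveTask { // hypothetical subclass

      public SnapshotCopyHiveTask(TaskContext taskContext) {
        super(taskContext);
      }

      @Override
      public List<String> generateHiveQueries() {
        // Work queries, executed over JDBC by run()
        return Lists.newArrayList(
            "CREATE EXTERNAL TABLE IF NOT EXISTS `db`.`tbl_staging` LIKE `db`.`tbl` LOCATION '/tmp/tbl_staging'",
            "INSERT OVERWRITE TABLE `db`.`tbl_staging` SELECT * FROM `db`.`tbl`");
      }

      @Override
      public QueryBasedHivePublishEntity generatePublishQueries() {
        // Publish/cleanup queries and directories, applied by commit() via executePublishQueries()
        QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
        publishEntity.getPublishQueries().add(
            "CREATE EXTERNAL TABLE IF NOT EXISTS `db`.`tbl_final` LIKE `db`.`tbl` LOCATION '/data/tbl/final'");
        publishEntity.getPublishDirectories().put("/tmp/tbl_staging", "/data/tbl/final");
        publishEntity.getCleanupQueries().add("DROP TABLE IF EXISTS `db`.`tbl_staging`");
        publishEntity.getCleanupDirectories().add("/tmp/tbl_staging");
        return publishEntity;
      }
    }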
+ */ +public abstract class HiveTask extends BaseAbstractTask { + protected final TaskContext taskContext; + protected final WorkUnitState workUnitState; + protected final HiveWorkUnit workUnit; + protected final EventSubmitter eventSubmitter; + protected final List<String> hiveExecutionQueries; + protected final QueryBasedHivePublishEntity publishEntity; + protected final HiveJdbcConnector hiveJdbcConnector; + + public HiveTask(TaskContext taskContext) { + super(taskContext); + this.taskContext = taskContext; + this.workUnitState = taskContext.getTaskState(); + this.workUnit = new HiveWorkUnit(this.workUnitState.getWorkunit()); + this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.HiveTask") + .build(); + this.hiveExecutionQueries = Lists.newArrayList(); + this.publishEntity = new QueryBasedHivePublishEntity(); + try { + this.hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps(this.workUnitState.getProperties()); + } catch (SQLException se) { + throw new RuntimeException("Error in creating JDBC Connector", se); + } + } + + /** + * Generate hive queries to extract data + * @return list of hive queries + * @throws Exception + */ + public abstract List<String> generateHiveQueries() throws Exception; + + /** + * Generate publish and cleanup queries for hive datasets/partitions + * @return QueryBasedHivePublishEntity having cleanup and publish queries + * @throws Exception + */ + public abstract QueryBasedHivePublishEntity generatePublishQueries() throws Exception; + + protected void executePublishQueries(QueryBasedHivePublishEntity publishEntity) { + Set<String> cleanUpQueries = Sets.newLinkedHashSet(); + Set<String> publishQueries = Sets.newLinkedHashSet(); + List<String> directoriesToDelete = Lists.newArrayList(); + FileSystem fs = null; + + try { + fs = HiveSource.getSourceFs(workUnitState); + + if (publishEntity.getCleanupQueries() != null) { + cleanUpQueries.addAll(publishEntity.getCleanupQueries()); + } + + if (publishEntity.getCleanupDirectories() != null) { + directoriesToDelete.addAll(publishEntity.getCleanupDirectories()); + } + + if (publishEntity.getPublishDirectories() != null) { + // Publish snapshot / partition directories + Map<String, String> publishDirectories = publishEntity.getPublishDirectories(); + try { + for (Map.Entry<String, String> publishDir : publishDirectories.entrySet()) { + HadoopUtils.renamePath(fs, new Path(publishDir.getKey()), new Path(publishDir.getValue()), true); + } + } catch (RuntimeException re) { + throw re; + } + catch (Exception e) { + log.error("error in move dir"); + } + } + + if (publishEntity.getPublishQueries() != null) { + publishQueries.addAll(publishEntity.getPublishQueries()); + } + + WorkUnitState wus = this.workUnitState; + + this.hiveJdbcConnector.executeStatements(publishQueries.toArray(new String[publishQueries.size()])); + + wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + + HiveSourceWatermarker watermarker = GobblinConstructorUtils.invokeConstructor( + HiveSourceWatermarkerFactory.class, wus.getProp(HiveSource.HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY, + HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS)).createFromState(wus); + + watermarker.setActualHighWatermark(wus); + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + log.error("Error in HiveMaterializer generate publish queries", e); + } finally { + try { + this.hiveJdbcConnector.executeStatements(cleanUpQueries.toArray(new String[cleanUpQueries.size()])); + HadoopUtils.deleteDirectories(fs, 
directoriesToDelete, true, true); + } catch(RuntimeException re) { + throw re; + } catch (Exception e) { + log.error("Failed to cleanup staging entities.", e); + } + } + } + + @Override + public void run() { + try { + List<String> queries = generateHiveQueries(); + this.hiveJdbcConnector.executeStatements(queries.toArray(new String[queries.size()])); + super.run(); + } catch (Exception e) { + this.workingState = WorkUnitState.WorkingState.FAILED; + log.error("Exception in HiveTask generateHiveQueries ", e); + } + } + + @Override + public void commit() { + try { + executePublishQueries(generatePublishQueries()); + super.commit(); + } catch (Exception e) { + this.workingState = WorkUnitState.WorkingState.FAILED; + log.error("Exception in HiveTask generate publish HiveQueries ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java new file mode 100644 index 0000000..1502b06 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.util.List; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; + + +/** + * An interface for generating queries. 
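+ * Like HiveTask, it separates the data-movement queries (generateQueries()) from the publish/cleanup queries (generatePublishQueries()).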
+ */ +interface QueryGenerator { + + /** + * Generates queries to extract/convert/write data + * @return list of queries + */ + List<String> generateQueries(); + + /** + * Generates queries for publish data + * @return QueryBasedHivePublishEntity containing cleanup and publish queries + * @throws DataConversionException + */ + QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java new file mode 100644 index 0000000..a2a3dbe --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.conversion.hive.task; + +import java.util.Map; + +import org.junit.Test; +import org.testng.Assert; +import org.testng.collections.Maps; + +import com.google.common.base.Optional; + +public class HiveConverterUtilsTest { + private final String inputDbName = "testdb"; + private final String inputTableName = "testtable"; + private final String outputDatabaseName = "testdb2"; + private final String outputTableName = "testtable2"; + + @Test + public void copyTableQueryTest() throws Exception { + Map<String, String> partitionsDMLInfo = Maps.newHashMap(); + String partitionName = "datepartition"; + String partitionValue = "2017-07-15-08"; + + partitionsDMLInfo.put(partitionName, partitionValue); + String expectedQuery = "INSERT OVERWRITE TABLE `" + outputDatabaseName + "`.`" + outputTableName + "` \n" + + "PARTITION (`" + partitionName + "`) \n" + "SELECT * FROM `" + inputDbName + "`.`" + inputTableName + "` WHERE " + + "`" + partitionName + "`='" + partitionsDMLInfo.get(partitionName) + "'"; + + String actualQuery = HiveConverterUtils.generateTableCopy(inputTableName, + outputTableName, inputDbName, outputDatabaseName, Optional.of(partitionsDMLInfo)); + Assert.assertEquals(expectedQuery, actualQuery); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java index 8bf2c21..9222d48 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java @@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.conversion.hive.validation; import org.apache.gobblin.config.client.ConfigClient; import org.apache.gobblin.config.client.api.VersionStabilityPolicy; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; import org.apache.gobblin.util.PathUtils; import java.io.IOException; import java.io.InputStreamReader; @@ -45,18 +46,15 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.log4j.Logger; -import org.apache.thrift.TException; import org.joda.time.DateTime; import org.slf4j.LoggerFactory; @@ -374,7 +372,7 @@ public class ValidationJob extends AbstractJob { String orcTableName = conversionConfig.getDestinationTableName(); String orcTableDatabase = conversionConfig.getDestinationDbName(); 
Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> destinationMeta = - getDestinationTableMeta(orcTableDatabase, orcTableName, this.props); + HiveConverterUtils.getDestinationTableMeta(orcTableDatabase, orcTableName, this.props); // Generate validation queries final List<String> validationQueries = @@ -433,7 +431,7 @@ public class ValidationJob extends AbstractJob { String orcTableName = conversionConfig.getDestinationTableName(); String orcTableDatabase = conversionConfig.getDestinationDbName(); Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> destinationMeta = - getDestinationTableMeta(orcTableDatabase, orcTableName, this.props); + HiveConverterUtils.getDestinationTableMeta(orcTableDatabase, orcTableName, this.props); // Validate each partition for (final Partition sourcePartition : sourcePartitions) { @@ -704,31 +702,6 @@ public class ValidationJob extends AbstractJob { DateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); return dateFormat.parse(dateString).getTime(); } - - private Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> getDestinationTableMeta(String dbName, String tableName, - Properties props) { - - Optional<org.apache.hadoop.hive.metastore.api.Table> table = Optional.absent(); - Optional<List<Partition>> partitions = Optional.absent(); - - try { - try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { - table = Optional.of(client.get().getTable(dbName, tableName)); - if (table.isPresent()) { - org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get()); - if (HiveUtils.isPartitioned(qlTable)) { - partitions = Optional.of(HiveUtils.getPartitions(client.get(), qlTable, Optional.<String> absent())); - } - } - } - } catch (NoSuchObjectException e) { - return ImmutablePair.of(table, partitions); - } catch (IOException | TException e) { - throw new RuntimeException("Could not fetch destination table metadata", e); - } - - return ImmutablePair.of(table, partitions); - } } enum ValidationType { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index 8d186a6..27ec5cd 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; @@ -147,6 +148,20 @@ public class HadoopUtils { } /** + * Calls deletePath() on each directory in the given list of directories to delete. + * If moveToTrash is set, it will be moved to trash according to the file system trash policy. 
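+ * @param fs the FileSystem holding the directories to delete + * @param directoriesToDelete the directories to remove, given as path strings + * @param recursive passed through to deletePath() for non-trash deletes + * @param moveToTrash if true, each directory is moved to the FileSystem trash instead of being deleted permanently + * @throws IOException if a delete or trash move fails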
+ */ + public static void deleteDirectories(FileSystem fs, List<String> directoriesToDelete, boolean recursive, boolean moveToTrash) throws IOException { + for (String directory : directoriesToDelete) { + if (moveToTrash) { + moveToTrash(fs, new Path(directory)); + } else { + deletePath(fs, new Path(directory), recursive); + } + } + } + + /** * A wrapper around {@link FileSystem#delete(Path, boolean)} that only deletes a given {@link Path} if it is present * on the given {@link FileSystem}. */ @@ -170,6 +185,16 @@ public class HadoopUtils { } /** + * Moves the object to the filesystem trash according to the file system policy. + * @param fs FileSystem object + * @param path Path to the object to be moved to trash. + * @throws IOException + */ + public static void moveToTrash(FileSystem fs, Path path) throws IOException { + Trash trash = new Trash(fs, new Configuration()); + trash.moveToTrash(path); + } + /** * Renames a src {@link Path} on fs {@link FileSystem} to a dst {@link Path}. If fs is a {@link LocalFileSystem} and * src is a directory then {@link File#renameTo} is called directly to avoid a directory rename race condition where * {@link org.apache.hadoop.fs.RawLocalFileSystem#rename} copies the conflicting src directory into dst resulting in @@ -212,10 +237,7 @@ public class HadoopUtils { } if (fs.exists(newName)) { if (overwrite) { - if (!fs.delete(newName, true)) { - throw new IOException( - String.format("Failed to delete %s while renaming %s to %s", newName, oldName, newName)); - } + HadoopUtils.moveToTrash(fs, newName); } else { throw new FileAlreadyExistsException( String.format("Failed to rename %s to %s: dst already exists", oldName, newName)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java index 768dbdd..4dc3ba6 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java @@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.fs.TrashPolicy; import org.apache.hadoop.fs.permission.FsPermission; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -281,4 +283,23 @@ public class HadoopUtilsTest { Assert.assertNotNull(configuration.get(entry.getKey())); //Verify key with child path exist as decryption is unit tested in ConfigUtil. } } + + @Test + public void testMoveToTrash() throws IOException { + Path hadoopUtilsTestDir = new Path(Files.createTempDir().getAbsolutePath(), "HadoopUtilsTestDir"); + Configuration conf = new Configuration(); + // Set the time to keep it in trash to 10 minutes. + // 0 means object will be deleted instantly. 
+ conf.set("fs.trash.interval", "10"); + FileSystem fs = FileSystem.getLocal(conf); + Trash trash = new Trash(fs, conf); + TrashPolicy trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory()); + Path trashPath = trashPolicy.getCurrentTrashDir(); + + fs.mkdirs(hadoopUtilsTestDir); + Assert.assertTrue(fs.exists(hadoopUtilsTestDir)); + trash.moveToTrash(hadoopUtilsTestDir.getParent()); + Assert.assertFalse(fs.exists(hadoopUtilsTestDir)); + Assert.assertTrue(fs.exists(trashPath)); + } }
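For context, a minimal sketch of how the new abstract HiveTask is meant to be extended. The class name and query string below are placeholders invented for illustration; only the HiveTask and QueryBasedHivePublishEntity signatures shown in this diff are assumed, and the work unit is expected to carry the Hive JDBC properties that HiveTask's constructor reads.

import java.util.List;

import com.google.common.collect.Lists;

import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
import org.apache.gobblin.data.management.conversion.hive.task.HiveTask;
import org.apache.gobblin.runtime.TaskContext;

/** Hypothetical task that copies a staging table into a final table. */
public class ExampleCopyHiveTask extends HiveTask {

  public ExampleCopyHiveTask(TaskContext taskContext) {
    super(taskContext); // HiveTask builds the HiveJdbcConnector from the work unit properties
  }

  @Override
  public List<String> generateHiveQueries() throws Exception {
    // Extract/write level query, executed by HiveTask.run() through the JDBC connector.
    return Lists.newArrayList(
        "INSERT OVERWRITE TABLE `db`.`final_table` SELECT * FROM `db`.`staging_table`");
  }

  @Override
  public QueryBasedHivePublishEntity generatePublishQueries() throws Exception {
    // Publish level work, executed by HiveTask.commit(): a real task would populate
    // publish/cleanup queries and staging directories on this entity (its mutators are
    // not part of this diff, so none are shown here).
    return new QueryBasedHivePublishEntity();
  }
}

A custom task along these lines would also need its own TaskFactory, analogous to the HiveMaterializerTaskFactory added in this commit.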
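Similarly, a short sketch of the new HadoopUtils trash helpers added above; the paths are placeholders and the default Configuration is assumed to point at the target FileSystem.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Lists;

import org.apache.gobblin.util.HadoopUtils;

public class TrashCleanupExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // Staging directories left behind by a conversion run (placeholder paths).
    List<String> staging = Lists.newArrayList("/tmp/hive-staging-1", "/tmp/hive-staging-2");

    // recursive=true, moveToTrash=true: each directory goes to the FileSystem trash
    // (subject to fs.trash.interval) instead of being deleted permanently.
    HadoopUtils.deleteDirectories(fs, staging, true, true);

    // A single path can also be trashed directly.
    HadoopUtils.moveToTrash(fs, new Path("/tmp/hive-staging-3"));
  }
}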
