[GOBBLIN-253] Enhance Hive materializer. Closes #2104 from ibuenros/hive-materializer
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5fa98326 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5fa98326 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5fa98326 Branch: refs/heads/master Commit: 5fa983268606335493903e7186836c57eefe40d9 Parents: 30990f4 Author: ibuenros <[email protected]> Authored: Thu Sep 21 16:53:52 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Thu Sep 21 16:53:52 2017 -0700 ---------------------------------------------------------------------- gobblin-data-management/build.gradle | 1 + .../converter/AbstractAvroToOrcConverter.java | 26 +- .../hive/dataset/ConvertibleHiveDataset.java | 125 +++------ .../hive/entities/HiveProcessingEntity.java | 50 ++++ .../QueryBasedHiveConversionEntity.java | 20 +- .../ReplaceTableStageableTableMetadata.java | 33 +++ .../hive/entities/StageableTableMetadata.java | 154 +++++++++++ .../TableLikeStageableTableMetadata.java | 55 ++++ .../materializer/CopyTableQueryGenerator.java | 97 +++++++ .../hive/materializer/HiveMaterializer.java | 155 ++++++++++++ ...iveMaterializerFromEntityQueryGenerator.java | 176 +++++++++++++ .../HiveMaterializerQueryGenerator.java | 94 +++++++ .../HiveMaterializerTaskFactory.java | 46 ++++ .../MaterializeTableQueryGenerator.java | 52 ++++ .../QueryBasedMaterializerQueryGenerator.java | 96 +++++++ .../conversion/hive/source/HiveSource.java | 41 ++- .../conversion/hive/source/HiveWorkUnit.java | 12 + .../hive/task/HiveConverterUtils.java | 104 ++++++-- .../conversion/hive/task/HiveMaterializer.java | 54 ---- .../task/HiveMaterializerQueryGenerator.java | 244 ------------------ .../hive/task/HiveMaterializerSource.java | 60 ----- .../hive/task/HiveMaterializerTaskFactory.java | 46 ---- .../conversion/hive/task/HiveTask.java | 27 +- .../conversion/hive/task/QueryGenerator.java | 2 +- 
.../hive/writer/HiveQueryExecutionWriter.java | 2 +- .../conversion/hive/HiveSourceTest.java | 8 +- .../hive/LocalHiveMetastoreTestUtils.java | 14 +- .../converter/HiveAvroToOrcConverterTest.java | 4 +- .../hive/materializer/HiveMaterializerTest.java | 253 +++++++++++++++++++ .../PartitionLevelWatermarkerTest.java | 3 +- .../integration/HiveRetentionTest.java | 4 +- .../DatePartitionedHiveVersionFinderTest.java | 6 +- .../hiveMaterializerTest/source/part1/data.txt | 4 + .../hiveMaterializerTest/source/part2/data.txt | 4 + gobblin-example/build.gradle | 1 + .../HiveMaterializerSource.java | 129 ++++++++++ .../src/main/resources/hive-materializer.conf | 19 ++ .../apache/gobblin/util/HiveJdbcConnector.java | 22 +- 38 files changed, 1650 insertions(+), 593 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-data-management/build.gradle b/gobblin-data-management/build.gradle index c10835e..3bb5d1e 100644 --- a/gobblin-data-management/build.gradle +++ b/gobblin-data-management/build.gradle @@ -54,6 +54,7 @@ dependencies { testCompile externalDependency.joptSimple testCompile externalDependency.hamcrest testCompile externalDependency.testng + testCompile externalDependency.hiveJdbc testRuntime project(":gobblin-modules:gobblin-crypto-provider") // for GPG } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java index b8495a9..b8591e7 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java @@ -17,7 +17,6 @@ package org.apache.gobblin.data.management.conversion.hive.converter; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -30,8 +29,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity; import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -65,7 +64,6 @@ import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiv import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils; import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; -import org.apache.gobblin.data.management.copy.hive.HiveUtils; import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist; import org.apache.gobblin.hive.HiveMetastoreClientPool; import org.apache.gobblin.metrics.event.sla.SlaEventKeys; @@ -198,7 +196,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem Preconditions.checkNotNull(outputAvroSchema, "Avro schema must not be null"); Preconditions.checkNotNull(conversionEntity, "Conversion entity must not be 
null"); Preconditions.checkNotNull(workUnit, "Workunit state must not be null"); - Preconditions.checkNotNull(conversionEntity.getHiveTable(), "Hive table within conversion entity must not be null"); + Preconditions.checkNotNull(conversionEntity.getTable(), "Hive table within conversion entity must not be null"); EventWorkunitUtils.setBeginDDLBuildTimeMetadata(workUnit, System.currentTimeMillis()); @@ -209,7 +207,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem } // Avro table name and location - String avroTableName = conversionEntity.getHiveTable().getTableName(); + String avroTableName = conversionEntity.getTable().getTableName(); // ORC table name and location String orcTableName = getConversionConfig().getDestinationTableName(); @@ -271,7 +269,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem * Upon testing, this did not work */ try { - FileStatus sourceDataFileStatus = this.fs.getFileStatus(conversionEntity.getHiveTable().getDataLocation()); + FileStatus sourceDataFileStatus = this.fs.getFileStatus(conversionEntity.getTable().getDataLocation()); FsPermission sourceDataPermission = sourceDataFileStatus.getPermission(); if (!this.fs.mkdirs(new Path(getConversionConfig().getDestinationDataPath()), sourceDataPermission)) { throw new RuntimeException(String.format("Failed to create path %s with permissions %s", new Path( @@ -297,10 +295,10 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem } // Set hive runtime properties for tracking conversionEntity.getQueries().add(String.format("SET %s=%s", GOBBLIN_DATASET_URN_KEY, - conversionEntity.getHiveTable().getCompleteName())); - if (conversionEntity.getHivePartition().isPresent()) { + conversionEntity.getTable().getCompleteName())); + if (conversionEntity.getPartition().isPresent()) { conversionEntity.getQueries().add(String.format("SET %s=%s", GOBBLIN_PARTITION_NAME_KEY, - 
conversionEntity.getHivePartition().get().getCompleteName())); + conversionEntity.getPartition().get().getCompleteName())); } conversionEntity.getQueries().add(String .format("SET %s=%s", GOBBLIN_WORKUNIT_CREATE_TIME_KEY, @@ -348,7 +346,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem outputAvroSchema, avroTableName, orcStagingTableName, - Optional.of(conversionEntity.getHiveTable().getDbName()), + Optional.of(conversionEntity.getTable().getDbName()), Optional.of(orcTableDatabase), Optional.of(partitionsDMLInfo), Optional.<Boolean>absent(), @@ -468,7 +466,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem // Move: orcStagingDataPartitionLocation to: orcFinalDataPartitionLocation String orcFinalDataPartitionLocation = orcDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName; Optional<Path> destPartitionLocation = getDestinationPartitionLocation(destinationTableMeta, workUnit, - conversionEntity.getHivePartition().get().getName()); + conversionEntity.getPartition().get().getName()); orcFinalDataPartitionLocation = HiveConverterUtils.updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation); log.info( @@ -626,12 +624,12 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem @VisibleForTesting - public static List<Map<String, String>> getDropPartitionsDDLInfo(QueryBasedHiveConversionEntity conversionEntity) { - if (!conversionEntity.getHivePartition().isPresent()) { + public static List<Map<String, String>> getDropPartitionsDDLInfo(HiveProcessingEntity conversionEntity) { + if (!conversionEntity.getPartition().isPresent()) { return Collections.emptyList(); } - return getDropPartitionsDDLInfo(conversionEntity.getHivePartition().get()); + return getDropPartitionsDDLInfo(conversionEntity.getPartition().get()); } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java index ba42811..933e86a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java @@ -16,7 +16,6 @@ */ package org.apache.gobblin.data.management.conversion.hive.dataset; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -25,6 +24,7 @@ import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.ql.metadata.Table; @@ -34,6 +34,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.typesafe.config.Config; +import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata; import org.apache.gobblin.data.management.copy.hive.HiveDataset; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; import org.apache.gobblin.hive.HiveMetastoreClientPool; @@ -144,116 +145,58 @@ public class ConvertibleHiveDataset extends HiveDataset { */ @Getter @ToString - public static class ConversionConfig { - public static final String DESTINATION_TABLE_KEY = "destination.tableName"; + public static class ConversionConfig extends StageableTableMetadata { public static final String DESTINATION_VIEW_KEY = "destination.viewName"; - public static 
final String DESTINATION_DB_KEY = "destination.dbName"; - public static final String DESTINATION_DATA_PATH_KEY = "destination.dataPath"; - public static final String DESTINATION_TABLE_PROPERTIES_LIST_KEY = "destination.tableProperties"; - public static final String CLUSTER_BY_KEY = "clusterByList"; - public static final String NUM_BUCKETS_KEY = "numBuckets"; - public static final String EVOLUTION_ENABLED = "evolution.enabled"; public static final String UPDATE_VIEW_ALWAYS_ENABLED = "updateViewAlways.enabled"; - public static final String ROW_LIMIT_KEY = "rowLimit"; - public static final String HIVE_VERSION_KEY = "hiveVersion"; - private static final String HIVE_RUNTIME_PROPERTIES_LIST_KEY = "hiveRuntimeProperties"; - - /*** - * Comma separated list of string that should be used as a prefix for destination partition directory name - * ... (if present in the location path string of source partition) - * - * This is helpful in roll-up / compaction scenarios, where you don't want queries in flight to fail. 
- * - * Scenario without this property: - * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for - * processing - * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00 - * - * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for - * processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition) - * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00 - * (previous data is overwritten and any queries in flight fail) - * - * Same scenario with this property set to "hourly,daily": - * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for - * processing - * - Source partition is processed and published to destination table as: /foo/bar_orc/hourly_datepartition=2016-01-01-00 - * (Note: "hourly_" is prefixed to destination partition directory name because source partition path contains - * "hourly" substring) - * - * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for - * processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition) - * - Source partition is processed and published to destination table as: /foo/bar_orc/daily_datepartition=2016-01-01-00 - * (Note: "daily_" is prefixed to destination partition directory name, because source partition path contains - * "daily" substring) - * - Any running queries are not impacted since data is not overwritten and hourly_datepartition=2016-01-01-00 - * directory continues to exist - * - * Notes: - * - This however leaves the responsibility of cleanup of previous destination partition directory on retention or - * other such independent module, since in the above case hourly_datepartition=2016-01-01-00 dir will not be deleted - * - 
Directories can still be overwritten if they resolve to same destination partition directory name, such as - * re-processing / backfill of daily partition will overwrite daily_datepartition=2016-01-01-00 directory - */ - private static final String SOURCE_DATA_PATH_IDENTIFIER_KEY = "source.dataPathIdentifier"; private final String destinationFormat; - private final String destinationTableName; // destinationViewName : If specified view with 'destinationViewName' is created if not already exists over destinationTableName private final Optional<String> destinationViewName; - private final String destinationStagingTableName; - private final String destinationDbName; - private final String destinationDataPath; - private final Properties destinationTableProperties; - private final List<String> clusterBy; - private final Optional<Integer> numBuckets; - private final Properties hiveRuntimeProperties; - private final boolean evolutionEnabled; - // updateViewAlwaysEnabled: If false 'destinationViewName' is only updated when schema evolves; if true 'destinationViewName' + // updateViewAlwaysEnabled: If false 'destinationViewName' is only updated when schema evolves; if true 'destinationViewName' // ... 
is always updated (everytime publish happens) private final boolean updateViewAlwaysEnabled; - private final Optional<Integer> rowLimit; - private final List<String> sourceDataPathIdentifier; private ConversionConfig(Config config, Table table, String destinationFormat) { - - Preconditions.checkArgument(config.hasPath(DESTINATION_TABLE_KEY), String.format("Key %s.%s is not specified", destinationFormat, DESTINATION_TABLE_KEY)); - Preconditions.checkArgument(config.hasPath(DESTINATION_DB_KEY), String.format("Key %s.%s is not specified", destinationFormat, DESTINATION_DB_KEY)); - Preconditions.checkArgument(config.hasPath(DESTINATION_DATA_PATH_KEY), - String.format("Key %s.%s is not specified", destinationFormat, DESTINATION_DATA_PATH_KEY)); + super(config, table); // Required this.destinationFormat = destinationFormat; - this.destinationTableName = resolveTemplate(config.getString(DESTINATION_TABLE_KEY), table); - this.destinationStagingTableName = String.format("%s_%s", this.destinationTableName, "staging"); // Fixed and non-configurable - this.destinationDbName = resolveTemplate(config.getString(DESTINATION_DB_KEY), table); - this.destinationDataPath = resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), table); // Optional this.destinationViewName = Optional.fromNullable(resolveTemplate(ConfigUtils.getString(config, DESTINATION_VIEW_KEY, null), table)); - this.destinationTableProperties = - convertKeyValueListToProperties(ConfigUtils.getStringList(config, DESTINATION_TABLE_PROPERTIES_LIST_KEY)); - this.clusterBy = ConfigUtils.getStringList(config, CLUSTER_BY_KEY); - this.numBuckets = Optional.fromNullable(ConfigUtils.getInt(config, NUM_BUCKETS_KEY, null)); - - this.hiveRuntimeProperties = - convertKeyValueListToProperties(ConfigUtils.getStringList(config, HIVE_RUNTIME_PROPERTIES_LIST_KEY)); - this.evolutionEnabled = ConfigUtils.getBoolean(config, EVOLUTION_ENABLED, false); this.updateViewAlwaysEnabled = ConfigUtils.getBoolean(config, 
UPDATE_VIEW_ALWAYS_ENABLED, true); - this.rowLimit = Optional.fromNullable(ConfigUtils.getInt(config, ROW_LIMIT_KEY, null)); - this.sourceDataPathIdentifier = ConfigUtils.getStringList(config, SOURCE_DATA_PATH_IDENTIFIER_KEY); } - private Properties convertKeyValueListToProperties(List<String> keyValueList) { - Preconditions.checkArgument(keyValueList.size() % 2 == 0, String.format( - "The list %s does not have equal number of keys and values. Size %s", keyValueList, keyValueList.size())); - Properties props = new Properties(); - for (int i = 0; i < keyValueList.size(); i += 2) { - String key = keyValueList.get(i); - String value = keyValueList.get(i + 1); - props.put(key, value); + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + ConversionConfig that = (ConversionConfig) o; + + if (isUpdateViewAlwaysEnabled() != that.isUpdateViewAlwaysEnabled()) { + return false; } - return props; + if (!getDestinationFormat().equals(that.getDestinationFormat())) { + return false; + } + return getDestinationViewName().equals(that.getDestinationViewName()); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + getDestinationFormat().hashCode(); + result = 31 * result + getDestinationViewName().hashCode(); + result = 31 * result + (isUpdateViewAlwaysEnabled() ? 
1 : 0); + return result; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java new file mode 100644 index 0000000..5a9a819 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.entities; + +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; + +import com.google.common.base.Optional; + +import lombok.Getter; + + +/** + * Represents a Hive table and optionally partition. 
+ */ +@Getter +public class HiveProcessingEntity { + + private final HiveDataset hiveDataset; + private final Table table; + private final Optional<Partition> partition; + + public HiveProcessingEntity(HiveDataset hiveDataset, Table table) { + this(hiveDataset, table, Optional.absent()); + } + + public HiveProcessingEntity(HiveDataset convertibleHiveDataset, Table table, + Optional<Partition> partition) { + this.hiveDataset = convertibleHiveDataset; + this.table = table; + this.partition = partition; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java index 1486fe5..a84e55b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java @@ -18,13 +18,6 @@ package org.apache.gobblin.data.management.conversion.hive.entities; import java.util.List; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - import org.apache.gobblin.converter.Converter; import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter; import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; @@ -35,6 +28,15 @@ import org.apache.gobblin.hive.HiveRegistrationUnit; import 
org.apache.gobblin.hive.HiveTable; import org.apache.gobblin.source.extractor.Extractor; +import org.apache.hadoop.hive.ql.metadata.Partition; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + /** * Represents a gobblin Record in the Hive avro to orc conversion flow. @@ -54,7 +56,7 @@ import org.apache.gobblin.source.extractor.Extractor; @ToString @EqualsAndHashCode @Getter -public class QueryBasedHiveConversionEntity { +public class QueryBasedHiveConversionEntity extends HiveProcessingEntity { private final ConvertibleHiveDataset convertibleHiveDataset; private final SchemaAwareHiveTable hiveTable; @@ -71,9 +73,11 @@ public class QueryBasedHiveConversionEntity { public QueryBasedHiveConversionEntity(ConvertibleHiveDataset convertibleHiveDataset, SchemaAwareHiveTable hiveTable, Optional<SchemaAwareHivePartition> hivePartition) { + super(convertibleHiveDataset, hiveTable, Optional.fromNullable(hivePartition.orNull())); this.convertibleHiveDataset = convertibleHiveDataset; this.hiveTable = hiveTable; this.hivePartition = hivePartition; this.queries = Lists.newArrayList(); } + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java new file mode 100644 index 0000000..76c0c4e --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java @@ -0,0 +1,33 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.entities; + +import org.apache.hadoop.hive.ql.metadata.Table; + + +/** + * A {@link StageableTableMetadata} intended where the target table is the same as the reference table. Intended to + * replace the original table. 
+ */ +public class ReplaceTableStageableTableMetadata extends TableLikeStageableTableMetadata { + + public ReplaceTableStageableTableMetadata(Table referenceTable) { + super(referenceTable, referenceTable.getDbName(), referenceTable.getTableName(), referenceTable.getDataLocation().toString()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java new file mode 100644 index 0000000..39494c2 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.conversion.hive.entities; + +import java.util.List; +import java.util.Properties; + +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.hadoop.hive.ql.metadata.Table; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Data; + + +/** + * Contains metadata associated with a stageable table. + * + * This class contains information about two Hive tables: a final destination table and a staging table. The staging + * table is used as temporary storage during job run to aid with consistency of the final destination table. + */ +@Data +@AllArgsConstructor +public class StageableTableMetadata { + + public static final String DESTINATION_TABLE_KEY = "destination.tableName"; + public static final String DESTINATION_DB_KEY = "destination.dbName"; + public static final String DESTINATION_DATA_PATH_KEY = "destination.dataPath"; + public static final String DESTINATION_TABLE_PROPERTIES_LIST_KEY = "destination.tableProperties"; + public static final String CLUSTER_BY_KEY = "clusterByList"; + public static final String NUM_BUCKETS_KEY = "numBuckets"; + public static final String EVOLUTION_ENABLED = "evolution.enabled"; + public static final String ROW_LIMIT_KEY = "rowLimit"; + public static final String HIVE_VERSION_KEY = "hiveVersion"; + public static final String HIVE_RUNTIME_PROPERTIES_LIST_KEY = "hiveRuntimeProperties"; + /*** + * Comma separated list of string that should be used as a prefix for destination partition directory name + * ... (if present in the location path string of source partition) + * + * This is helpful in roll-up / compaction scenarios, where you don't want queries in flight to fail. 
+ * + * Scenario without this property: + * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for + * processing + * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00 + * + * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for + * processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition) + * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00 + * (previous data is overwritten and any queries in flight fail) + * + * Same scenario with this property set to "hourly,daily": + * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for + * processing + * - Source partition is processed and published to destination table as: /foo/bar_orc/hourly_datepartition=2016-01-01-00 + * (Note: "hourly_" is prefixed to destination partition directory name because source partition path contains + * "hourly" substring) + * + * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for + * processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition) + * - Source partition is processed and published to destination table as: /foo/bar_orc/daily_datepartition=2016-01-01-00 + * (Note: "daily_" is prefixed to destination partition directory name, because source partition path contains + * "daily" substring) + * - Any running queries are not impacted since data is not overwritten and hourly_datepartition=2016-01-01-00 + * directory continues to exist + * + * Notes: + * - This however leaves the responsibility of cleanup of previous destination partition directory on retention or + * other such independent module, since in the above case hourly_datepartition=2016-01-01-00 dir will not be deleted + * - 
Directories can still be overwritten if they resolve to same destination partition directory name, such as + * re-processing / backfill of daily partition will overwrite daily_datepartition=2016-01-01-00 directory + */ + public static final String SOURCE_DATA_PATH_IDENTIFIER_KEY = "source.dataPathIdentifier"; + + + /** Table name of the destination table. */ + private final String destinationTableName; + /** Table name of the staging table. */ + private final String destinationStagingTableName; + /** Name of db for destination name. */ + private final String destinationDbName; + /** Path where files of the destination table should be located. */ + private final String destinationDataPath; + /** Table properties of destination table. */ + private final Properties destinationTableProperties; + /** List of columns to cluster by. */ + private final List<String> clusterBy; + /** Number of buckets in destination table. */ + private final Optional<Integer> numBuckets; + private final Properties hiveRuntimeProperties; + private final boolean evolutionEnabled; + private final Optional<Integer> rowLimit; + private final List<String> sourceDataPathIdentifier; + + public StageableTableMetadata(Config config, @Nullable Table referenceTable) { + Preconditions.checkArgument(config.hasPath(DESTINATION_TABLE_KEY), String.format("Key %s is not specified", DESTINATION_TABLE_KEY)); + Preconditions.checkArgument(config.hasPath(DESTINATION_DB_KEY), String.format("Key %s is not specified", DESTINATION_DB_KEY)); + Preconditions.checkArgument(config.hasPath(DESTINATION_DATA_PATH_KEY), + String.format("Key %s is not specified", DESTINATION_DATA_PATH_KEY)); + + // Required + this.destinationTableName = referenceTable == null ? 
config.getString(DESTINATION_TABLE_KEY) + : HiveDataset.resolveTemplate(config.getString(DESTINATION_TABLE_KEY), referenceTable); + this.destinationStagingTableName = String.format("%s_%s", this.destinationTableName, "staging"); // Fixed and non-configurable + this.destinationDbName = referenceTable == null ? config.getString(DESTINATION_DB_KEY) + : HiveDataset.resolveTemplate(config.getString(DESTINATION_DB_KEY), referenceTable); + this.destinationDataPath = referenceTable == null ? config.getString(DESTINATION_DATA_PATH_KEY) + : HiveDataset.resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), referenceTable); + + // Optional + this.destinationTableProperties = + convertKeyValueListToProperties(ConfigUtils.getStringList(config, DESTINATION_TABLE_PROPERTIES_LIST_KEY)); + this.clusterBy = ConfigUtils.getStringList(config, CLUSTER_BY_KEY); + this.numBuckets = Optional.fromNullable(ConfigUtils.getInt(config, NUM_BUCKETS_KEY, null)); + + this.hiveRuntimeProperties = + convertKeyValueListToProperties(ConfigUtils.getStringList(config, HIVE_RUNTIME_PROPERTIES_LIST_KEY)); + this.evolutionEnabled = ConfigUtils.getBoolean(config, EVOLUTION_ENABLED, false); + this.rowLimit = Optional.fromNullable(ConfigUtils.getInt(config, ROW_LIMIT_KEY, null)); + this.sourceDataPathIdentifier = ConfigUtils.getStringList(config, SOURCE_DATA_PATH_IDENTIFIER_KEY); + } + + private Properties convertKeyValueListToProperties(List<String> keyValueList) { + Preconditions.checkArgument(keyValueList.size() % 2 == 0, String.format( + "The list %s does not have equal number of keys and values. 
Size %s", keyValueList, keyValueList.size())); + Properties props = new Properties(); + for (int i = 0; i < keyValueList.size(); i += 2) { + String key = keyValueList.get(i); + String value = keyValueList.get(i + 1); + props.put(key, value); + } + return props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java new file mode 100644 index 0000000..b4fe9d4 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.conversion.hive.entities; + +import java.util.ArrayList; +import java.util.Properties; + +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.hadoop.hive.ql.metadata.Table; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + + +/** + * A {@link StageableTableMetadata} that copies most metadata from a reference table. + */ +public class TableLikeStageableTableMetadata extends StageableTableMetadata { + + public TableLikeStageableTableMetadata(Table referenceTable, String destinationDB, String destinationTableName, String targetDataPath) { + super(destinationTableName, destinationTableName + "_STAGING", destinationDB, targetDataPath, + getTableProperties(referenceTable), new ArrayList<>(), Optional.of(referenceTable.getNumBuckets()), new Properties(), false, Optional.absent(), + new ArrayList<>()); + } + + public TableLikeStageableTableMetadata(Table referenceTable, Config config) { + super(HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_TABLE_KEY), referenceTable), + HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_TABLE_KEY), referenceTable) + "_STAGING", + HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_DB_KEY), referenceTable), + HiveDataset.resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), referenceTable), + getTableProperties(referenceTable), new ArrayList<>(), Optional.of(referenceTable.getNumBuckets()), + new Properties(), false, Optional.absent(), new ArrayList<>()); + } + + private static Properties getTableProperties(Table table) { + Properties properties = new Properties(); + properties.putAll(table.getParameters()); + return properties; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java new file mode 100644 index 0000000..8ff0913 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import java.io.IOException; +import java.util.List; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} that generates queries to exactly + * copy an input table / partition. + */ +@Slf4j +public class CopyTableQueryGenerator extends HiveMaterializerFromEntityQueryGenerator { + + public CopyTableQueryGenerator(WorkUnitState workUnitState) throws IOException { + super(workUnitState, true); + } + + /** + * Returns hive queries to be run as a part of a hive task. + * This does not include publish queries. + * @return + */ + @Override + public List<String> generateQueries() { + + List<String> hiveQueries = Lists.newArrayList(); + /* + * Setting partition mode to 'nonstrict' is needed to improve readability of the code. + * If we do not set dynamic partition mode to nonstrict, we will have to write partition values also, + * and because hive considers partition as a virtual column, we also have to write each of the column + * name in the query (in place of *) to match source and target columns. 
+ */ + hiveQueries.add("SET hive.exec.dynamic.partition.mode=nonstrict"); + + Preconditions.checkNotNull(this.workUnit, "Workunit must not be null"); + EventWorkunitUtils.setBeginDDLBuildTimeMetadata(this.workUnit, System.currentTimeMillis()); + + HiveConverterUtils.createStagingDirectory(fs, outputTableMetadata.getDestinationDataPath(), + conversionEntity, this.workUnitState); + + // Create DDL statement for table + String createStagingTableDDL = + HiveConverterUtils.generateCreateDuplicateTableDDL( + inputDbName, + inputTableName, + stagingTableName, + stagingDataLocation, + Optional.of(outputDatabaseName)); + hiveQueries.add(createStagingTableDDL); + log.debug("Create staging table DDL:\n" + createStagingTableDDL); + + + String insertInStagingTableDML = + HiveConverterUtils + .generateTableCopy( + inputTableName, + stagingTableName, + conversionEntity.getTable().getDbName(), + outputDatabaseName, + Optional.of(partitionsDMLInfo)); + hiveQueries.add(insertInStagingTableDML); + log.debug("Conversion staging DML: " + insertInStagingTableDML); + + log.info("Conversion Queries {}\n", hiveQueries); + + EventWorkunitUtils.setEndDDLBuildTimeMetadata(workUnit, System.currentTimeMillis()); + return hiveQueries; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java new file mode 100644 index 0000000..d4b3ca4 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import java.io.IOException; +import java.util.List; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; +import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata; +import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; +import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; +import org.apache.gobblin.data.management.conversion.hive.task.HiveTask; +import org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator; +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.TaskUtils; +import org.apache.gobblin.source.workunit.WorkUnit; + +import com.google.common.base.Strings; + +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; + +@Slf4j + +/** + * A simple {@link HiveTask} for Hive view materialization. 
+ */ +public class HiveMaterializer extends HiveTask { + + protected static final String STAGEABLE_TABLE_METADATA_KEY = "internal.hiveMaterializer.stageableTableMetadata"; + protected static final String MATERIALIZER_MODE_KEY = "internal.hiveMaterializer.materializerMode"; + protected static final String STORAGE_FORMAT_KEY = "internal.hiveMaterializer.storageFormat"; + protected static final String QUERY_RESULT_TO_MATERIALIZE_KEY = "internal.hiveMaterializer.queryResultToMaterialize"; + + /** + * Create a work unit to copy a source table to a target table using a staging table in between. + * @param dataset {@link HiveDataset} for the source table. + * @param destinationTable {@link StageableTableMetadata} specifying staging and target tables metadata. + */ + public static HiveWorkUnit tableCopyWorkUnit(HiveDataset dataset, StageableTableMetadata destinationTable, + @Nullable String partitionName) { + HiveWorkUnit workUnit = new HiveWorkUnit(dataset); + workUnit.setProp(MATERIALIZER_MODE_KEY, MaterializerMode.TABLE_COPY.name()); + workUnit.setProp(STAGEABLE_TABLE_METADATA_KEY, HiveSource.GENERICS_AWARE_GSON.toJson(destinationTable)); + if (!Strings.isNullOrEmpty(partitionName)) { + workUnit.setPartitionName(partitionName); + } + TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class); + return workUnit; + } + + /** + * Create a work unit to materialize a table / view to a target table using a staging table in between. + * @param dataset {@link HiveDataset} for the source table. + * @param storageFormat format in which target table should be written. + * @param destinationTable {@link StageableTableMetadata} specifying staging and target tables metadata. 
+ */ + public static HiveWorkUnit viewMaterializationWorkUnit(HiveDataset dataset, HiveConverterUtils.StorageFormat storageFormat, + StageableTableMetadata destinationTable, @Nullable String partitionName) { + HiveWorkUnit workUnit = new HiveWorkUnit(dataset); + workUnit.setProp(MATERIALIZER_MODE_KEY, MaterializerMode.TABLE_MATERIALIZATION.name()); + workUnit.setProp(STORAGE_FORMAT_KEY, storageFormat.name()); + workUnit.setProp(STAGEABLE_TABLE_METADATA_KEY, HiveSource.GENERICS_AWARE_GSON.toJson(destinationTable)); + if (!Strings.isNullOrEmpty(partitionName)) { + workUnit.setPartitionName(partitionName); + } + TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class); + return workUnit; + } + + /** + * Create a work unit to materialize a query to a target table using a staging table in between. + * @param query the query to materialize. + * @param storageFormat format in which target table should be written. + * @param destinationTable {@link StageableTableMetadata} specifying staging and target tables metadata. 
+ */ + public static WorkUnit queryResultMaterializationWorkUnit(String query, HiveConverterUtils.StorageFormat storageFormat, + StageableTableMetadata destinationTable) { + WorkUnit workUnit = new WorkUnit(); + workUnit.setProp(MATERIALIZER_MODE_KEY, MaterializerMode.QUERY_RESULT_MATERIALIZATION.name()); + workUnit.setProp(STORAGE_FORMAT_KEY, storageFormat.name()); + workUnit.setProp(QUERY_RESULT_TO_MATERIALIZE_KEY, query); + workUnit.setProp(STAGEABLE_TABLE_METADATA_KEY, HiveSource.GENERICS_AWARE_GSON.toJson(destinationTable)); + TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class); + HiveTask.disableHiveWatermarker(workUnit); + return workUnit; + } + + public static StageableTableMetadata parseStageableTableMetadata(WorkUnit workUnit) { + return HiveSource.GENERICS_AWARE_GSON.fromJson(workUnit.getProp(STAGEABLE_TABLE_METADATA_KEY), StageableTableMetadata.class); + } + + private enum MaterializerMode { + + /** Materialize a table or view into a new table possibly with a new storage format. */ + TABLE_MATERIALIZATION { + @Override + public QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException { + return new MaterializeTableQueryGenerator(state); + } + }, + /** Copy a table into a new table with the same properties. */ + TABLE_COPY { + @Override + public QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException { + return new CopyTableQueryGenerator(state); + } + }, + /** Materialize a query into a table. 
*/ + QUERY_RESULT_MATERIALIZATION { + @Override + public QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException { + return new QueryBasedMaterializerQueryGenerator(state); + } + }; + + public abstract QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException; + } + + private final QueryGenerator queryGenerator; + + public HiveMaterializer(TaskContext taskContext) throws IOException { + super(taskContext); + MaterializerMode materializerMode = MaterializerMode.valueOf(this.workUnitState.getProp(MATERIALIZER_MODE_KEY)); + this.queryGenerator = materializerMode.createQueryGenerator(this.workUnitState); + } + + @Override + public List<String> generateHiveQueries() { + return queryGenerator.generateQueries(); + } + + @Override + public QueryBasedHivePublishEntity generatePublishQueries() throws Exception { + return queryGenerator.generatePublishQueries(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java new file mode 100644 index 0000000..872a3f4 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * An abstract {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} containing common methods
 * for materializing existing tables / partitions / views.
 */
@Slf4j
public abstract class HiveMaterializerFromEntityQueryGenerator extends HiveMaterializerQueryGenerator {

  // Database and table name of the source entity being materialized.
  protected final String inputDbName;
  protected final String inputTableName;

  // Prefix identifiers used to derive the staging partition directory name (see StageableTableMetadata).
  protected final List<String> sourceDataPathIdentifier;
  // Directory name (relative) and full location of the staging partition.
  protected final String stagingDataPartitionDirName;
  protected final String stagingDataPartitionLocation;
  // Partition column -> type (DDL) and partition column -> value (DML) for the source entity.
  protected final Map<String, String> partitionsDDLInfo;
  protected final Map<String, String> partitionsDMLInfo;
  // The source table / partition resolved from the work unit via the metastore.
  protected final HiveProcessingEntity conversionEntity;
  protected final Table sourceTable;
  // If false, generatePublishQueries always publishes the whole staging table (snapshot move).
  protected final boolean supportTargetPartitioning;

  /**
   * Resolves the source entity from the metastore and precomputes staging / partition info.
   *
   * @param workUnitState state carrying the HiveWorkUnit for the source entity.
   * @param supportTargetPartitioning whether the destination table may be published partition-by-partition.
   * @throws IOException if the metastore lookup fails (TException / HiveException are wrapped).
   */
  public HiveMaterializerFromEntityQueryGenerator(WorkUnitState workUnitState, boolean supportTargetPartitioning)
      throws IOException {
    super(workUnitState);


    try {
      this.conversionEntity = getConversionEntity(this.workUnit);
    } catch (TException | HiveException ex) {
      // Wrap metastore exceptions so the constructor exposes only IOException.
      throw new IOException(ex);
    }
    this.sourceTable = this.conversionEntity.getTable();
    this.inputDbName = this.sourceTable.getDbName();
    this.inputTableName = this.sourceTable.getTableName();

    this.sourceDataPathIdentifier = this.outputTableMetadata.getSourceDataPathIdentifier();
    this.stagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier);
    this.stagingDataPartitionLocation = stagingDataLocation + Path.SEPARATOR + stagingDataPartitionDirName;
    this.partitionsDDLInfo = Maps.newHashMap();
    this.partitionsDMLInfo = Maps.newHashMap();
    // Fills both maps in place from the conversion entity's partition spec.
    HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo);
    this.supportTargetPartitioning = supportTargetPartitioning;
  }

  // Looks up the work unit's table (and partition, when the work unit names one) in the metastore
  // and packages them as a HiveProcessingEntity.
  private HiveProcessingEntity getConversionEntity(HiveWorkUnit hiveWorkUnit) throws IOException, TException,
      HiveException {

    try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) {

      HiveDataset dataset = hiveWorkUnit.getHiveDataset();
      HiveDatasetFinder.DbAndTable dbAndTable = dataset.getDbAndTable();

      Table table = new Table(client.get().getTable(dbAndTable.getDb(), dbAndTable.getTable()));

      Partition partition = null;
      if (hiveWorkUnit.getPartitionName().isPresent()) {
        partition = new Partition(table, client.get()
            .getPartition(dbAndTable.getDb(), dbAndTable.getTable(), hiveWorkUnit.getPartitionName().get()));
      }
      return new HiveProcessingEntity(dataset, table, Optional.fromNullable(partition));
    }
  }

  /**
   * Returns a QueryBasedHivePublishEntity which includes publish level queries and cleanup commands.
   *
   * Two publish strategies: when target partitioning is unsupported or the source is unpartitioned,
   * the whole staging directory is moved as a snapshot; otherwise only the staging partition directory
   * is moved and partition DDL is (re)created on the final table. In both cases the staging table and
   * directory are scheduled for cleanup.
   *
   * @return QueryBasedHivePublishEntity
   * @throws DataConversionException
   */
  public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException {

    QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
    List<String> publishQueries = publishEntity.getPublishQueries();
    Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
    List<String> cleanupQueries = publishEntity.getCleanupQueries();
    List<String> cleanupDirectories = publishEntity.getCleanupDirectories();

    // The final table duplicates the staging table's layout, pointed at the final data location.
    String createFinalTableDDL =
        HiveConverterUtils.generateCreateDuplicateTableDDL(outputDatabaseName, stagingTableName, outputTableName,
            outputDataLocation, Optional.of(outputDatabaseName));
    publishQueries.add(createFinalTableDDL);
    log.debug("Create final table DDL:\n" + createFinalTableDDL);

    if (!this.supportTargetPartitioning || partitionsDDLInfo.size() == 0) {
      // Snapshot publish: move the entire staging directory to the output location.
      log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation);
      publishDirectories.put(stagingDataLocation, outputDataLocation);

      String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);

      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
      cleanupQueries.add(dropStagingTableDDL);

      log.debug("Staging table directory to delete: " + stagingDataLocation);
      cleanupDirectories.add(stagingDataLocation);
    } else {
      // Partition publish: move only the staging partition directory, possibly redirected to an
      // existing destination partition location.
      String finalDataPartitionLocation = outputDataLocation + Path.SEPARATOR + stagingDataPartitionDirName;
      Optional<Path> destPartitionLocation =
          HiveConverterUtils.getDestinationPartitionLocation(destinationTableMeta, this.workUnitState,
              conversionEntity.getPartition().get().getName());
      finalDataPartitionLocation = HiveConverterUtils.updatePartitionLocation(finalDataPartitionLocation, this.workUnitState,
          destPartitionLocation);

      log.debug("Partition directory to move: " + stagingDataPartitionLocation + " to: " + finalDataPartitionLocation);
      publishDirectories.put(stagingDataPartitionLocation, finalDataPartitionLocation);
      // Drop then re-create the partition on the final table so it points at the new location.
      List<String> dropPartitionsDDL =
          HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, partitionsDMLInfo);
      log.debug("Drop partitions if exist in final table: " + dropPartitionsDDL);
      publishQueries.addAll(dropPartitionsDDL);
      List<String> createFinalPartitionDDL =
          HiveAvroORCQueryGenerator.generateCreatePartitionDDL(outputDatabaseName, outputTableName,
              finalDataPartitionLocation, partitionsDMLInfo, Optional.<String>absent());

      log.debug("Create final partition DDL: " + createFinalPartitionDDL);
      publishQueries.addAll(createFinalPartitionDDL);

      String dropStagingTableDDL =
          HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);

      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
      cleanupQueries.add(dropStagingTableDDL);

      log.debug("Staging table directory to delete: " + stagingDataLocation);
      cleanupDirectories.add(stagingDataLocation);

      // Also drop any source partitions flagged for removal by the converter.
      publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName,
          AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(conversionEntity)));
    }

    log.info("Publish partition entity: " + publishEntity);
    return publishEntity;
  }
}
+ */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import java.io.IOException; +import java.util.List; + +import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata; +import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; +import org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator; +import org.apache.hadoop.fs.FileSystem; + +import com.google.common.base.Optional; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; +import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; +import org.apache.gobblin.hive.HiveMetastoreClientPool; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +/** + * A base abstract query generator for {@link HiveMaterializer}. 
+ */ +public abstract class HiveMaterializerQueryGenerator implements QueryGenerator { + protected final FileSystem fs; + protected final StageableTableMetadata outputTableMetadata; + + protected final String outputDatabaseName; + protected final String outputTableName; + protected final String outputDataLocation; + + protected final String stagingTableName; + protected final String stagingDataLocation; + + protected final Optional<org.apache.hadoop.hive.metastore.api.Table> destinationTableMeta; + protected final HiveWorkUnit workUnit; + protected final HiveMetastoreClientPool pool; + protected final WorkUnitState workUnitState; + + public HiveMaterializerQueryGenerator(WorkUnitState workUnitState) throws IOException { + this.fs = HiveSource.getSourceFs(workUnitState); + this.pool = HiveMetastoreClientPool.get(workUnitState.getJobState().getProperties(), + Optional.fromNullable(workUnitState.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY))); + + this.workUnitState = workUnitState; + this.workUnit = new HiveWorkUnit(workUnitState.getWorkunit()); + + this.outputTableMetadata = HiveMaterializer.parseStageableTableMetadata(this.workUnit); + this.outputDatabaseName = outputTableMetadata.getDestinationDbName(); + this.outputTableName = outputTableMetadata.getDestinationTableName(); + this.outputDataLocation = HiveConverterUtils.getOutputDataLocation(outputTableMetadata.getDestinationDataPath()); + + this.destinationTableMeta = HiveConverterUtils.getDestinationTableMeta(this.outputTableMetadata.getDestinationDbName(), + this.outputTableMetadata.getDestinationTableName(), workUnitState.getProperties()).getLeft(); + + this.stagingTableName = HiveConverterUtils.getStagingTableName(this.outputTableMetadata.getDestinationStagingTableName()); + this.stagingDataLocation = HiveConverterUtils.getStagingDataLocation(this.outputTableMetadata.getDestinationDataPath(), this.stagingTableName); + } + + /** + * Returns hive queries to be run as a part of a hive task. 
+ * This does not include publish queries. + * @return + */ + @Override + public abstract List<String> generateQueries(); + + /** + * Returns a QueryBasedHivePublishEntity which includes publish level queries and cleanup commands. + * @return QueryBasedHivePublishEntity + * @throws DataConversionException + */ + public abstract QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java new file mode 100644 index 0000000..9de376b --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.NoopPublisher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.TaskFactory; +import org.apache.gobblin.runtime.task.TaskIFace; + +/** + * A {@link TaskFactory} that runs a {@link HiveMaterializer} task. + * This factory is intended to publish data in the task directly, and + * uses a {@link NoopPublisher}. + */ +public class HiveMaterializerTaskFactory implements TaskFactory { + @Override + public TaskIFace createTask(TaskContext taskContext) { + try { + return new HiveMaterializer(taskContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public DataPublisher createDataPublisher(JobState.DatasetState datasetState) { + return new NoopPublisher(datasetState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java new file mode 100644 index 0000000..fa91d15 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import java.io.IOException; +import java.util.List; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; + +import com.google.common.collect.Lists; + + +/** + * A {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} to materialize a copy of an existing + * Hive table / partition. 
+ */ +public class MaterializeTableQueryGenerator extends HiveMaterializerFromEntityQueryGenerator { + + private final HiveConverterUtils.StorageFormat storageFormat; + + public MaterializeTableQueryGenerator(WorkUnitState workUnitState) throws IOException { + super(workUnitState, false); + + this.storageFormat = HiveConverterUtils.StorageFormat.valueOf(workUnitState.getProp(HiveMaterializer.STORAGE_FORMAT_KEY)); + } + + @Override + public List<String> generateQueries() { + return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatementFromSelectStar( + new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName), + new HiveDatasetFinder.DbAndTable(this.inputDbName, this.inputTableName), + this.partitionsDMLInfo, this.storageFormat, + this.stagingDataLocation)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java new file mode 100644 index 0000000..37a50b3 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; +import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} to materialize the result of a Hive + * query. 
+ */ +@Slf4j +public class QueryBasedMaterializerQueryGenerator extends HiveMaterializerQueryGenerator { + + private final String sourceQuery; + private final HiveConverterUtils.StorageFormat storageFormat; + + public QueryBasedMaterializerQueryGenerator(WorkUnitState workUnitState) throws IOException { + super(workUnitState); + + this.sourceQuery = workUnitState.getProp(HiveMaterializer.QUERY_RESULT_TO_MATERIALIZE_KEY); + this.storageFormat = HiveConverterUtils.StorageFormat.valueOf(workUnitState.getProp(HiveMaterializer.STORAGE_FORMAT_KEY)); + } + + @Override + public List<String> generateQueries() { + return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatement( + new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName), + this.sourceQuery, + this.storageFormat, + this.stagingDataLocation)); + } + + @Override + public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException { + QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity(); + List<String> publishQueries = publishEntity.getPublishQueries(); + Map<String, String> publishDirectories = publishEntity.getPublishDirectories(); + List<String> cleanupQueries = publishEntity.getCleanupQueries(); + List<String> cleanupDirectories = publishEntity.getCleanupDirectories(); + + String createFinalTableDDL = + HiveConverterUtils.generateCreateDuplicateTableDDL(outputDatabaseName, stagingTableName, outputTableName, + outputDataLocation, Optional.of(outputDatabaseName)); + publishQueries.add(createFinalTableDDL); + log.debug("Create final table DDL:\n" + createFinalTableDDL); + + log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation); + publishDirectories.put(stagingDataLocation, outputDataLocation); + + String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName); + + log.debug("Drop staging table DDL: " + dropStagingTableDDL); + 
cleanupQueries.add(dropStagingTableDDL); + + log.debug("Staging table directory to delete: " + stagingDataLocation); + cleanupDirectories.add(stagingDataLocation); + + + publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, + new HashMap<>())); + + log.info("Publish partition entity: " + publishEntity); + return publishEntity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java index 74a8b3b..b71d6b6 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.joda.time.DateTime; @@ -243,15 +244,15 @@ public class HiveSource implements Source { "Creating workunit for table %s as updateTime %s or createTime %s is greater than low watermark %s", hiveDataset.getTable().getCompleteName(), updateTime, hiveDataset.getTable().getTTable().getCreateTime(), lowWatermark.getValue())); + HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset); + LongWatermark expectedDatasetHighWatermark = this.watermarker.getExpectedHighWatermark(hiveDataset.getTable(), 
tableProcessTime); - HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset); - hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable())); - hiveWorkUnit.setTableLocation(hiveDataset.getTable().getSd().getLocation()); hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedDatasetHighWatermark)); EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime); + this.workunits.add(hiveWorkUnit); log.debug(String.format("Workunit added for table: %s", hiveWorkUnit)); @@ -264,11 +265,19 @@ public class HiveSource implements Source { } } catch (UpdateNotFoundException e) { log.error(String.format("Not Creating workunit for %s as update time was not found. %s", hiveDataset.getTable() - .getCompleteName(), e.getMessage())); + .getCompleteName(), e.getMessage()), e); } catch (SchemaNotFoundException e) { log.error(String.format("Not Creating workunit for %s as schema was not found. 
%s", hiveDataset.getTable() - .getCompleteName(), e.getMessage())); + .getCompleteName(), e.getMessage()), e); + } + } + + protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException { + HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset); + if (isAvro(hiveDataset.getTable())) { + hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable())); } + return hiveWorkUnit; } private void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException { @@ -315,17 +324,12 @@ public class HiveSource implements Source { LongWatermark expectedPartitionHighWatermark = this.watermarker.getExpectedHighWatermark(sourcePartition, tableProcessTime, partitionProcessTime); - HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset); - hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable())); - hiveWorkUnit.setTableLocation(hiveDataset.getTable().getSd().getLocation()); - hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(sourcePartition)); - hiveWorkUnit.setPartitionName(sourcePartition.getName()); - hiveWorkUnit.setPartitionLocation(sourcePartition.getLocation()); - hiveWorkUnit.setPartitionKeys(sourcePartition.getTable().getPartitionKeys()); + HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, sourcePartition); hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedPartitionHighWatermark)); EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime); + workunits.add(hiveWorkUnit); log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s", sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue())); @@ -348,6 +352,15 @@ public class HiveSource implements Source { } } + protected HiveWorkUnit 
workUnitForPartition(HiveDataset hiveDataset, Partition partition) throws IOException { + HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition); + if (isAvro(hiveDataset.getTable())) { + hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable())); + hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition)); + } + return hiveWorkUnit; + } + /*** * Check if path of Hive entity (table / partition) contains location token that should be ignored. If so, ignore * the partition. @@ -447,4 +460,8 @@ public class HiveSource implements Source { } } } + + private boolean isAvro(Table table) { + return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib()); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java index e5d1a2e..326fdd8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java @@ -20,6 +20,7 @@ import java.lang.reflect.Type; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import com.google.common.base.Optional; @@ -29,6 +30,7 @@ import com.google.gson.Gson; import org.apache.gobblin.configuration.ConfigurationKeys; import 
org.apache.gobblin.data.management.copy.hive.HiveDataset; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.hadoop.hive.ql.metadata.Partition; /** @@ -68,6 +70,16 @@ public class HiveWorkUnit extends WorkUnit { public HiveWorkUnit(HiveDataset hiveDataset) { super(); setHiveDataset(hiveDataset); + if (hiveDataset.getTable().getTableType() != TableType.VIRTUAL_VIEW) { + setTableLocation(hiveDataset.getTable().getSd().getLocation()); + } + } + + public HiveWorkUnit(HiveDataset hiveDataset, Partition partition) { + this(hiveDataset); + setPartitionName(partition.getName()); + setPartitionLocation(partition.getLocation()); + setPartitionKeys(partition.getTable().getPartitionKeys()); } /**
