Repository: incubator-gobblin Updated Branches: refs/heads/master 30990f485 -> 5fa983268
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java index 119ef1c..a08f61a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java @@ -28,6 +28,7 @@ import static java.util.stream.Collectors.joining; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,16 +44,18 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.base.Throwables; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.DataConversionException; -import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; import org.apache.gobblin.data.management.copy.hive.HiveUtils; import org.apache.gobblin.hive.HiveMetastoreClientPool; import org.apache.gobblin.util.AutoReturnableObject; +import lombok.AllArgsConstructor; +import lombok.Getter; import 
lombok.extern.slf4j.Slf4j; @Slf4j @@ -62,6 +65,19 @@ import lombok.extern.slf4j.Slf4j; */ public class HiveConverterUtils { + @AllArgsConstructor + @Getter + public static enum StorageFormat { + TEXT_FILE("TEXTFILE"), + SEQUENCE_FILE("SEQUENCEFILE"), + ORC("ORC"), + PARQUET("PARQUET"), + AVRO("AVRO"), + RC_FILE("RCFILE"); + + private final String hiveName; + } + /*** * Subdirectory within destination table directory to publish data */ @@ -136,6 +152,44 @@ public class HiveConverterUtils { } /** + * Generates a CTAS statement to dump the contents of a table / partition into a new table. + * @param outputDbAndTable output db and table where contents should be written. + * @param sourceEntity source table / partition. + * @param partitionDMLInfo map of partition values. + * @param storageFormat format of output table. + * @param outputTableLocation location where files of output table should be written. + */ + public static String generateStagingCTASStatementFromSelectStar(HiveDatasetFinder.DbAndTable outputDbAndTable, + HiveDatasetFinder.DbAndTable sourceEntity, Map<String, String> partitionDMLInfo, + StorageFormat storageFormat, String outputTableLocation) { + StringBuilder sourceQueryBuilder = new StringBuilder("SELECT * FROM `").append(sourceEntity.getDb()) + .append("`.`").append(sourceEntity.getTable()).append("`"); + if (partitionDMLInfo != null && !partitionDMLInfo.isEmpty()) { + sourceQueryBuilder.append(" WHERE "); + sourceQueryBuilder.append(partitionDMLInfo.entrySet().stream() + .map(e -> "`" + e.getKey() + "`='" + e.getValue() + "'") + .collect(joining(" AND "))); + } + return generateStagingCTASStatement(outputDbAndTable, sourceQueryBuilder.toString(), storageFormat, outputTableLocation); + } + + /** + * Generates a CTAS statement to dump the results of a query into a new table. + * @param outputDbAndTable output db and table where contents should be written. + * @param sourceQuery query to materialize. + * @param storageFormat format of output table. 
+ * @param outputTableLocation location where files of output table should be written. + */ + public static String generateStagingCTASStatement(HiveDatasetFinder.DbAndTable outputDbAndTable, + String sourceQuery, StorageFormat storageFormat, String outputTableLocation) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(outputDbAndTable.getDb()) && + !Strings.isNullOrEmpty(outputDbAndTable.getTable()), "Invalid output db and table " + outputDbAndTable); + + return String.format("CREATE TEMPORARY TABLE `%s`.`%s` STORED AS %s LOCATION '%s' AS %s", outputDbAndTable.getDb(), + outputDbAndTable.getTable(), storageFormat.getHiveName(), outputTableLocation, sourceQuery); + } + + /** * Fills data from input table into output table. * @param inputTblName input hive table name * @param outputTblName output hive table name @@ -160,8 +214,10 @@ public class HiveConverterUtils { // Insert query dmlQuery.append(String.format("INSERT OVERWRITE TABLE `%s`.`%s` %n", outputDbName, outputTblName)); - // Partition details - dmlQuery.append(partitionKeyValues(optionalPartitionDMLInfo)); + if (optionalPartitionDMLInfo.isPresent() && optionalPartitionDMLInfo.get().size() > 0) { + // Partition details + dmlQuery.append(partitionKeyValues(optionalPartitionDMLInfo)); + } dmlQuery.append(String.format("SELECT * FROM `%s`.`%s`", inputDbName, inputTblName)); if (optionalPartitionDMLInfo.isPresent()) { @@ -192,15 +248,15 @@ public class HiveConverterUtils { * @param partitionsDDLInfo partition type information, to be filled by this method * @param partitionsDMLInfo partition key-value pair, to be filled by this method */ - public static void populatePartitionInfo(QueryBasedHiveConversionEntity conversionEntity, Map<String, String> partitionsDDLInfo, + public static void populatePartitionInfo(HiveProcessingEntity conversionEntity, Map<String, String> partitionsDDLInfo, Map<String, String> partitionsDMLInfo) { String partitionsInfoString = null; String partitionsTypeString = null; - if 
(conversionEntity.getHivePartition().isPresent()) { - partitionsInfoString = conversionEntity.getHivePartition().get().getName(); - partitionsTypeString = conversionEntity.getHivePartition().get().getSchema().getProperty("partition_columns.types"); + if (conversionEntity.getPartition().isPresent()) { + partitionsInfoString = conversionEntity.getPartition().get().getName(); + partitionsTypeString = conversionEntity.getPartition().get().getSchema().getProperty("partition_columns.types"); } if (StringUtils.isNotBlank(partitionsInfoString) || StringUtils.isNotBlank(partitionsTypeString)) { @@ -235,7 +291,7 @@ public class HiveConverterUtils { * @param conversionEntity conversion entity used to get source directory permissions * @param workUnit workunit */ - public static void createStagingDirectory(FileSystem fs, String destination, QueryBasedHiveConversionEntity conversionEntity, + public static void createStagingDirectory(FileSystem fs, String destination, HiveProcessingEntity conversionEntity, WorkUnitState workUnit) { /* * Create staging data location with the same permissions as source data location @@ -250,18 +306,26 @@ public class HiveConverterUtils { */ Path destinationPath = new Path(destination); try { - FileStatus sourceDataFileStatus = fs.getFileStatus(conversionEntity.getHiveTable().getDataLocation()); - FsPermission sourceDataPermission = sourceDataFileStatus.getPermission(); - if (!fs.mkdirs(destinationPath, sourceDataPermission)) { + FsPermission permission; + String group = null; + if (conversionEntity.getTable().getDataLocation() != null) { + FileStatus sourceDataFileStatus = fs.getFileStatus(conversionEntity.getTable().getDataLocation()); + permission = sourceDataFileStatus.getPermission(); + group = sourceDataFileStatus.getGroup(); + } else { + permission = FsPermission.getDefault(); + } + + if (!fs.mkdirs(destinationPath, permission)) { throw new RuntimeException(String.format("Failed to create path %s with permissions %s", - destinationPath, 
sourceDataPermission)); + destinationPath, permission)); } else { - fs.setPermission(destinationPath, sourceDataPermission); + fs.setPermission(destinationPath, permission); // Set the same group as source - if (!workUnit.getPropAsBoolean(HIVE_DATASET_DESTINATION_SKIP_SETGROUP, DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP)) { - fs.setOwner(destinationPath, null, sourceDataFileStatus.getGroup()); + if (group != null && !workUnit.getPropAsBoolean(HIVE_DATASET_DESTINATION_SKIP_SETGROUP, DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP)) { + fs.setOwner(destinationPath, null, group); } - log.info(String.format("Created %s with permissions %s and group %s", destinationPath, sourceDataPermission, sourceDataFileStatus.getGroup())); + log.info(String.format("Created %s with permissions %s and group %s", destinationPath, permission, group)); } } catch (IOException e) { Throwables.propagate(e); @@ -275,12 +339,12 @@ public class HiveConverterUtils { * such as hourly or daily. * @return Partition directory name. 
*/ - public static String getStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity, + public static String getStagingDataPartitionDirName(HiveProcessingEntity conversionEntity, List<String> sourceDataPathIdentifier) { - if (conversionEntity.getHivePartition().isPresent()) { + if (conversionEntity.getPartition().isPresent()) { StringBuilder dirNamePrefix = new StringBuilder(); - String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString(); + String sourceHivePartitionLocation = conversionEntity.getPartition().get().getDataLocation().toString(); if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) { for (String hint : sourceDataPathIdentifier) { if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) { @@ -289,7 +353,7 @@ public class HiveConverterUtils { } } - return dirNamePrefix + conversionEntity.getHivePartition().get().getName(); + return dirNamePrefix + conversionEntity.getPartition().get().getName(); } else { return StringUtils.EMPTY; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java deleted file mode 100644 index 2e370c0..0000000 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.data.management.conversion.hive.task; - -import java.util.List; - -import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; -import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; -import org.apache.gobblin.runtime.TaskContext; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j - -/** - * A simple {@link HiveTask} for Hive view materialization. 
- */ -public class HiveMaterializer extends HiveTask { - - private final QueryGenerator queryGenerator; - - public HiveMaterializer(TaskContext taskContext) throws Exception { - super(taskContext); - this.queryGenerator = new HiveMaterializerQueryGenerator(this.workUnitState); - if (!(workUnit.getHiveDataset() instanceof ConvertibleHiveDataset)) { - throw new IllegalStateException("HiveConvertExtractor is only compatible with ConvertibleHiveDataset"); - } - } - - @Override - public List<String> generateHiveQueries() { - return queryGenerator.generateQueries(); - } - - @Override - public QueryBasedHivePublishEntity generatePublishQueries() throws Exception { - return queryGenerator.generatePublishQueries(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java deleted file mode 100644 index 4eee0e0..0000000 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.data.management.conversion.hive.task; - -import java.util.Map; -import java.util.List; - -import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter; -import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Table; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import org.apache.gobblin.configuration.WorkUnitState; -import org.apache.gobblin.converter.DataConversionException; -import org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager; -import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; -import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity; -import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; -import org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHivePartition; -import org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHiveTable; -import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils; -import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator; -import 
org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; -import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; -import org.apache.gobblin.hive.HiveMetastoreClientPool; -import org.apache.gobblin.util.AutoReturnableObject; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -/** - * A simple query generator for {@link HiveMaterializer}. - */ -public class HiveMaterializerQueryGenerator implements QueryGenerator { - private final FileSystem fs; - private final ConvertibleHiveDataset.ConversionConfig conversionConfig; - private final ConvertibleHiveDataset hiveDataset; - private final String inputDbName; - private final String inputTableName; - private final String outputDatabaseName; - private final String outputTableName; - private final String outputDataLocation; - private final String stagingTableName; - private final String stagingDataLocation; - private final List<String> sourceDataPathIdentifier; - private final String stagingDataPartitionDirName; - private final String stagingDataPartitionLocation; - private final Map<String, String> partitionsDDLInfo; - private final Map<String, String> partitionsDMLInfo; - private final Optional<Table> destinationTableMeta; - private final HiveWorkUnit workUnit; - private final HiveMetastoreClientPool pool; - private final QueryBasedHiveConversionEntity conversionEntity; - private final WorkUnitState workUnitState; - - public HiveMaterializerQueryGenerator(WorkUnitState workUnitState) throws Exception { - this.workUnitState = workUnitState; - this.workUnit = new HiveWorkUnit(workUnitState.getWorkunit()); - this.hiveDataset = (ConvertibleHiveDataset) workUnit.getHiveDataset(); - this.inputDbName = hiveDataset.getDbAndTable().getDb(); - this.inputTableName = hiveDataset.getDbAndTable().getTable(); - this.fs = HiveSource.getSourceFs(workUnitState); - this.conversionConfig = hiveDataset.getConversionConfigForFormat("sameAsSource").get(); - this.outputDatabaseName = 
conversionConfig.getDestinationDbName(); - this.outputTableName = conversionConfig.getDestinationTableName(); - this.outputDataLocation = HiveConverterUtils.getOutputDataLocation(conversionConfig.getDestinationDataPath()); - this.stagingTableName = HiveConverterUtils.getStagingTableName(conversionConfig.getDestinationStagingTableName()); - this.stagingDataLocation = HiveConverterUtils.getStagingDataLocation(conversionConfig.getDestinationDataPath(), stagingTableName); - this.sourceDataPathIdentifier = conversionConfig.getSourceDataPathIdentifier(); - this.pool = HiveMetastoreClientPool.get(workUnitState.getJobState().getProperties(), - Optional.fromNullable(workUnitState.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY))); - this.conversionEntity = getConversionEntity(); - this.stagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier); - this.stagingDataPartitionLocation = stagingDataLocation + Path.SEPARATOR + stagingDataPartitionDirName; - this.partitionsDDLInfo = Maps.newHashMap(); - this.partitionsDMLInfo = Maps.newHashMap(); - HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo); - this.destinationTableMeta = HiveConverterUtils.getDestinationTableMeta(outputDatabaseName, - outputTableName, workUnitState.getProperties()).getLeft(); - } - - /** - * Returns hive queries to be run as a part of a hive task. - * This does not include publish queries. 
- * @return - */ - @Override - public List<String> generateQueries() { - - List<String> hiveQueries = Lists.newArrayList(); - - Preconditions.checkNotNull(this.workUnit, "Workunit must not be null"); - EventWorkunitUtils.setBeginDDLBuildTimeMetadata(this.workUnit, System.currentTimeMillis()); - - HiveConverterUtils.createStagingDirectory(fs, conversionConfig.getDestinationDataPath(), - conversionEntity, this.workUnitState); - - // Create DDL statement for table - String createStagingTableDDL = - HiveConverterUtils.generateCreateDuplicateTableDDL( - inputDbName, - inputTableName, - stagingTableName, - stagingDataLocation, - Optional.of(outputDatabaseName)); - hiveQueries.add(createStagingTableDDL); - log.debug("Create staging table DDL:\n" + createStagingTableDDL); - - /* - * Setting partition mode to 'nonstrict' is needed to improve readability of the code. - * If we do not set dynamic partition mode to nonstrict, we will have to write partition values also, - * and because hive considers partition as a virtual column, we also have to write each of the column - * name in the query (in place of *) to match source and target columns. - */ - hiveQueries.add("SET hive.exec.dynamic.partition.mode=nonstrict"); - - String insertInStagingTableDML = - HiveConverterUtils - .generateTableCopy( - inputTableName, - stagingTableName, - conversionEntity.getHiveTable().getDbName(), - outputDatabaseName, - Optional.of(partitionsDMLInfo)); - hiveQueries.add(insertInStagingTableDML); - log.debug("Conversion staging DML: " + insertInStagingTableDML); - - log.info("Conversion Queries {}\n", hiveQueries); - - EventWorkunitUtils.setEndDDLBuildTimeMetadata(workUnit, System.currentTimeMillis()); - return hiveQueries; - } - - /** - * Retuens a QueryBasedHivePublishEntity which includes publish level queries and cleanup commands. 
- * @return QueryBasedHivePublishEntity - * @throws DataConversionException - */ - public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException { - - QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity(); - List<String> publishQueries = publishEntity.getPublishQueries(); - Map<String, String> publishDirectories = publishEntity.getPublishDirectories(); - List<String> cleanupQueries = publishEntity.getCleanupQueries(); - List<String> cleanupDirectories = publishEntity.getCleanupDirectories(); - - String createFinalTableDDL = - HiveConverterUtils.generateCreateDuplicateTableDDL(inputDbName, inputTableName, outputTableName, - outputDataLocation, Optional.of(outputDatabaseName)); - publishQueries.add(createFinalTableDDL); - log.debug("Create final table DDL:\n" + createFinalTableDDL); - - if (partitionsDDLInfo.size() == 0) { - log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation); - publishDirectories.put(stagingDataLocation, outputDataLocation); - - String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName); - - log.debug("Drop staging table DDL: " + dropStagingTableDDL); - cleanupQueries.add(dropStagingTableDDL); - - log.debug("Staging table directory to delete: " + stagingDataLocation); - cleanupDirectories.add(stagingDataLocation); - } else { - String finalDataPartitionLocation = outputDataLocation + Path.SEPARATOR + stagingDataPartitionDirName; - Optional<Path> destPartitionLocation = - HiveConverterUtils.getDestinationPartitionLocation(destinationTableMeta, this.workUnitState, - conversionEntity.getHivePartition().get().getName()); - finalDataPartitionLocation = HiveConverterUtils.updatePartitionLocation(finalDataPartitionLocation, this.workUnitState, - destPartitionLocation); - - log.debug("Partition directory to move: " + stagingDataPartitionLocation + " to: " + finalDataPartitionLocation); - 
publishDirectories.put(stagingDataPartitionLocation, finalDataPartitionLocation); - List<String> dropPartitionsDDL = - HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, partitionsDMLInfo); - log.debug("Drop partitions if exist in final table: " + dropPartitionsDDL); - publishQueries.addAll(dropPartitionsDDL); - List<String> createFinalPartitionDDL = - HiveAvroORCQueryGenerator.generateCreatePartitionDDL(outputDatabaseName, outputTableName, - finalDataPartitionLocation, partitionsDMLInfo, Optional.<String>absent()); - - log.debug("Create final partition DDL: " + createFinalPartitionDDL); - publishQueries.addAll(createFinalPartitionDDL); - - String dropStagingTableDDL = - HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName); - - log.debug("Drop staging table DDL: " + dropStagingTableDDL); - cleanupQueries.add(dropStagingTableDDL); - - log.debug("Staging table directory to delete: " + stagingDataLocation); - cleanupDirectories.add(stagingDataLocation); - } - - publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, - AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(conversionEntity))); - - log.info("Publish partition entity: " + publishEntity); - return publishEntity; - } - - private QueryBasedHiveConversionEntity getConversionEntity() throws Exception { - - - try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) { - - Table table = client.get().getTable(this.inputDbName, this.inputTableName); - - SchemaAwareHiveTable schemaAwareHiveTable = new SchemaAwareHiveTable(table, AvroSchemaManager.getSchemaFromUrl(workUnit.getTableSchemaUrl(), fs)); - - SchemaAwareHivePartition schemaAwareHivePartition = null; - - if (workUnit.getPartitionName().isPresent() && workUnit.getPartitionSchemaUrl().isPresent()) { - org.apache.hadoop.hive.metastore.api.Partition - partition = client.get().getPartition(this.inputDbName, 
this.inputTableName, workUnit.getPartitionName().get()); - schemaAwareHivePartition = - new SchemaAwareHivePartition(table, partition, AvroSchemaManager.getSchemaFromUrl(workUnit.getPartitionSchemaUrl().get(), fs)); - } - return new QueryBasedHiveConversionEntity(this.hiveDataset, schemaAwareHiveTable, Optional.fromNullable(schemaAwareHivePartition)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java deleted file mode 100644 index d1c9371..0000000 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.data.management.conversion.hive.task; - -import java.util.List; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.SourceState; -import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder; -import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; -import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; -import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; -import org.apache.gobblin.runtime.task.TaskUtils; -import org.apache.gobblin.source.workunit.WorkUnit; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j - -/** - * A simple HiveSource for {@link HiveMaterializer}. - */ -public class HiveMaterializerSource extends HiveSource { - - @Override - public List<WorkUnit> getWorkunits(SourceState state) { - if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) { - state.setProp(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY, ConvertibleHiveDatasetFinder.class.getName()); - } - if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) { - state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, "hive.conversion.avro"); - } - - List<WorkUnit> workUnits = super.getWorkunits(state); - - for(WorkUnit workUnit : workUnits) { - if (Boolean.valueOf(workUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY))) { - log.info("Ignoring Watermark workunit for {}", workUnit.getProp(ConfigurationKeys.DATASET_URN_KEY)); - continue; - } - TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class); - } - return workUnits; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java ---------------------------------------------------------------------- diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java deleted file mode 100644 index c05d4bf..0000000 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.data.management.conversion.hive.task; - -import org.apache.gobblin.publisher.DataPublisher; -import org.apache.gobblin.publisher.NoopPublisher; -import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.TaskContext; -import org.apache.gobblin.runtime.task.TaskFactory; -import org.apache.gobblin.runtime.task.TaskIFace; - -/** - * A {@link TaskFactory} that runs a {@link HiveMaterializer} task. - * This factory is intended to publish data in the task directly, and - * uses a {@link NoopPublisher}. 
- */ -public class HiveMaterializerTaskFactory implements TaskFactory { - @Override - public TaskIFace createTask(TaskContext taskContext) { - try { - return new HiveMaterializer(taskContext); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public DataPublisher createDataPublisher(JobState.DatasetState datasetState) { - return new NoopPublisher(datasetState); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java index aabbb6e..16a2028 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java @@ -25,9 +25,11 @@ import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity; import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; @@ -52,6 +54,15 @@ import lombok.extern.slf4j.Slf4j; * which creates extract/write level queries and publish level queries respectively. */ public abstract class HiveTask extends BaseAbstractTask { + private static final String USE_WATERMARKER_KEY = "internal.hiveTask.useWatermarker"; + + /** + * Disable Hive watermarker. 
This is necessary when there is no concrete source table where watermark can be inferred. + */ + public static void disableHiveWatermarker(State state) { + state.setProp(USE_WATERMARKER_KEY, Boolean.toString(false)); + } + protected final TaskContext taskContext; protected final WorkUnitState workUnitState; protected final HiveWorkUnit workUnit; @@ -114,11 +125,8 @@ public abstract class HiveTask extends BaseAbstractTask { for (Map.Entry<String, String> publishDir : publishDirectories.entrySet()) { HadoopUtils.renamePath(fs, new Path(publishDir.getKey()), new Path(publishDir.getValue()), true); } - } catch (RuntimeException re) { - throw re; - } - catch (Exception e) { - log.error("error in move dir"); + } catch (Throwable t) { + throw Throwables.propagate(t); } } @@ -132,11 +140,12 @@ public abstract class HiveTask extends BaseAbstractTask { wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED); - HiveSourceWatermarker watermarker = GobblinConstructorUtils.invokeConstructor( - HiveSourceWatermarkerFactory.class, wus.getProp(HiveSource.HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY, - HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS)).createFromState(wus); + if (wus.getPropAsBoolean(USE_WATERMARKER_KEY, true)) { + HiveSourceWatermarker watermarker = GobblinConstructorUtils.invokeConstructor(HiveSourceWatermarkerFactory.class, + wus.getProp(HiveSource.HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY, HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS)).createFromState(wus); - watermarker.setActualHighWatermark(wus); + watermarker.setActualHighWatermark(wus); + } } catch (RuntimeException re) { throw re; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java ---------------------------------------------------------------------- diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java index 1502b06..8c7357b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java @@ -25,7 +25,7 @@ import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiv /** * An interface for generating queries. */ -interface QueryGenerator { +public interface QueryGenerator { /** * Generates queries to extract/convert/write data http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java index 94c427c..fa15459 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java @@ -73,7 +73,7 @@ public class HiveQueryExecutionWriter implements DataWriter<QueryBasedHiveConver * Method to add properties needed by publisher to preserve partition params */ private void addPropsForPublisher(QueryBasedHiveConversionEntity hiveConversionEntity) { - if (!hiveConversionEntity.getHivePartition().isPresent()) { + if (!hiveConversionEntity.getPartition().isPresent()) { return; } 
ConvertibleHiveDataset convertibleHiveDataset = hiveConversionEntity.getConvertibleHiveDataset(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java index 9d6bd32..eee2005 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java @@ -66,7 +66,7 @@ public class HiveSourceTest { SourceState testState = getTestState(dbName); - this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, tableSdLoc, Optional.<String> absent()); + this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, tableSdLoc, Optional.<String> absent()); List<WorkUnit> workUnits = hiveSource.getWorkunits(testState); @@ -92,7 +92,7 @@ public class HiveSourceTest { SourceState testState = getTestState(dbName); - Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, tableSdLoc, Optional.of("field")); + Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, tableSdLoc, Optional.of("field")); this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), (int) System.currentTimeMillis()); @@ -126,8 +126,8 @@ public class HiveSourceTest { this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true); - this.hiveMetastoreTestUtils.createTestTable(dbName, tableName1, tableSdLoc1, Optional.<String> absent()); - this.hiveMetastoreTestUtils.createTestTable(dbName, tableName2, tableSdLoc2, Optional.<String> 
absent(), true); + this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName1, tableSdLoc1, Optional.<String> absent()); + this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName2, tableSdLoc2, Optional.<String> absent(), true); List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java index 32c6af8..5d684b4 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.apache.thrift.TException; import com.google.common.base.Optional; @@ -93,12 +94,12 @@ public class LocalHiveMetastoreTestUtils { } } - public Table createTestTable(String dbName, String tableName, String tableSdLoc, Optional<String> partitionFieldName) + public Table createTestAvroTable(String dbName, String tableName, String tableSdLoc, Optional<String> partitionFieldName) throws Exception { - return createTestTable(dbName, tableName, tableSdLoc, partitionFieldName, false); + return createTestAvroTable(dbName, tableName, tableSdLoc, partitionFieldName, false); } 
- public Table createTestTable(String dbName, String tableName, String tableSdLoc, + public Table createTestAvroTable(String dbName, String tableName, String tableSdLoc, Optional<String> partitionFieldName, boolean ignoreDbCreation) throws Exception { if (!ignoreDbCreation) { createTestDb(dbName); @@ -106,6 +107,7 @@ public class LocalHiveMetastoreTestUtils { Table tbl = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, tableName); tbl.getSd().setLocation(tableSdLoc); + tbl.getSd().getSerdeInfo().setSerializationLib(AvroSerDe.class.getName()); tbl.getSd().getSerdeInfo().setParameters(ImmutableMap.of(HiveAvroSerDeManager.SCHEMA_URL, "/tmp/dummy")); if (partitionFieldName.isPresent()) { @@ -117,11 +119,11 @@ public class LocalHiveMetastoreTestUtils { return tbl; } - public Table createTestTable(String dbName, String tableName, List<String> partitionFieldNames) throws Exception { - return createTestTable(dbName, tableName, "/tmp/" + tableName, partitionFieldNames, true); + public Table createTestAvroTable(String dbName, String tableName, List<String> partitionFieldNames) throws Exception { + return createTestAvroTable(dbName, tableName, "/tmp/" + tableName, partitionFieldNames, true); } - public Table createTestTable(String dbName, String tableName, String tableSdLoc, + public Table createTestAvroTable(String dbName, String tableName, String tableSdLoc, List<String> partitionFieldNames, boolean ignoreDbCreation) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java index 7e38841..893e13c 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java @@ -68,7 +68,7 @@ public class HiveAvroToOrcConverterTest { this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true); - Table table = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, tableSdLoc, Optional.<String> absent()); + Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, tableSdLoc, Optional.<String> absent()); Schema schema = ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, "recordWithinRecordWithinRecord_nested.json"); WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName, 0); @@ -120,7 +120,7 @@ public class HiveAvroToOrcConverterTest { this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true); - Table table = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, tableSdLoc, Optional.<String> absent()); + Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, tableSdLoc, Optional.<String> absent()); Schema schema = ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, "recordWithinRecordWithinRecord_nested.json"); WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName, 0); wus.getJobState().setProp("orc.table.flatten.schema", "false"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java new file mode 100644 index 0000000..d237a7b --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.conversion.hive.materializer; + +import java.io.File; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.LocalHiveMetastoreTestUtils; +import org.apache.gobblin.data.management.conversion.hive.entities.TableLikeStageableTableMetadata; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; +import org.apache.gobblin.data.management.conversion.hive.task.HiveTask; +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.gobblin.hive.HiveMetastoreClientPool; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.AutoReturnableObject; +import org.apache.gobblin.util.HiveJdbcConnector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.io.Files; + + +public class HiveMaterializerTest { + + private final LocalHiveMetastoreTestUtils localHiveMetastore = LocalHiveMetastoreTestUtils.getInstance(); + private final String dbName = HiveMaterializerTest.class.getSimpleName(); + private final String sourceTableName = "source"; + private final String partitionColumn = "part"; + private File dataFile; 
+ private HiveJdbcConnector jdbcConnector; + private HiveDataset dataset; + private HiveMetastoreClientPool pool; + + @BeforeClass + public void setup() throws Exception { + this.jdbcConnector = HiveJdbcConnector.newEmbeddedConnector(2); + this.dataFile = new File(getClass().getClassLoader().getResource("hiveMaterializerTest/source/").toURI()); + this.localHiveMetastore.dropDatabaseIfExists(this.dbName); + this.localHiveMetastore.createTestDb(this.dbName); + this.jdbcConnector.executeStatements( + String.format("CREATE EXTERNAL TABLE %s.%s (id STRING, name String) PARTITIONED BY (%s String) " + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE", + this.dbName, this.sourceTableName, this.partitionColumn), + String.format("ALTER TABLE %s.%s ADD PARTITION (part = 'part1') LOCATION '%s'", + this.dbName, this.sourceTableName, this.dataFile.getAbsolutePath() + "/part1"), + String.format("ALTER TABLE %s.%s ADD PARTITION (part = 'part2') LOCATION '%s'", + this.dbName, this.sourceTableName, this.dataFile.getAbsolutePath() + "/part2")); + + List<List<String>> allTable = executeStatementAndGetResults(this.jdbcConnector, + String.format("SELECT * FROM %s.%s", this.dbName, this.sourceTableName), 3); + Assert.assertEquals(allTable.size(), 8); + List<List<String>> part1 = executeStatementAndGetResults(this.jdbcConnector, + String.format("SELECT * FROM %s.%s WHERE %s='part1'", this.dbName, this.sourceTableName, this.partitionColumn), 3); + Assert.assertEquals(part1.size(), 4); + + this.pool = HiveMetastoreClientPool.get(new Properties(), Optional.absent()); + Table table; + try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { + table = new Table(client.get().getTable(this.dbName, this.sourceTableName)); + } + this.dataset = new HiveDataset(FileSystem.getLocal(new Configuration()), pool, table, new Properties()); + } + + @AfterClass + public void teardown() throws Exception { + if (this.jdbcConnector != null) { + this.jdbcConnector.close(); + } 
+ } + + @Test + public void testCopyTable() throws Exception { + String destinationTable = "copyTable"; + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + WorkUnit workUnit = HiveMaterializer.tableCopyWorkUnit(this.dataset, new TableLikeStageableTableMetadata(this.dataset.getTable(), + this.dbName, destinationTable, tmpDir.getAbsolutePath()), String.format("%s=part1", this.partitionColumn)); + + HiveMaterializer hiveMaterializer = new HiveMaterializer(getTaskContextForRun(workUnit)); + hiveMaterializer.run(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + hiveMaterializer.commit(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + List<List<String>> allTable = executeStatementAndGetResults(this.jdbcConnector, + String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 3); + Assert.assertEquals(allTable.size(), 4); + Assert.assertEquals(allTable.stream().map(l -> l.get(0)).collect(Collectors.toList()), Lists.newArrayList("101", "102", "103", "104")); + } + + @Test + public void testMaterializeTable() throws Exception { + String destinationTable = "materializeTable"; + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + WorkUnit workUnit = HiveMaterializer.viewMaterializationWorkUnit(this.dataset, HiveConverterUtils.StorageFormat.AVRO, + new TableLikeStageableTableMetadata(this.dataset.getTable(), this.dbName, destinationTable, tmpDir.getAbsolutePath()), null); + + HiveMaterializer hiveMaterializer = new HiveMaterializer(getTaskContextForRun(workUnit)); + hiveMaterializer.run(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + hiveMaterializer.commit(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + List<List<String>> allTable = executeStatementAndGetResults(this.jdbcConnector, + String.format("SELECT * FROM %s.%s", 
this.dbName, destinationTable), 3); + Assert.assertEquals(allTable.size(), 8); + Assert.assertEquals(allTable.stream().map(l -> l.get(0)).collect(Collectors.toList()), + Lists.newArrayList("101", "102", "103", "104", "201", "202", "203", "204")); + } + + @Test + public void testMaterializeTablePartition() throws Exception { + String destinationTable = "materializeTablePartition"; + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + WorkUnit workUnit = HiveMaterializer.viewMaterializationWorkUnit(this.dataset, HiveConverterUtils.StorageFormat.AVRO, + new TableLikeStageableTableMetadata(this.dataset.getTable(), this.dbName, destinationTable, tmpDir.getAbsolutePath()), + String.format("%s=part1", this.partitionColumn)); + + HiveMaterializer hiveMaterializer = new HiveMaterializer(getTaskContextForRun(workUnit)); + hiveMaterializer.run(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + hiveMaterializer.commit(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + List<List<String>> allTable = executeStatementAndGetResults(this.jdbcConnector, + String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 3); + Assert.assertEquals(allTable.size(), 4); + Assert.assertEquals(allTable.stream().map(l -> l.get(0)).collect(Collectors.toList()), + Lists.newArrayList("101", "102", "103", "104")); + } + + @Test + public void testMaterializeView() throws Exception { + String destinationTable = "materializeView"; + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + String viewName = "myView"; + + this.jdbcConnector.executeStatements(String.format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s WHERE name = 'foo'", + this.dbName, viewName, this.dbName, this.sourceTableName)); + + Table view; + try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { + view = new Table(client.get().getTable(this.dbName, viewName)); + } + HiveDataset 
viewDataset = new HiveDataset(FileSystem.getLocal(new Configuration()), pool, view, new Properties()); + + WorkUnit workUnit = HiveMaterializer.viewMaterializationWorkUnit(viewDataset, HiveConverterUtils.StorageFormat.AVRO, + new TableLikeStageableTableMetadata(viewDataset.getTable(), this.dbName, destinationTable, tmpDir.getAbsolutePath()), + null); + + HiveMaterializer hiveMaterializer = new HiveMaterializer(getTaskContextForRun(workUnit)); + hiveMaterializer.run(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + hiveMaterializer.commit(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + List<List<String>> allTable = executeStatementAndGetResults(this.jdbcConnector, + String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 3); + Assert.assertEquals(allTable.size(), 4); + Assert.assertEquals(allTable.stream().map(l -> l.get(0)).collect(Collectors.toList()), + Lists.newArrayList("101", "103", "201", "203")); + } + + @Test + public void testMaterializeQuery() throws Exception { + String destinationTable = "materializeQuery"; + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + WorkUnit workUnit = HiveMaterializer.queryResultMaterializationWorkUnit( + String.format("SELECT * FROM %s.%s WHERE name = 'foo'", this.dbName, this.sourceTableName), + HiveConverterUtils.StorageFormat.AVRO, + new TableLikeStageableTableMetadata(this.dataset.getTable(), this.dbName, destinationTable, tmpDir.getAbsolutePath())); + + HiveMaterializer hiveMaterializer = new HiveMaterializer(getTaskContextForRun(workUnit)); + hiveMaterializer.run(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + hiveMaterializer.commit(); + Assert.assertEquals(hiveMaterializer.getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL); + + List<List<String>> allTable = executeStatementAndGetResults(this.jdbcConnector, + 
String.format("SELECT * FROM %s.%s", this.dbName, destinationTable), 3); + Assert.assertEquals(allTable.size(), 4); + Assert.assertEquals(allTable.stream().map(l -> l.get(0)).collect(Collectors.toList()), + Lists.newArrayList("101", "103", "201", "203")); + } + + private TaskContext getTaskContextForRun(WorkUnit workUnit) { + workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "job123"); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task123"); + workUnit.setProp(HiveConverterUtils.HIVE_DATASET_DESTINATION_SKIP_SETGROUP, Boolean.toString(true)); + HiveTask.disableHiveWatermarker(workUnit); + JobState jobState = new JobState("job", "job123"); + return new TaskContext(new WorkUnitState(workUnit, jobState)); + } + + private List<List<String>> executeStatementAndGetResults(HiveJdbcConnector connector, String query, int columns) throws SQLException { + Connection conn = connector.getConnection(); + List<List<String>> result = new ArrayList<>(); + + try (Statement stmt = conn.createStatement()) { + stmt.execute(query); + ResultSet rs = stmt.getResultSet(); + while (rs.next()) { + List<String> thisResult = new ArrayList<>(); + for (int i = 0; i < columns; i++) { + thisResult.add(rs.getString(i + 1)); + } + result.add(thisResult); + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java index 281ee62..8a7d37e 100644 --- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.tools.ant.taskdefs.Local; import org.joda.time.DateTime; import org.mockito.Mockito; import org.testng.Assert; @@ -423,7 +422,7 @@ public class PartitionLevelWatermarkerTest { File tableSdFile = Files.createTempDir(); tableSdFile.deleteOnExit(); return new Table(LocalHiveMetastoreTestUtils.getInstance() - .createTestTable(dbName, name, tableSdFile.getAbsolutePath(), + .createTestAvroTable(dbName, name, tableSdFile.getAbsolutePath(), partitioned ? Optional.of("part") : Optional.<String>absent())); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java index 2db01c3..fdd5f44 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/HiveRetentionTest.java @@ -74,14 +74,14 @@ public class HiveRetentionTest { // Setup db, table to purge. Creating 4 partitions. 
2 will be deleted and 2 will be retained String purgedTableSdLoc = new Path(testTempPath, purgedDbName + purgedTableName).toString(); this.hiveMetastoreTestUtils.dropDatabaseIfExists(purgedDbName); - final Table purgedTbl = this.hiveMetastoreTestUtils.createTestTable(purgedDbName, purgedTableName, purgedTableSdLoc, ImmutableList.of("datepartition"), false); + final Table purgedTbl = this.hiveMetastoreTestUtils.createTestAvroTable(purgedDbName, purgedTableName, purgedTableSdLoc, ImmutableList.of("datepartition"), false); // Setup db, table and partitions to act as replacement partitions source String replacementSourceTableSdLoc = new Path(testTempPath, purgedDbName + purgedTableName + "_source").toString(); String replacementDbName = purgedDbName + "_source"; String replacementTableName = purgedTableName + "_source"; this.hiveMetastoreTestUtils.dropDatabaseIfExists(replacementDbName); - final Table replacementTbl = this.hiveMetastoreTestUtils.createTestTable(replacementDbName, replacementTableName, replacementSourceTableSdLoc, ImmutableList.of("datepartition"), false); + final Table replacementTbl = this.hiveMetastoreTestUtils.createTestAvroTable(replacementDbName, replacementTableName, replacementSourceTableSdLoc, ImmutableList.of("datepartition"), false); String deleted1 = "2016-01-01-00"; String deleted2 = "2016-01-02-02"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java index e028c34..d6285a9 100644 --- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/DatePartitionedHiveVersionFinderTest.java @@ -75,7 +75,7 @@ public class DatePartitionedHiveVersionFinderTest { DatePartitionHiveVersionFinder versionFinder = new DatePartitionHiveVersionFinder(this.fs, ConfigFactory.empty()); String tableName = "VfTb1"; - Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, ImmutableList.of("datepartition")); + Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, ImmutableList.of("datepartition")); org.apache.hadoop.hive.metastore.api.Partition tp = this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("2016-01-01-20"), (int) System.currentTimeMillis()); Partition partition = new Partition(new org.apache.hadoop.hive.ql.metadata.Table(tbl), tp); @@ -95,7 +95,7 @@ public class DatePartitionedHiveVersionFinderTest { DatePartitionHiveVersionFinder versionFinder = new DatePartitionHiveVersionFinder(this.fs, conf); - Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, ImmutableList.of("field1")); + Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, ImmutableList.of("field1")); org.apache.hadoop.hive.metastore.api.Partition tp = this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("2016/01/01/20"), (int) System.currentTimeMillis()); Partition partition = new Partition(new org.apache.hadoop.hive.ql.metadata.Table(tbl), tp); @@ -109,7 +109,7 @@ public class DatePartitionedHiveVersionFinderTest { DatePartitionHiveVersionFinder versionFinder = new DatePartitionHiveVersionFinder(this.fs, ConfigFactory.empty()); String tableName = "VfTb3"; - Table tbl = this.hiveMetastoreTestUtils.createTestTable(dbName, tableName, ImmutableList.of("datepartition", "field1")); + Table tbl = 
this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, ImmutableList.of("datepartition", "field1")); org.apache.hadoop.hive.metastore.api.Partition tp = this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("2016-01-01-20", "f1"), (int) System.currentTimeMillis()); Partition partition = new Partition(new org.apache.hadoop.hive.ql.metadata.Table(tbl), tp); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt new file mode 100644 index 0000000..e195725 --- /dev/null +++ b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part1/data.txt @@ -0,0 +1,4 @@ +101,foo +102,bar +103,foo +104,bar http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt new file mode 100644 index 0000000..08e1734 --- /dev/null +++ b/gobblin-data-management/src/test/resources/hiveMaterializerTest/source/part2/data.txt @@ -0,0 +1,4 @@ +201,foo +202,bar +203,foo +204,bar http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-example/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-example/build.gradle b/gobblin-example/build.gradle index 0c77bce..d3cec0a 100644 --- a/gobblin-example/build.gradle +++ b/gobblin-example/build.gradle @@ -38,6 +38,7 @@ dependencies { 
compile externalDependency.httpcore compile externalDependency.httpclient compile externalDependency.commonsCli + compile externalDependency.hiveJdbc testCompile externalDependency.testng } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java b/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java new file mode 100644 index 0000000..f7d9767 --- /dev/null +++ b/gobblin-example/src/main/java/org/apache/gobblin/example/hivematerializer/HiveMaterializerSource.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.example.hivematerializer; + +import java.io.IOException; +import java.util.List; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata; +import org.apache.gobblin.data.management.conversion.hive.materializer.HiveMaterializer; +import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils; +import org.apache.gobblin.data.management.conversion.hive.task.HiveTask; +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.gobblin.hive.HiveMetastoreClientPool; +import org.apache.gobblin.source.Source; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.AutoReturnableObject; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.thrift.TException; + +import com.google.common.base.Optional; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; +import static org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.*; + + +@Slf4j + +/** + * A sample source showing how to create work units for Hive materialization. This source allows copying of tables, + * materialization of views, and materialization of queries.
+ */ +public class HiveMaterializerSource implements Source<Object, Object> { + + private static final String HIVE_MATERIALIZER_SOURCE_PREFIX = "gobblin.hiveMaterializerSource"; + public static final String COPY_TABLE_KEY = HIVE_MATERIALIZER_SOURCE_PREFIX + ".copyTable"; + public static final String MATERIALIZE_VIEW = HIVE_MATERIALIZER_SOURCE_PREFIX + ".materializeView"; + public static final String MATERIALIZE_QUERY = HIVE_MATERIALIZER_SOURCE_PREFIX + ".materializeQuery"; + public static final String OUTPUT_STORAGE_FORMAT = HIVE_MATERIALIZER_SOURCE_PREFIX + ".outputStorageFormat"; + + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + try { + FileSystem fs = HadoopUtils.getSourceFileSystem(state); + Config config = ConfigUtils.propertiesToConfig(state.getProperties()); + + if (state.contains(COPY_TABLE_KEY)) { + HiveDataset dataset = getHiveDataset(state.getProp(COPY_TABLE_KEY), fs, state); + WorkUnit workUnit = HiveMaterializer.tableCopyWorkUnit(dataset, + new StageableTableMetadata(config.getConfig(HIVE_MATERIALIZER_SOURCE_PREFIX), dataset.getTable()), null); + HiveTask.disableHiveWatermarker(workUnit); + return Lists.newArrayList(workUnit); + } else if (state.contains(MATERIALIZE_VIEW)) { + HiveDataset dataset = getHiveDataset(state.getProp(MATERIALIZE_VIEW), fs, state); + WorkUnit workUnit = HiveMaterializer.viewMaterializationWorkUnit(dataset, getOutputStorageFormat(state), + new StageableTableMetadata(config.getConfig(HIVE_MATERIALIZER_SOURCE_PREFIX), dataset.getTable()), null); + HiveTask.disableHiveWatermarker(workUnit); + return Lists.newArrayList(workUnit); + } else if (state.contains(MATERIALIZE_QUERY)) { + String query = state.getProp(MATERIALIZE_QUERY); + WorkUnit workUnit = HiveMaterializer.queryResultMaterializationWorkUnit(query, getOutputStorageFormat(state), + new StageableTableMetadata(config.getConfig(HIVE_MATERIALIZER_SOURCE_PREFIX), null)); + HiveTask.disableHiveWatermarker(workUnit); + return Lists.newArrayList(workUnit); 
+ } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + throw new RuntimeException(String.format("Must specify either %s, %s, or %s.", COPY_TABLE_KEY, MATERIALIZE_QUERY, + MATERIALIZE_VIEW)); + } + + private HiveDataset getHiveDataset(String tableString, FileSystem fs, State state) throws IOException { + try { + HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(state.getProperties(), + Optional.fromNullable(state.getProp(HIVE_METASTORE_URI_KEY))); + + List<String> tokens = Splitter.on(".").splitToList(tableString); + DbAndTable sourceDbAndTable = new DbAndTable(tokens.get(0), tokens.get(1)); + + try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) { + Table sourceTable = new Table(client.get().getTable(sourceDbAndTable.getDb(), sourceDbAndTable.getTable())); + return new HiveDataset(fs, pool, sourceTable, ConfigUtils.propertiesToConfig(state.getProperties())); + } + } catch (TException exc) { + throw new RuntimeException(exc); + } + } + + private HiveConverterUtils.StorageFormat getOutputStorageFormat(State state) { + return HiveConverterUtils.StorageFormat.valueOf(state.getProp(OUTPUT_STORAGE_FORMAT, + HiveConverterUtils.StorageFormat.TEXT_FILE.name())); + } + + @Override + public Extractor<Object, Object> getExtractor(WorkUnitState state) throws IOException { + return null; + } + + @Override + public void shutdown(SourceState state) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-example/src/main/resources/hive-materializer.conf ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/hive-materializer.conf b/gobblin-example/src/main/resources/hive-materializer.conf new file mode 100644 index 0000000..636caf5 --- /dev/null +++ b/gobblin-example/src/main/resources/hive-materializer.conf @@ -0,0 +1,19 @@ +# A job to materialize Hive tables / views / queries + +# ================= +# Enable exactly one of the modes 
below +# ================= +#gobblin.hiveMaterializerSource.copyTable="myDb.myTable" +#gobblin.hiveMaterializerSource.materializeView="myDb.myView" +#gobblin.hiveMaterializerSource.materializeQuery="SELECT * from myDb.myTable where name='foo'" + +source.class= org.apache.gobblin.example.hivematerializer.HiveMaterializerSource + +gobblin.hiveMaterializerSource.destination.dataPath="/tmp/hiveMaterializer/myTableMaterialized" +gobblin.hiveMaterializerSource.destination.tableName="myTable_materialized" +gobblin.hiveMaterializerSource.destination.dbName=myDb +# Supported: TEXTFILE, SEQUENCEFILE, ORC, PARQUET, AVRO, RCFILE +gobblin.hiveMaterializerSource.outputStorageFormat=AVRO + +hive.dataset.hive.metastore.uri="thrift://localhost:60083" +hiveserver.connection.string="jdbc:hive2://" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java index d301e29..1dd448b 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HiveJdbcConnector.java @@ -67,9 +67,6 @@ public class HiveJdbcConnector implements Closeable { // Connection to the Hive DB private Connection conn; - // Re-usable Statement - private Statement stmt; - private int hiveServerVersion; private boolean isSimulate; @@ -160,7 +157,6 @@ public class HiveJdbcConnector implements Closeable { */ private HiveJdbcConnector withHiveConnectionFromUrl(String hiveServerUrl) throws SQLException { this.conn = DriverManager.getConnection(hiveServerUrl); - this.stmt = this.conn.createStatement(); return this; } @@ -175,7 +171,6 @@ public class HiveJdbcConnector implements Closeable { private HiveJdbcConnector 
withHiveConnectionFromUrlUserPassword(String hiveServerUrl, String hiveServerUser, String hiveServerPassword) throws SQLException { this.conn = DriverManager.getConnection(hiveServerUrl, hiveServerUser, hiveServerPassword); - this.stmt = this.conn.createStatement(); return this; } @@ -190,7 +185,6 @@ public class HiveJdbcConnector implements Closeable { } else { this.conn = DriverManager.getConnection(HIVE2_EMBEDDED_CONNECTION_STRING); } - this.stmt = this.conn.createStatement(); return this; } @@ -250,7 +244,14 @@ public class HiveJdbcConnector implements Closeable { LOG.info("[SIMULATE MODE] STATEMENT NOT RUN: " + choppedStatement(statement)); } else { LOG.info("RUNNING STATEMENT: " + choppedStatement(statement)); - this.stmt.execute(statement); + try (Statement stmt = this.conn.createStatement()) { + try { + stmt.execute(statement); + } catch (SQLException sqe) { + LOG.error("Failed statement: " + statement); + throw sqe; + } + } } } } @@ -269,13 +270,6 @@ public class HiveJdbcConnector implements Closeable { @Override public void close() throws IOException { - if (this.stmt != null) { - try { - this.stmt.close(); - } catch (SQLException e) { - LOG.error("Failed to close JDBC statement object", e); - } - } if (this.conn != null) { try {
