Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 21cc7c048 -> eda77bcb6
[GOBBLIN-399] Improved logs and error messages for avro2orc conversion Closes #2323 from aditya1105/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/eda77bcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/eda77bcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/eda77bcb Branch: refs/heads/master Commit: eda77bcb6ae57fc1b38f7c70c2ac45995fde38b6 Parents: 21cc7c0 Author: Aditya Sharma <[email protected]> Authored: Thu Mar 29 11:25:10 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Mar 29 11:25:10 2018 -0700 ---------------------------------------------------------------------- .../converter/AbstractAvroToOrcConverter.java | 6 ++- .../hive/dataset/ConvertibleHiveDataset.java | 3 +- .../hive/query/HiveAvroORCQueryGenerator.java | 48 ++++++++++++-------- .../hive/writer/HiveQueryExecutionWriter.java | 12 +++-- 4 files changed, 45 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java index ed42946..732e149 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java @@ -676,7 +676,7 @@ public abstract class 
AbstractAvroToOrcConverter extends Converter<Schema, Schem // Values for a partition are separated by "," List<String> partitionValues = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(partitionsInfoString); - // Do not drop partition the being processed. Sometimes a partition may have replaced another partition of the same values. + // Do not drop the partition being processed. Sometimes a partition may have replaced another partition of the same values. if (!partitionValues.equals(hivePartition.getValues())) { ImmutableMap.Builder<String, String> partitionDDLInfoMap = ImmutableMap.builder(); for (int i = 0; i < partitionKeys.size(); i++) { @@ -713,7 +713,9 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem return Optional.of(qlPartition.getDataLocation()); } } catch (IOException | TException | HiveException e) { - throw new DataConversionException("Could not fetch destination table metadata", e); + throw new DataConversionException( + String.format("Could not fetch destination table %s.%s metadata", table.get().getDbName(), + table.get().getTableName()), e); } return Optional.<Path>absent(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java index 933e86a..f4c8744 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java @@ -92,9 +92,10 
@@ public class ConvertibleHiveDataset extends HiveDataset { super(fs, clientPool, table, jobProps, config); Preconditions.checkArgument(config.hasPath(DESTINATION_CONVERSION_FORMATS_KEY), String.format( - "Atleast one destination format should be specified at %s.%s. If you do not intend to convert this dataset set %s.%s to true", + "At least one destination format should be specified at %s.%s. If you do not intend to convert dataset %s set %s.%s to true", super.properties.getProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, ""), DESTINATION_CONVERSION_FORMATS_KEY, + table.getCompleteName(), super.properties.getProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, ""), HiveDatasetFinder.HIVE_DATASET_IS_BLACKLISTED_KEY)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java index d16df29..3f2206d 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java @@ -202,7 +202,7 @@ public class HiveAvroORCQueryGenerator { // .. use columns from destination schema if (isEvolutionEnabled || !destinationTableMeta.isPresent()) { log.info("Generating DDL using source schema"); - ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true)); + ddl.append(generateAvroToHiveColumnMapping(schema, Optional.of(hiveColumns), true, dbName + "." 
+ tblName)); } else { log.info("Generating DDL using destination schema"); ddl.append(generateDestinationToHiveColumnMapping(Optional.of(hiveColumns), destinationTableMeta.get())); @@ -229,14 +229,15 @@ public class HiveAvroORCQueryGenerator { if (optionalClusterInfo.isPresent()) { if (!optionalNumOfBuckets.isPresent()) { - throw new IllegalArgumentException(("CLUSTERED BY requested, but no NUM_BUCKETS specified")); + throw new IllegalArgumentException( + (String.format("CLUSTERED BY requested, but no NUM_BUCKETS specified for table %s.%s", dbName, tblName))); } ddl.append("CLUSTERED BY ( "); boolean isFirst = true; for (String clusterByCol : optionalClusterInfo.get()) { if (!hiveColumns.containsKey(clusterByCol)) { throw new IllegalArgumentException(String.format("Requested CLUSTERED BY column: %s " - + "is not present in schema", clusterByCol)); + + "is not present in schema for table %s.%s", clusterByCol, dbName, tblName)); } if (isFirst) { isFirst = false; @@ -254,7 +255,8 @@ public class HiveAvroORCQueryGenerator { for (Map.Entry<String, COLUMN_SORT_ORDER> sortOrderInfo : sortOrderInfoMap.entrySet()){ if (!hiveColumns.containsKey(sortOrderInfo.getKey())) { throw new IllegalArgumentException(String.format( - "Requested SORTED BY column: %s " + "is not present in schema", sortOrderInfo.getKey())); + "Requested SORTED BY column: %s " + "is not present in schema for table %s.%s", sortOrderInfo.getKey(), + dbName, tblName)); } if (isFirst) { isFirst = false; @@ -268,7 +270,8 @@ public class HiveAvroORCQueryGenerator { ddl.append(String.format(" INTO %s BUCKETS %n", optionalNumOfBuckets.get())); } else { if (optionalSortOrderInfo.isPresent()) { - throw new IllegalArgumentException("SORTED BY requested, but no CLUSTERED BY specified"); + throw new IllegalArgumentException( + String.format("SORTED BY requested, but no CLUSTERED BY specified for table %s.%s", dbName, tblName)); } } @@ -389,12 +392,12 @@ public class HiveAvroORCQueryGenerator { * @param topLevel If this 
is first level * @return Generate Hive columns with types for given Avro schema */ - private static String generateAvroToHiveColumnMapping(Schema schema, - Optional<Map<String, String>> hiveColumns, - boolean topLevel) { + private static String generateAvroToHiveColumnMapping(Schema schema, Optional<Map<String, String>> hiveColumns, + boolean topLevel, String datasetName) { if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) { - throw new IllegalArgumentException(String.format("Schema for table must be of type RECORD. Received type: %s", - schema.getType())); + throw new IllegalArgumentException( + String.format("Schema for table must be of type RECORD. Received type: %s for dataset %s", schema.getType(), + datasetName)); } StringBuilder columns = new StringBuilder(); @@ -409,7 +412,7 @@ public class HiveAvroORCQueryGenerator { } else { columns.append(", \n"); } - String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false); + String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName); if (hiveColumns.isPresent()) { hiveColumns.get().put(field.name(), type); } @@ -427,7 +430,7 @@ public class HiveAvroORCQueryGenerator { } else { columns.append(","); } - String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false); + String type = generateAvroToHiveColumnMapping(field.schema(), hiveColumns, false, datasetName); columns.append("`").append(field.name()).append("`").append(":").append(type); } columns.append(">"); @@ -437,7 +440,7 @@ public class HiveAvroORCQueryGenerator { Optional<Schema> optionalType = isOfOptionType(schema); if (optionalType.isPresent()) { Schema optionalTypeSchema = optionalType.get(); - columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false)); + columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, hiveColumns, false, datasetName)); } else { 
columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<"); isFirst = true; @@ -450,19 +453,20 @@ public class HiveAvroORCQueryGenerator { } else { columns.append(","); } - columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false)); + columns.append(generateAvroToHiveColumnMapping(unionMember, hiveColumns, false, datasetName)); } columns.append(">"); } break; case MAP: columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<"); - columns.append("string,").append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false)); + columns.append("string,") + .append(generateAvroToHiveColumnMapping(schema.getValueType(), hiveColumns, false, datasetName)); columns.append(">"); break; case ARRAY: columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<"); - columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false)); + columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), hiveColumns, false, datasetName)); columns.append(">"); break; case NULL: @@ -479,7 +483,8 @@ public class HiveAvroORCQueryGenerator { columns.append(AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())); break; default: - String exceptionMessage = String.format("DDL query generation failed for \"%s\" ", schema); + String exceptionMessage = + String.format("DDL query generation failed for \"%s\" of dataset %s", schema, datasetName); log.error(exceptionMessage); throw new AvroRuntimeException(exceptionMessage); } @@ -844,7 +849,14 @@ public class HiveAvroORCQueryGenerator { if (destinationField.getName().equalsIgnoreCase(evolvedColumn.getKey())) { // If evolved column is found, but type is evolved - evolve it // .. 
if incompatible, isTypeEvolved will throw an exception - if (isTypeEvolved(evolvedColumn.getValue(), destinationField.getType())) { + boolean typeEvolved; + try { + typeEvolved = isTypeEvolved(evolvedColumn.getValue(), destinationField.getType()); + } catch (Exception e) { + throw new RuntimeException( + String.format("Unable to evolve schema for table %s.%s", finalDbName, finalTableName), e); + } + if (typeEvolved) { ddl.add(String.format("USE %s%n", finalDbName)); ddl.add(String.format("ALTER TABLE `%s` CHANGE COLUMN %s %s %s COMMENT '%s'", finalTableName, evolvedColumn.getKey(), evolvedColumn.getKey(), evolvedColumn.getValue(), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/eda77bcb/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java index fa15459..9c9599c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/writer/HiveQueryExecutionWriter.java @@ -61,11 +61,17 @@ public class HiveQueryExecutionWriter implements DataWriter<QueryBasedHiveConver addPropsForPublisher(hiveConversionEntity); EventWorkunitUtils.setEndConversionDDLExecuteTimeMetadata(this.workUnit, System.currentTimeMillis()); } catch (SQLException e) { - log.warn("Failed to execute queries: "); + StringBuilder sb = new StringBuilder(); + sb.append(String.format("Failed to execute queries for %s: ", + hiveConversionEntity.getPartition().isPresent() ? 
hiveConversionEntity.getPartition().get().getCompleteName() + : hiveConversionEntity.getTable().getCompleteName())); for (String conversionQuery : conversionQueries) { - log.warn("Conversion query attempted by Hive Query writer: " + conversionQuery); + sb.append("\nConversion query attempted by Hive Query writer: "); + sb.append(conversionQuery); } - throw new IOException(e); + String message = sb.toString(); + log.warn(message); + throw new IOException(message, e); } }
