This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc44bc  [GOBBLIN-1006] Enable configurable case-preserving and schema 
source-of-truth in table level properties
9fc44bc is described below

commit 9fc44bca542754a6e865010c67eda1b6e4cfe0e8
Author: autumnust <[email protected]>
AuthorDate: Wed Dec 18 10:38:41 2019 -0800

    [GOBBLIN-1006] Enable configurable case-preserving and schema 
source-of-truth in table level properties
    
    Closes #2851 from autumnust/enhaceA2OSchemaSupport
---
 .../hive/converter/AbstractAvroToOrcConverter.java |  8 ++-
 .../hive/entities/StageableTableMetadata.java      | 14 +++++
 .../entities/TableLikeStageableTableMetadata.java  |  4 +-
 .../hive/query/HiveAvroORCQueryGenerator.java      | 64 ++++++++++++++--------
 .../conversion/hive/task/HiveConverterUtils.java   | 20 ++++---
 .../hive/converter/HiveSchemaEvolutionTest.java    | 30 ++++++----
 .../hive/task/HiveConverterUtilsTest.java          |  5 ++
 .../hive/util/HiveAvroORCQueryGeneratorTest.java   | 14 ++---
 .../java/org/apache/gobblin/util/AvroUtils.java    |  9 +++
 .../org/apache/gobblin/util/AvroUtilsTest.java     | 10 ++++
 10 files changed, 125 insertions(+), 53 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index 7de06c6..9f3f269 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -222,6 +222,7 @@ public abstract class AbstractAvroToOrcConverter extends 
Converter<Schema, Schem
     String orcDataLocation = getOrcDataLocation();
     String orcStagingDataLocation = 
getOrcStagingDataLocation(orcStagingTableName);
     boolean isEvolutionEnabled = getConversionConfig().isEvolutionEnabled();
+    boolean isCasePreserved = getConversionConfig().isCasePreserved();
     Pair<Optional<Table>, Optional<List<Partition>>> destinationMeta = 
HiveConverterUtils.getDestinationTableMeta(orcTableDatabase,
         orcTableName, workUnit.getProperties());
     Optional<Table> destinationTableMeta = destinationMeta.getLeft();
@@ -346,6 +347,7 @@ public abstract class AbstractAvroToOrcConverter extends 
Converter<Schema, Schem
             Optional.<String>absent(),
             tableProperties,
             isEvolutionEnabled,
+            isCasePreserved,
             destinationTableMeta,
             hiveColumns);
     conversionEntity.getQueries().add(createStagingTableDDL);
@@ -435,6 +437,7 @@ public abstract class AbstractAvroToOrcConverter extends 
Converter<Schema, Schem
               Optional.<String>absent(),
               Optional.<String>absent(),
               tableProperties,
+              isCasePreserved,
               isEvolutionEnabled,
               destinationTableMeta,
               new HashMap<String, String>());
@@ -443,7 +446,7 @@ public abstract class AbstractAvroToOrcConverter extends 
Converter<Schema, Schem
     }
 
     // Step:
-    // A.2.1: If table pre-exists (destinationTableMeta would be present), 
evolve table
+    // A.2.1: If table pre-exists (destinationTableMeta would be present), 
evolve table and update table properties
     // B.2.1: No-op
     List<String> evolutionDDLs = 
HiveAvroORCQueryGenerator.generateEvolutionDDL(orcStagingTableName,
         orcTableName,
@@ -452,7 +455,8 @@ public abstract class AbstractAvroToOrcConverter extends 
Converter<Schema, Schem
         outputAvroSchema,
         isEvolutionEnabled,
         hiveColumns,
-        destinationTableMeta);
+        destinationTableMeta,
+        tableProperties);
     log.debug("Evolve final table DDLs: " + evolutionDDLs);
     EventWorkunitUtils.setEvolutionMetadata(workUnit, evolutionDDLs);
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
index 39494c2..46e6987 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
@@ -53,6 +53,12 @@ public class StageableTableMetadata {
   public static final String ROW_LIMIT_KEY = "rowLimit";
   public static final String HIVE_VERSION_KEY = "hiveVersion";
   public static final String HIVE_RUNTIME_PROPERTIES_LIST_KEY = 
"hiveRuntimeProperties";
+
+  /**
+   * The output of Hive-based conversion can preserve casing in the ORC-writer if 
"columns" and "columns.types" are set
+   * in table-level properties. This is turned off by default, as the Hive engine 
itself doesn't preserve the case.
+   */
+  public static final String OUTPUT_FILE_CASE_PRESERVED = "casePreserved";
   /***
    * Comma separated list of string that should be used as a prefix for 
destination partition directory name
    * ... (if present in the location path string of source partition)
@@ -92,6 +98,12 @@ public class StageableTableMetadata {
    */
   public static final String SOURCE_DATA_PATH_IDENTIFIER_KEY = 
"source.dataPathIdentifier";
 
+  /**
+   * Attributes like "avro.schema.literal" are usually used in offline systems 
as the source-of-truth of the schema.
+   * This configuration's value should be the key name under which users expect 
the schema string to be preserved, if necessary.
+   */
+  public static final String SCHEMA_SOURCE_OF_TRUTH = "schema.original";
+
 
   /** Table name of the destination table. */
   private final String destinationTableName;
@@ -109,6 +121,7 @@ public class StageableTableMetadata {
   private final Optional<Integer> numBuckets;
   private final Properties hiveRuntimeProperties;
   private final boolean evolutionEnabled;
+  private final boolean casePreserved;
   private final Optional<Integer> rowLimit;
   private final List<String> sourceDataPathIdentifier;
 
@@ -136,6 +149,7 @@ public class StageableTableMetadata {
     this.hiveRuntimeProperties =
         convertKeyValueListToProperties(ConfigUtils.getStringList(config, 
HIVE_RUNTIME_PROPERTIES_LIST_KEY));
     this.evolutionEnabled = ConfigUtils.getBoolean(config, EVOLUTION_ENABLED, 
false);
+    this.casePreserved = ConfigUtils.getBoolean(config, 
OUTPUT_FILE_CASE_PRESERVED, false);
     this.rowLimit = Optional.fromNullable(ConfigUtils.getInt(config, 
ROW_LIMIT_KEY, null));
     this.sourceDataPathIdentifier = ConfigUtils.getStringList(config, 
SOURCE_DATA_PATH_IDENTIFIER_KEY);
   }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
index b4fe9d4..cecd035 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
@@ -34,7 +34,7 @@ public class TableLikeStageableTableMetadata extends 
StageableTableMetadata {
 
   public TableLikeStageableTableMetadata(Table referenceTable, String 
destinationDB, String destinationTableName, String targetDataPath) {
     super(destinationTableName, destinationTableName + "_STAGING", 
destinationDB, targetDataPath,
-        getTableProperties(referenceTable), new ArrayList<>(), 
Optional.of(referenceTable.getNumBuckets()), new Properties(), false, 
Optional.absent(),
+        getTableProperties(referenceTable), new ArrayList<>(), 
Optional.of(referenceTable.getNumBuckets()), new Properties(), false, false, 
Optional.absent(),
         new ArrayList<>());
   }
 
@@ -44,7 +44,7 @@ public class TableLikeStageableTableMetadata extends 
StageableTableMetadata {
         
HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_DB_KEY),
 referenceTable),
         
HiveDataset.resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), 
referenceTable),
         getTableProperties(referenceTable), new ArrayList<>(), 
Optional.of(referenceTable.getNumBuckets()),
-        new Properties(), false, Optional.absent(), new ArrayList<>());
+        new Properties(), false, false, Optional.absent(), new ArrayList<>());
   }
 
   private static Properties getTableProperties(Table table) {
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index 8d593ed..9ebf59f 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -23,18 +23,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import java.util.stream.Collectors;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.configuration.State;
+import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
 import org.apache.gobblin.util.HiveAvroTypeConstants;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -53,8 +53,11 @@ import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
-import org.apache.gobblin.configuration.State;
-import 
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import static 
org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata.SCHEMA_SOURCE_OF_TRUTH;
+import static org.apache.gobblin.util.AvroUtils.sanitizeSchemaString;
 
 
 /***
@@ -126,6 +129,7 @@ public class HiveAvroORCQueryGenerator {
       Optional<String> optionalOutputFormat,
       Properties tableProperties,
       boolean isEvolutionEnabled,
+      boolean casePreserved,
       Optional<Table> destinationTableMeta,
       Map<String, String> hiveColumns) {
 
@@ -160,21 +164,25 @@ public class HiveAvroORCQueryGenerator {
     //    (evolution does not matter if its new destination table)
     // 4. If evolution is disabled, and destination table does exists
     //    .. use columns from destination schema
+    // Make sure the schema attribute will be updated in source-of-truth 
attribute.
+    // Or fall back to default attribute-pair used in Hive for ORC format.
+    if (tableProperties.containsKey(SCHEMA_SOURCE_OF_TRUTH)) {
+      
tableProperties.setProperty(tableProperties.getProperty(SCHEMA_SOURCE_OF_TRUTH),
 sanitizeSchemaString(schema.toString()));
+      tableProperties.remove(SCHEMA_SOURCE_OF_TRUTH);
+    }
+
     if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
       log.info("Generating DDL using source schema");
       ddl.append(generateAvroToHiveColumnMapping(schema, 
Optional.of(hiveColumns), true, dbName + "." + tblName));
-      try {
-        AvroObjectInspectorGenerator objectInspectorGenerator = new 
AvroObjectInspectorGenerator(schema);
-        String columns = 
Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
-        String columnTypes = Joiner.on(",").join(
-            objectInspectorGenerator.getColumnTypes().stream().map(x -> 
x.getTypeName())
-                .collect(Collectors.toList()));
-        tableProperties.setProperty("columns", columns);
-        tableProperties.setProperty("columns.types", columnTypes);
-
-      } catch (Exception e) {
-        log.error("Cannot generate add partition DDL due to ", e);
-        throw new RuntimeException(e);
+      if (casePreserved) {
+        try {
+          Pair<String, String> orcSchemaProps = 
HiveConverterUtils.getORCSchemaPropsFromAvroSchema(schema);
+          tableProperties.setProperty("columns", orcSchemaProps.getLeft());
+          tableProperties.setProperty("columns.types", 
orcSchemaProps.getRight());
+        } catch (SerDeException e) {
+          log.error("Cannot generate add partition DDL due to ", e);
+          throw new RuntimeException(e);
+        }
       }
     } else {
       log.info("Generating DDL using destination schema");
@@ -504,7 +512,7 @@ public class HiveAvroORCQueryGenerator {
   public static String escapeHiveType(String type) {
     TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(type);
 
-    // Primitve
+    // Primitive
     if (ObjectInspector.Category.PRIMITIVE.equals(typeInfo.getCategory())) {
       return type;
     }
@@ -779,7 +787,8 @@ public class HiveAvroORCQueryGenerator {
   }
 
   /***
-   * Generate DDLs to evolve final destination table.
+   * Generate DDLs to evolve final destination table. This DDL should not only 
contains schema evolution but also
+   * include table-property updates.
    * @param stagingTableName Staging table.
    * @param finalTableName Un-evolved final destination table.
    * @param optionalStagingDbName Optional staging database name, defaults to 
default.
@@ -797,7 +806,9 @@ public class HiveAvroORCQueryGenerator {
       Schema evolvedSchema,
       boolean isEvolutionEnabled,
       Map<String, String> evolvedColumns,
-      Optional<Table> destinationTableMeta) {
+      Optional<Table> destinationTableMeta,
+      Properties tableProperties
+      ) {
     // If schema evolution is disabled, then do nothing OR
     // If destination table does not exists, then do nothing
     if (!isEvolutionEnabled || !destinationTableMeta.isPresent()) {
@@ -812,7 +823,7 @@ public class HiveAvroORCQueryGenerator {
     // Evolve schema
     Table destinationTable = destinationTableMeta.get();
     if (destinationTable.getSd().getCols().size() == 0) {
-      log.warn("Desination Table: " + destinationTable + " does not has column 
details in StorageDescriptor. "
+      log.warn("Destination Table: " + destinationTable + " does not have 
column details in StorageDescriptor. "
           + "It is probably of Avro type. Cannot evolve via traditional HQL, 
so skipping evolution checks.");
       return ddl;
     }
@@ -855,6 +866,13 @@ public class HiveAvroORCQueryGenerator {
       }
     }
 
+    // Updating table properties.
+    ddl.add(String.format("USE %s%n", finalDbName));
+    for (String property :tableProperties.stringPropertyNames()) {
+      ddl.add(String.format("ALTER TABLE `%s` SET TBLPROPERTIES ('%s'='%s')", 
finalTableName,
+          property, tableProperties.getProperty(property)));
+    }
+
     return ddl;
   }
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index ad01b35..8005ee4 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.thrift.TException;
 
@@ -164,13 +165,9 @@ public class HiveConverterUtils {
 
     String dbName = optionalDbName.isPresent() ? optionalDbName.get() : 
"default";
     try {
-      AvroObjectInspectorGenerator objectInspectorGenerator = new 
AvroObjectInspectorGenerator(schema);
-      String columns = 
Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
-      String columnTypes = Joiner.on(",").join(
-          objectInspectorGenerator.getColumnTypes().stream().map(x -> 
x.getTypeName())
-              .collect(Collectors.toList()));
-    String dml = String.format("ALTER TABLE `%s`.`%s` SET TBLPROPERTIES 
('columns'='%s', 'columns.types'='%s')", dbName, tableName,
-        columns, columnTypes);
+      Pair<String, String> orcSchemaProps = 
getORCSchemaPropsFromAvroSchema(schema);
+      String dml = String.format("ALTER TABLE `%s`.`%s` SET TBLPROPERTIES 
('columns'='%s', 'columns.types'='%s')", dbName, tableName,
+          orcSchemaProps.getLeft(), orcSchemaProps.getRight());
     return dml;
     } catch (Exception e) {
       log.error("Cannot generate add partition DDL due to ", e);
@@ -178,6 +175,15 @@ public class HiveConverterUtils {
     }
   }
 
+  public static Pair<String, String> getORCSchemaPropsFromAvroSchema(Schema 
avroSchema) throws SerDeException {
+    AvroObjectInspectorGenerator objectInspectorGenerator = new 
AvroObjectInspectorGenerator(avroSchema);
+    String columns = 
Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
+    String columnTypes = Joiner.on(",").join(
+        objectInspectorGenerator.getColumnTypes().stream().map(x -> 
x.getTypeName())
+            .collect(Collectors.toList()));
+    return new ImmutablePair<>(columns, columnTypes);
+  }
+
   /**
    * Generates a CTAS statement to dump the contents of a table / partition 
into a new table.
    * @param outputDbAndTable output db and table where contents should be 
written.
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
index 4432b54..2ea5185 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -76,7 +77,7 @@ public class HiveSchemaEvolutionTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(ddl, 
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -101,7 +102,7 @@ public class HiveSchemaEvolutionTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(ddl, 
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -121,6 +122,7 @@ public class HiveSchemaEvolutionTest {
   @Test
   public void testEvolutionDisabledForExistingTable() throws IOException {
     boolean isEvolutionEnabled = false;
+    boolean casePreserved = true;
     Optional<Table> destinationTableMeta = 
createEvolvedDestinationTable(schemaName, "default", "", true);
 
     String ddl = HiveAvroORCQueryGenerator
@@ -128,7 +130,7 @@ public class HiveSchemaEvolutionTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, casePreserved, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(ddl, 
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -155,7 +157,7 @@ public class HiveSchemaEvolutionTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(ddl, 
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -182,7 +184,7 @@ public class HiveSchemaEvolutionTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(ddl, 
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -212,13 +214,15 @@ public class HiveSchemaEvolutionTest {
         Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
         Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
         Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-        null, isEvolutionEnabled, destinationTableMeta, hiveColumns);
+        null, isEvolutionEnabled, true, destinationTableMeta, hiveColumns);
 
     // Destination table exists
+    Properties tableProperties = new Properties();
+    tableProperties.setProperty("random", "value");
     List<String> generateEvolutionDDL = HiveAvroORCQueryGenerator
         .generateEvolutionDDL(orcStagingTableName, orcTableName, 
Optional.of(hiveDbName), Optional.of(hiveDbName),
-            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta);
-    Assert.assertEquals(generateEvolutionDDL.size(), 2);
+            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta, tableProperties);
+    Assert.assertEquals(generateEvolutionDDL.size(), 4);
     Assert.assertEquals(generateEvolutionDDL.get(1),
         "ALTER TABLE `sourceSchema` ADD COLUMNS 
(`parentFieldRecord__nestedFieldInt` int "
             + "COMMENT 'from flatten_source 
parentFieldRecord.nestedFieldInt')",
@@ -228,7 +232,7 @@ public class HiveSchemaEvolutionTest {
     destinationTableMeta = Optional.absent();
     generateEvolutionDDL = HiveAvroORCQueryGenerator
         .generateEvolutionDDL(orcStagingTableName, orcTableName, 
Optional.of(hiveDbName), Optional.of(hiveDbName),
-            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta);
+            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta, tableProperties);
     // No DDL should be generated, because create table will take care of 
destination table
     Assert.assertEquals(generateEvolutionDDL.size(), 0,
         "Generated evolution DDL did not match for evolution enabled");
@@ -247,12 +251,14 @@ public class HiveSchemaEvolutionTest {
         Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
         Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
         Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-        null, isEvolutionEnabled, destinationTableMeta, hiveColumns);
+        null, isEvolutionEnabled, true, destinationTableMeta, hiveColumns);
 
     // Destination table exists
+    Properties tableProperties = new Properties();
+    tableProperties.setProperty("random", "value");
     List<String> generateEvolutionDDL = HiveAvroORCQueryGenerator
         .generateEvolutionDDL(orcStagingTableName, orcTableName, 
Optional.of(hiveDbName), Optional.of(hiveDbName),
-            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta);
+            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta, tableProperties);
     // No DDL should be generated, because select based on destination table 
will selectively project columns
     Assert.assertEquals(generateEvolutionDDL.size(), 0,
         "Generated evolution DDL did not match for evolution disabled");
@@ -261,7 +267,7 @@ public class HiveSchemaEvolutionTest {
     destinationTableMeta = Optional.absent();
     generateEvolutionDDL = HiveAvroORCQueryGenerator
         .generateEvolutionDDL(orcStagingTableName, orcTableName, 
Optional.of(hiveDbName), Optional.of(hiveDbName),
-            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta);
+            outputSchema, isEvolutionEnabled, hiveColumns, 
destinationTableMeta, tableProperties);
     // No DDL should be generated, because create table will take care of 
destination table
     Assert.assertEquals(generateEvolutionDDL.size(), 0,
         "Generated evolution DDL did not match for evolution disabled");
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
index a2a3dbe..bb8592d 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
@@ -19,12 +19,17 @@ package 
org.apache.gobblin.data.management.conversion.hive.task;
 
 import java.util.Map;
 
+import org.apache.avro.Schema;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.testng.Assert;
 import org.testng.collections.Maps;
 
 import com.google.common.base.Optional;
 
+import static 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils.getORCSchemaPropsFromAvroSchema;
+
+
 public class HiveConverterUtilsTest {
   private final String inputDbName = "testdb";
   private final String inputTableName = "testtable";
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
index 410c3bd..fadff0f 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
@@ -60,7 +60,7 @@ public class HiveAvroORCQueryGeneratorTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(q,
@@ -82,7 +82,7 @@ public class HiveAvroORCQueryGeneratorTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(q,
@@ -104,7 +104,7 @@ public class HiveAvroORCQueryGeneratorTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(q.trim(),
@@ -126,7 +126,7 @@ public class HiveAvroORCQueryGeneratorTest {
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
             Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(),
-            null, isEvolutionEnabled, destinationTableMeta,
+            null, isEvolutionEnabled, true, destinationTableMeta,
             new HashMap<String, String>());
 
     Assert.assertEquals(q.trim(),
@@ -149,7 +149,7 @@ public class HiveAvroORCQueryGeneratorTest {
         .generateCreateTableDDL(flattenedSchema, schemaName, 
"file:/user/hive/warehouse/" + schemaName,
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
-            Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(), null, isEvolutionEnabled,
+            Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(), null, isEvolutionEnabled, true,
             destinationTableMeta, new HashMap<String, String>());
 
     Assert.assertEquals(q,
@@ -196,7 +196,7 @@ public class HiveAvroORCQueryGeneratorTest {
         .generateCreateTableDDL(flattenedSchema, schemaName, 
"file:/user/hive/warehouse/" + schemaName,
             Optional.<String>absent(), Optional.of(partitionDDLInfo), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
-            Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(), null, isEvolutionEnabled,
+            Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(), null, isEvolutionEnabled, true,
             destinationTableMeta, new HashMap<String, String>());
 
     Assert.assertEquals(q,
@@ -239,7 +239,7 @@ public class HiveAvroORCQueryGeneratorTest {
         .generateCreateTableDDL(schema, schemaName, 
"file:/user/hive/warehouse/" + schemaName,
             Optional.<String>absent(), Optional.<Map<String, String>>absent(), 
Optional.<List<String>>absent(),
             Optional.<Map<String, 
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(), 
Optional.<Integer>absent(),
-            Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(), null, isEvolutionEnabled,
+            Optional.<String>absent(), Optional.<String>absent(), 
Optional.<String>absent(), null, isEvolutionEnabled, true,
             destinationTableMeta, new HashMap<String, String>());
   }
 
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index fa1fa3b..fceaaa6 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -856,6 +856,15 @@ public class AvroUtils {
   }
 
   /**
+   * Escaping ";" and "'" character in the schema string when it is being used 
in DDL.
+   * These characters are not allowed to show as part of column name but could 
possibly appear in documentation field.
+   * Therefore the escaping behavior won't cause correctness issues.
+   */
+  public static String sanitizeSchemaString(String schemaString) {
+    return schemaString.replaceAll(";",  "\\\\;").replaceAll("'", "\\\\'");
+  }
+
+  /**
    * Deserialize a {@link GenericRecord} from a byte array. This method is not 
intended for high performance.
    */
   public static GenericRecord slowDeserializeGenericRecord(byte[] 
serializedRecord, Schema schema) throws IOException {
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index a2e205b..e7a8c35 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -264,6 +264,16 @@ public class AvroUtilsTest {
     Assert.assertEquals(AvroUtils.serializeAsPath(partition, false, false), 
new Path("a/b_c_d_e/title"));
   }
 
+  @Test
+  public void testStringEscaping() {
+    String invalidString = "foo;foo'bar";
+    String expectedString = "foo\\;foo\\'bar";
+    String actualString = AvroUtils.sanitizeSchemaString(invalidString);
+    Assert.assertEquals(actualString, expectedString);
+    // Verify that there's only one slash being added.
+    Assert.assertEquals(actualString.length(), invalidString.length() + 2);
+  }
+
   public static List<GenericRecord> getRecordFromFile(String path)
       throws IOException {
     Configuration config = new Configuration();

Reply via email to