This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9fc44bc [GOBBLIN-1006] Enable configurable case-preserving and schema
source-of-truth in table level properties
9fc44bc is described below
commit 9fc44bca542754a6e865010c67eda1b6e4cfe0e8
Author: autumnust <[email protected]>
AuthorDate: Wed Dec 18 10:38:41 2019 -0800
[GOBBLIN-1006] Enable configurable case-preserving and schema
source-of-truth in table level properties
Closes #2851 from autumnust/enhaceA2OSchemaSupport
---
.../hive/converter/AbstractAvroToOrcConverter.java | 8 ++-
.../hive/entities/StageableTableMetadata.java | 14 +++++
.../entities/TableLikeStageableTableMetadata.java | 4 +-
.../hive/query/HiveAvroORCQueryGenerator.java | 64 ++++++++++++++--------
.../conversion/hive/task/HiveConverterUtils.java | 20 ++++---
.../hive/converter/HiveSchemaEvolutionTest.java | 30 ++++++----
.../hive/task/HiveConverterUtilsTest.java | 5 ++
.../hive/util/HiveAvroORCQueryGeneratorTest.java | 14 ++---
.../java/org/apache/gobblin/util/AvroUtils.java | 9 +++
.../org/apache/gobblin/util/AvroUtilsTest.java | 10 ++++
10 files changed, 125 insertions(+), 53 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index 7de06c6..9f3f269 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -222,6 +222,7 @@ public abstract class AbstractAvroToOrcConverter extends
Converter<Schema, Schem
String orcDataLocation = getOrcDataLocation();
String orcStagingDataLocation =
getOrcStagingDataLocation(orcStagingTableName);
boolean isEvolutionEnabled = getConversionConfig().isEvolutionEnabled();
+ boolean isCasePreserved = getConversionConfig().isCasePreserved();
Pair<Optional<Table>, Optional<List<Partition>>> destinationMeta =
HiveConverterUtils.getDestinationTableMeta(orcTableDatabase,
orcTableName, workUnit.getProperties());
Optional<Table> destinationTableMeta = destinationMeta.getLeft();
@@ -346,6 +347,7 @@ public abstract class AbstractAvroToOrcConverter extends
Converter<Schema, Schem
Optional.<String>absent(),
tableProperties,
isEvolutionEnabled,
+ isCasePreserved,
destinationTableMeta,
hiveColumns);
conversionEntity.getQueries().add(createStagingTableDDL);
@@ -435,6 +437,7 @@ public abstract class AbstractAvroToOrcConverter extends
Converter<Schema, Schem
Optional.<String>absent(),
Optional.<String>absent(),
tableProperties,
isEvolutionEnabled,
+ isCasePreserved,
destinationTableMeta,
new HashMap<String, String>());
@@ -443,7 +446,7 @@ public abstract class AbstractAvroToOrcConverter extends
Converter<Schema, Schem
}
// Step:
- // A.2.1: If table pre-exists (destinationTableMeta would be present),
evolve table
+ // A.2.1: If table pre-exists (destinationTableMeta would be present),
evolve table and update table properties
// B.2.1: No-op
List<String> evolutionDDLs =
HiveAvroORCQueryGenerator.generateEvolutionDDL(orcStagingTableName,
orcTableName,
@@ -452,7 +455,8 @@ public abstract class AbstractAvroToOrcConverter extends
Converter<Schema, Schem
outputAvroSchema,
isEvolutionEnabled,
hiveColumns,
- destinationTableMeta);
+ destinationTableMeta,
+ tableProperties);
log.debug("Evolve final table DDLs: " + evolutionDDLs);
EventWorkunitUtils.setEvolutionMetadata(workUnit, evolutionDDLs);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
index 39494c2..46e6987 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
@@ -53,6 +53,12 @@ public class StageableTableMetadata {
public static final String ROW_LIMIT_KEY = "rowLimit";
public static final String HIVE_VERSION_KEY = "hiveVersion";
public static final String HIVE_RUNTIME_PROPERTIES_LIST_KEY =
"hiveRuntimeProperties";
+
+ /**
+ * The output of Hive-based conversion can preserve casing in ORC-writer if
"columns" and "columns.types" are being set
in table-level properties. This is turned off by default as the Hive engine
itself doesn't preserve casing.
+ */
+ public static final String OUTPUT_FILE_CASE_PRESERVED = "casePreserved";
/***
* Comma separated list of string that should be used as a prefix for
destination partition directory name
* ... (if present in the location path string of source partition)
@@ -92,6 +98,12 @@ public class StageableTableMetadata {
*/
public static final String SOURCE_DATA_PATH_IDENTIFIER_KEY =
"source.dataPathIdentifier";
+ /**
+ * Attributes like "avro.schema.literal" are usually used in offline systems
as the source-of-truth of the schema.
+ * This configuration's value should be the key name under which users expect
the schema string to be preserved, if necessary.
+ */
+ public static final String SCHEMA_SOURCE_OF_TRUTH = "schema.original";
+
/** Table name of the destination table. */
private final String destinationTableName;
@@ -109,6 +121,7 @@ public class StageableTableMetadata {
private final Optional<Integer> numBuckets;
private final Properties hiveRuntimeProperties;
private final boolean evolutionEnabled;
+ private final boolean casePreserved;
private final Optional<Integer> rowLimit;
private final List<String> sourceDataPathIdentifier;
@@ -136,6 +149,7 @@ public class StageableTableMetadata {
this.hiveRuntimeProperties =
convertKeyValueListToProperties(ConfigUtils.getStringList(config,
HIVE_RUNTIME_PROPERTIES_LIST_KEY));
this.evolutionEnabled = ConfigUtils.getBoolean(config, EVOLUTION_ENABLED,
false);
+ this.casePreserved = ConfigUtils.getBoolean(config,
OUTPUT_FILE_CASE_PRESERVED, false);
this.rowLimit = Optional.fromNullable(ConfigUtils.getInt(config,
ROW_LIMIT_KEY, null));
this.sourceDataPathIdentifier = ConfigUtils.getStringList(config,
SOURCE_DATA_PATH_IDENTIFIER_KEY);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
index b4fe9d4..cecd035 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
@@ -34,7 +34,7 @@ public class TableLikeStageableTableMetadata extends
StageableTableMetadata {
public TableLikeStageableTableMetadata(Table referenceTable, String
destinationDB, String destinationTableName, String targetDataPath) {
super(destinationTableName, destinationTableName + "_STAGING",
destinationDB, targetDataPath,
- getTableProperties(referenceTable), new ArrayList<>(),
Optional.of(referenceTable.getNumBuckets()), new Properties(), false,
Optional.absent(),
+ getTableProperties(referenceTable), new ArrayList<>(),
Optional.of(referenceTable.getNumBuckets()), new Properties(), false, false,
Optional.absent(),
new ArrayList<>());
}
@@ -44,7 +44,7 @@ public class TableLikeStageableTableMetadata extends
StageableTableMetadata {
HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_DB_KEY),
referenceTable),
HiveDataset.resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY),
referenceTable),
getTableProperties(referenceTable), new ArrayList<>(),
Optional.of(referenceTable.getNumBuckets()),
- new Properties(), false, Optional.absent(), new ArrayList<>());
+ new Properties(), false, false, Optional.absent(), new ArrayList<>());
}
private static Properties getTableProperties(Table table) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index 8d593ed..9ebf59f 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -23,18 +23,18 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.stream.Collectors;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.configuration.State;
+import
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
import org.apache.gobblin.util.HiveAvroTypeConstants;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -53,8 +53,11 @@ import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.apache.gobblin.configuration.State;
-import
org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import static
org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata.SCHEMA_SOURCE_OF_TRUTH;
+import static org.apache.gobblin.util.AvroUtils.sanitizeSchemaString;
/***
@@ -126,6 +129,7 @@ public class HiveAvroORCQueryGenerator {
Optional<String> optionalOutputFormat,
Properties tableProperties,
boolean isEvolutionEnabled,
+ boolean casePreserved,
Optional<Table> destinationTableMeta,
Map<String, String> hiveColumns) {
@@ -160,21 +164,25 @@ public class HiveAvroORCQueryGenerator {
// (evolution does not matter if its new destination table)
// 4. If evolution is disabled, and destination table does exists
// .. use columns from destination schema
+ // Make sure the schema attribute is updated under the source-of-truth
attribute.
+ // Otherwise fall back to the default attribute-pair used in Hive for the
ORC format.
+ if (tableProperties.containsKey(SCHEMA_SOURCE_OF_TRUTH)) {
+
tableProperties.setProperty(tableProperties.getProperty(SCHEMA_SOURCE_OF_TRUTH),
sanitizeSchemaString(schema.toString()));
+ tableProperties.remove(SCHEMA_SOURCE_OF_TRUTH);
+ }
+
if (isEvolutionEnabled || !destinationTableMeta.isPresent()) {
log.info("Generating DDL using source schema");
ddl.append(generateAvroToHiveColumnMapping(schema,
Optional.of(hiveColumns), true, dbName + "." + tblName));
- try {
- AvroObjectInspectorGenerator objectInspectorGenerator = new
AvroObjectInspectorGenerator(schema);
- String columns =
Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
- String columnTypes = Joiner.on(",").join(
- objectInspectorGenerator.getColumnTypes().stream().map(x ->
x.getTypeName())
- .collect(Collectors.toList()));
- tableProperties.setProperty("columns", columns);
- tableProperties.setProperty("columns.types", columnTypes);
-
- } catch (Exception e) {
- log.error("Cannot generate add partition DDL due to ", e);
- throw new RuntimeException(e);
+ if (casePreserved) {
+ try {
+ Pair<String, String> orcSchemaProps =
HiveConverterUtils.getORCSchemaPropsFromAvroSchema(schema);
+ tableProperties.setProperty("columns", orcSchemaProps.getLeft());
+ tableProperties.setProperty("columns.types",
orcSchemaProps.getRight());
+ } catch (SerDeException e) {
+ log.error("Cannot generate add partition DDL due to ", e);
+ throw new RuntimeException(e);
+ }
}
} else {
log.info("Generating DDL using destination schema");
@@ -504,7 +512,7 @@ public class HiveAvroORCQueryGenerator {
public static String escapeHiveType(String type) {
TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(type);
- // Primitve
+ // Primitive
if (ObjectInspector.Category.PRIMITIVE.equals(typeInfo.getCategory())) {
return type;
}
@@ -779,7 +787,8 @@ public class HiveAvroORCQueryGenerator {
}
/***
- * Generate DDLs to evolve final destination table.
+ * Generate DDLs to evolve final destination table. This DDL should not only
contain schema evolution but also
+ * include table-property updates.
* @param stagingTableName Staging table.
* @param finalTableName Un-evolved final destination table.
* @param optionalStagingDbName Optional staging database name, defaults to
default.
@@ -797,7 +806,9 @@ public class HiveAvroORCQueryGenerator {
Schema evolvedSchema,
boolean isEvolutionEnabled,
Map<String, String> evolvedColumns,
- Optional<Table> destinationTableMeta) {
+ Optional<Table> destinationTableMeta,
+ Properties tableProperties
+ ) {
// If schema evolution is disabled, then do nothing OR
// If destination table does not exists, then do nothing
if (!isEvolutionEnabled || !destinationTableMeta.isPresent()) {
@@ -812,7 +823,7 @@ public class HiveAvroORCQueryGenerator {
// Evolve schema
Table destinationTable = destinationTableMeta.get();
if (destinationTable.getSd().getCols().size() == 0) {
- log.warn("Desination Table: " + destinationTable + " does not has column
details in StorageDescriptor. "
+ log.warn("Destination Table: " + destinationTable + " does not has
column details in StorageDescriptor. "
+ "It is probably of Avro type. Cannot evolve via traditional HQL,
so skipping evolution checks.");
return ddl;
}
@@ -855,6 +866,13 @@ public class HiveAvroORCQueryGenerator {
}
}
+ // Updating table properties.
+ ddl.add(String.format("USE %s%n", finalDbName));
+ for (String property :tableProperties.stringPropertyNames()) {
+ ddl.add(String.format("ALTER TABLE `%s` SET TBLPROPERTIES ('%s'='%s')",
finalTableName,
+ property, tableProperties.getProperty(property)));
+ }
+
return ddl;
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index ad01b35..8005ee4 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -40,6 +40,7 @@ import
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.thrift.TException;
@@ -164,13 +165,9 @@ public class HiveConverterUtils {
String dbName = optionalDbName.isPresent() ? optionalDbName.get() :
"default";
try {
- AvroObjectInspectorGenerator objectInspectorGenerator = new
AvroObjectInspectorGenerator(schema);
- String columns =
Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
- String columnTypes = Joiner.on(",").join(
- objectInspectorGenerator.getColumnTypes().stream().map(x ->
x.getTypeName())
- .collect(Collectors.toList()));
- String dml = String.format("ALTER TABLE `%s`.`%s` SET TBLPROPERTIES
('columns'='%s', 'columns.types'='%s')", dbName, tableName,
- columns, columnTypes);
+ Pair<String, String> orcSchemaProps =
getORCSchemaPropsFromAvroSchema(schema);
+ String dml = String.format("ALTER TABLE `%s`.`%s` SET TBLPROPERTIES
('columns'='%s', 'columns.types'='%s')", dbName, tableName,
+ orcSchemaProps.getLeft(), orcSchemaProps.getRight());
return dml;
} catch (Exception e) {
log.error("Cannot generate add partition DDL due to ", e);
@@ -178,6 +175,15 @@ public class HiveConverterUtils {
}
}
+ public static Pair<String, String> getORCSchemaPropsFromAvroSchema(Schema
avroSchema) throws SerDeException {
+ AvroObjectInspectorGenerator objectInspectorGenerator = new
AvroObjectInspectorGenerator(avroSchema);
+ String columns =
Joiner.on(",").join(objectInspectorGenerator.getColumnNames());
+ String columnTypes = Joiner.on(",").join(
+ objectInspectorGenerator.getColumnTypes().stream().map(x ->
x.getTypeName())
+ .collect(Collectors.toList()));
+ return new ImmutablePair<>(columns, columnTypes);
+ }
+
/**
* Generates a CTAS statement to dump the contents of a table / partition
into a new table.
* @param outputDbAndTable output db and table where contents should be
written.
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
index 4432b54..2ea5185 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveSchemaEvolutionTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@@ -76,7 +77,7 @@ public class HiveSchemaEvolutionTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(ddl,
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -101,7 +102,7 @@ public class HiveSchemaEvolutionTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(ddl,
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -121,6 +122,7 @@ public class HiveSchemaEvolutionTest {
@Test
public void testEvolutionDisabledForExistingTable() throws IOException {
boolean isEvolutionEnabled = false;
+ boolean casePreserved = true;
Optional<Table> destinationTableMeta =
createEvolvedDestinationTable(schemaName, "default", "", true);
String ddl = HiveAvroORCQueryGenerator
@@ -128,7 +130,7 @@ public class HiveSchemaEvolutionTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, casePreserved, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(ddl,
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -155,7 +157,7 @@ public class HiveSchemaEvolutionTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(ddl,
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -182,7 +184,7 @@ public class HiveSchemaEvolutionTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(ddl,
ConversionHiveTestUtils.readQueryFromFile(resourceDir,
@@ -212,13 +214,15 @@ public class HiveSchemaEvolutionTest {
Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta, hiveColumns);
+ null, isEvolutionEnabled, true, destinationTableMeta, hiveColumns);
// Destination table exists
+ Properties tableProperties = new Properties();
+ tableProperties.setProperty("random", "value");
List<String> generateEvolutionDDL = HiveAvroORCQueryGenerator
.generateEvolutionDDL(orcStagingTableName, orcTableName,
Optional.of(hiveDbName), Optional.of(hiveDbName),
- outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta);
- Assert.assertEquals(generateEvolutionDDL.size(), 2);
+ outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta, tableProperties);
+ Assert.assertEquals(generateEvolutionDDL.size(), 4);
Assert.assertEquals(generateEvolutionDDL.get(1),
"ALTER TABLE `sourceSchema` ADD COLUMNS
(`parentFieldRecord__nestedFieldInt` int "
+ "COMMENT 'from flatten_source
parentFieldRecord.nestedFieldInt')",
@@ -228,7 +232,7 @@ public class HiveSchemaEvolutionTest {
destinationTableMeta = Optional.absent();
generateEvolutionDDL = HiveAvroORCQueryGenerator
.generateEvolutionDDL(orcStagingTableName, orcTableName,
Optional.of(hiveDbName), Optional.of(hiveDbName),
- outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta);
+ outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta, tableProperties);
// No DDL should be generated, because create table will take care of
destination table
Assert.assertEquals(generateEvolutionDDL.size(), 0,
"Generated evolution DDL did not match for evolution enabled");
@@ -247,12 +251,14 @@ public class HiveSchemaEvolutionTest {
Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta, hiveColumns);
+ null, isEvolutionEnabled, true, destinationTableMeta, hiveColumns);
// Destination table exists
+ Properties tableProperties = new Properties();
+ tableProperties.setProperty("random", "value");
List<String> generateEvolutionDDL = HiveAvroORCQueryGenerator
.generateEvolutionDDL(orcStagingTableName, orcTableName,
Optional.of(hiveDbName), Optional.of(hiveDbName),
- outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta);
+ outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta, tableProperties);
// No DDL should be generated, because select based on destination table
will selectively project columns
Assert.assertEquals(generateEvolutionDDL.size(), 0,
"Generated evolution DDL did not match for evolution disabled");
@@ -261,7 +267,7 @@ public class HiveSchemaEvolutionTest {
destinationTableMeta = Optional.absent();
generateEvolutionDDL = HiveAvroORCQueryGenerator
.generateEvolutionDDL(orcStagingTableName, orcTableName,
Optional.of(hiveDbName), Optional.of(hiveDbName),
- outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta);
+ outputSchema, isEvolutionEnabled, hiveColumns,
destinationTableMeta, tableProperties);
// No DDL should be generated, because create table will take care of
destination table
Assert.assertEquals(generateEvolutionDDL.size(), 0,
"Generated evolution DDL did not match for evolution disabled");
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
index a2a3dbe..bb8592d 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
@@ -19,12 +19,17 @@ package
org.apache.gobblin.data.management.conversion.hive.task;
import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.testng.Assert;
import org.testng.collections.Maps;
import com.google.common.base.Optional;
+import static
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils.getORCSchemaPropsFromAvroSchema;
+
+
public class HiveConverterUtilsTest {
private final String inputDbName = "testdb";
private final String inputTableName = "testtable";
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
index 410c3bd..fadff0f 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
@@ -60,7 +60,7 @@ public class HiveAvroORCQueryGeneratorTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(q,
@@ -82,7 +82,7 @@ public class HiveAvroORCQueryGeneratorTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(q,
@@ -104,7 +104,7 @@ public class HiveAvroORCQueryGeneratorTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(q.trim(),
@@ -126,7 +126,7 @@ public class HiveAvroORCQueryGeneratorTest {
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(),
- null, isEvolutionEnabled, destinationTableMeta,
+ null, isEvolutionEnabled, true, destinationTableMeta,
new HashMap<String, String>());
Assert.assertEquals(q.trim(),
@@ -149,7 +149,7 @@ public class HiveAvroORCQueryGeneratorTest {
.generateCreateTableDDL(flattenedSchema, schemaName,
"file:/user/hive/warehouse/" + schemaName,
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
- Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(), null, isEvolutionEnabled,
+ Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(), null, isEvolutionEnabled, true,
destinationTableMeta, new HashMap<String, String>());
Assert.assertEquals(q,
@@ -196,7 +196,7 @@ public class HiveAvroORCQueryGeneratorTest {
.generateCreateTableDDL(flattenedSchema, schemaName,
"file:/user/hive/warehouse/" + schemaName,
Optional.<String>absent(), Optional.of(partitionDDLInfo),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
- Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(), null, isEvolutionEnabled,
+ Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(), null, isEvolutionEnabled, true,
destinationTableMeta, new HashMap<String, String>());
Assert.assertEquals(q,
@@ -239,7 +239,7 @@ public class HiveAvroORCQueryGeneratorTest {
.generateCreateTableDDL(schema, schemaName,
"file:/user/hive/warehouse/" + schemaName,
Optional.<String>absent(), Optional.<Map<String, String>>absent(),
Optional.<List<String>>absent(),
Optional.<Map<String,
HiveAvroORCQueryGenerator.COLUMN_SORT_ORDER>>absent(),
Optional.<Integer>absent(),
- Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(), null, isEvolutionEnabled,
+ Optional.<String>absent(), Optional.<String>absent(),
Optional.<String>absent(), null, isEvolutionEnabled, true,
destinationTableMeta, new HashMap<String, String>());
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index fa1fa3b..fceaaa6 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -856,6 +856,15 @@ public class AvroUtils {
}
/**
+ * Escaping ";" and "'" character in the schema string when it is being used
in DDL.
+ * These characters are not allowed to show as part of column name but could
possibly appear in documentation field.
+ * Therefore the escaping behavior won't cause correctness issues.
+ */
+ public static String sanitizeSchemaString(String schemaString) {
+ return schemaString.replaceAll(";", "\\\\;").replaceAll("'", "\\\\'");
+ }
+
+ /**
* Deserialize a {@link GenericRecord} from a byte array. This method is not
intended for high performance.
*/
public static GenericRecord slowDeserializeGenericRecord(byte[]
serializedRecord, Schema schema) throws IOException {
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index a2e205b..e7a8c35 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -264,6 +264,16 @@ public class AvroUtilsTest {
Assert.assertEquals(AvroUtils.serializeAsPath(partition, false, false),
new Path("a/b_c_d_e/title"));
}
+ @Test
+ public void testStringEscaping() {
+ String invalidString = "foo;foo'bar";
+ String expectedString = "foo\\;foo\\'bar";
+ String actualString = AvroUtils.sanitizeSchemaString(invalidString);
+ Assert.assertEquals(actualString, expectedString);
+ // Verify that there's only one slash being added.
+ Assert.assertEquals(actualString.length(), invalidString.length() + 2);
+ }
+
public static List<GenericRecord> getRecordFromFile(String path)
throws IOException {
Configuration config = new Configuration();