This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 48d2eb5 [GOBBLIN-975] Add flag to enable/disable Avro
type check in AvroToOrc
48d2eb5 is described below
commit 48d2eb56e0a6b1b570c7cf67489b566976574a66
Author: Zihan Li <[email protected]>
AuthorDate: Fri Nov 22 13:51:55 2019 -0800
[GOBBLIN-975] Add flag to enable/disable Avro type check in
AvroToOrc
Closes #2822 from ZihanLi58/GOBBLIN-975
---
.../conversion/hive/source/HiveSource.java | 26 ++++++++++++----------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index fe4f94f..3ff8096 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -106,6 +106,8 @@ import
org.apache.gobblin.data.management.conversion.hive.extractor.HiveConvertE
@Alpha
public class HiveSource implements Source {
+ public static final String DISABLE_AVRO_CHECK =
"hive.source.disable.avro.check";
+ public static final boolean DEFAULT_DISABLE_AVRO_CHECK = false;
public static final String HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY =
"hive.source.maximum.lookbackDays";
public static final int DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS = 3;
@@ -163,7 +165,7 @@ public class HiveSource implements Source {
EventSubmitter.submit(Optional.of(this.eventSubmitter),
EventConstants.CONVERSION_FIND_HIVE_TABLES_EVENT);
Iterator<HiveDataset> iterator =
this.datasetFinder.getDatasetsIterator();
-
+ boolean disableAvroCheck = state.getPropAsBoolean(DISABLE_AVRO_CHECK,
DEFAULT_DISABLE_AVRO_CHECK);
while (iterator.hasNext()) {
HiveDataset hiveDataset = iterator.next();
try (AutoReturnableObject<IMetaStoreClient> client =
hiveDataset.getClientPool().getClient()) {
@@ -174,9 +176,9 @@ public class HiveSource implements Source {
if (hiveDataset.getTable().isPartitioned()
&&
state.getPropAsBoolean(HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS,
DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS)) {
- createWorkunitsForPartitionedTable(hiveDataset, client);
+ createWorkunitsForPartitionedTable(hiveDataset, client,
disableAvroCheck);
} else {
- createWorkunitForNonPartitionedTable(hiveDataset);
+ createWorkunitForNonPartitionedTable(hiveDataset,
disableAvroCheck);
}
}
}
@@ -221,7 +223,7 @@ public class HiveSource implements Source {
}
- protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset)
throws IOException {
+ protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset,
boolean disableAvroCheck) throws IOException {
// Create workunits for tables
try {
@@ -246,7 +248,7 @@ public class HiveSource implements Source {
"Creating workunit for table %s as updateTime %s or createTime %s
is greater than low watermark %s",
hiveDataset.getTable().getCompleteName(), updateTime,
hiveDataset.getTable().getTTable().getCreateTime(),
lowWatermark.getValue()));
- HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset);
+ HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset,
disableAvroCheck);
LongWatermark expectedDatasetHighWatermark =
this.watermarker.getExpectedHighWatermark(hiveDataset.getTable(),
tableProcessTime);
@@ -274,15 +276,15 @@ public class HiveSource implements Source {
}
}
- protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws
IOException {
+ protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset, boolean
disableAvroCheck) throws IOException {
HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
- if (isAvro(hiveDataset.getTable())) {
+ if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
}
return hiveWorkUnit;
}
- protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset,
AutoReturnableObject<IMetaStoreClient> client) throws IOException {
+ protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset,
AutoReturnableObject<IMetaStoreClient> client, boolean disableAvroCheck) throws
IOException {
long tableProcessTime = new DateTime().getMillis();
this.watermarker.onTableProcessBegin(hiveDataset.getTable(),
tableProcessTime);
@@ -326,7 +328,7 @@ public class HiveSource implements Source {
LongWatermark expectedPartitionHighWatermark =
this.watermarker.getExpectedHighWatermark(sourcePartition,
tableProcessTime, partitionProcessTime);
- HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset,
sourcePartition);
+ HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset,
sourcePartition, disableAvroCheck);
hiveWorkUnit.setWatermarkInterval(new
WatermarkInterval(lowWatermark, expectedPartitionHighWatermark));
EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit,
hiveDataset.getTable(), sourcePartition, updateTime,
@@ -354,9 +356,9 @@ public class HiveSource implements Source {
}
}
- protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset,
Partition partition) throws IOException {
+ protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset,
Partition partition, boolean disableAvroCheck) throws IOException {
HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
- if (isAvro(hiveDataset.getTable())) {
+ if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
}
@@ -410,7 +412,7 @@ public class HiveSource implements Source {
*/
@VisibleForTesting
public boolean isOlderThanLookback(Partition partition) {
- return new
DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
+ return new
DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
}
@VisibleForTesting