This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 48d2eb5  [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
48d2eb5 is described below

commit 48d2eb56e0a6b1b570c7cf67489b566976574a66
Author: Zihan Li <[email protected]>
AuthorDate: Fri Nov 22 13:51:55 2019 -0800

    [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
    
    Closes #2822 from ZihanLi58/GOBBLIN-975
---
 .../conversion/hive/source/HiveSource.java         | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index fe4f94f..3ff8096 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -106,6 +106,8 @@ import 
org.apache.gobblin.data.management.conversion.hive.extractor.HiveConvertE
 @Alpha
 public class HiveSource implements Source {
 
+  public static final String DISABLE_AVRO_CHAECK = 
"hive.source.disable.avro.check";
+  public static final boolean DEFAULT_DISABLE_AVRO_CHAECK = false;
   public static final String HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY = 
"hive.source.maximum.lookbackDays";
   public static final int DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS = 3;
 
@@ -163,7 +165,7 @@ public class HiveSource implements Source {
 
       EventSubmitter.submit(Optional.of(this.eventSubmitter), 
EventConstants.CONVERSION_FIND_HIVE_TABLES_EVENT);
       Iterator<HiveDataset> iterator = 
this.datasetFinder.getDatasetsIterator();
-
+      boolean disableAvroCheck = state.getPropAsBoolean(DISABLE_AVRO_CHAECK, 
DEFAULT_DISABLE_AVRO_CHAECK);
       while (iterator.hasNext()) {
         HiveDataset hiveDataset = iterator.next();
         try (AutoReturnableObject<IMetaStoreClient> client = 
hiveDataset.getClientPool().getClient()) {
@@ -174,9 +176,9 @@ public class HiveSource implements Source {
           if (hiveDataset.getTable().isPartitioned()
               && 
state.getPropAsBoolean(HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS,
               DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS)) {
-            createWorkunitsForPartitionedTable(hiveDataset, client);
+            createWorkunitsForPartitionedTable(hiveDataset, client, 
disableAvroCheck);
           } else {
-            createWorkunitForNonPartitionedTable(hiveDataset);
+            createWorkunitForNonPartitionedTable(hiveDataset, 
disableAvroCheck);
           }
         }
       }
@@ -221,7 +223,7 @@ public class HiveSource implements Source {
   }
 
 
-  protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset) 
throws IOException {
+  protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset, 
boolean disableAvroCheck) throws IOException {
     // Create workunits for tables
     try {
 
@@ -246,7 +248,7 @@ public class HiveSource implements Source {
             "Creating workunit for table %s as updateTime %s or createTime %s 
is greater than low watermark %s",
             hiveDataset.getTable().getCompleteName(), updateTime, 
hiveDataset.getTable().getTTable().getCreateTime(),
             lowWatermark.getValue()));
-        HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset);
+        HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset, 
disableAvroCheck);
 
         LongWatermark expectedDatasetHighWatermark =
             this.watermarker.getExpectedHighWatermark(hiveDataset.getTable(), 
tableProcessTime);
@@ -274,15 +276,15 @@ public class HiveSource implements Source {
     }
   }
 
-  protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws 
IOException {
+  protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset, boolean 
disableAvroCheck) throws IOException {
     HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
-    if (isAvro(hiveDataset.getTable())) {
+    if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
       
hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
     }
     return hiveWorkUnit;
   }
 
-  protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, 
AutoReturnableObject<IMetaStoreClient> client) throws IOException {
+  protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, 
AutoReturnableObject<IMetaStoreClient> client, boolean disableAvroCheck) throws 
IOException {
 
     long tableProcessTime = new DateTime().getMillis();
     this.watermarker.onTableProcessBegin(hiveDataset.getTable(), 
tableProcessTime);
@@ -326,7 +328,7 @@ public class HiveSource implements Source {
           LongWatermark expectedPartitionHighWatermark = 
this.watermarker.getExpectedHighWatermark(sourcePartition,
               tableProcessTime, partitionProcessTime);
 
-          HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, 
sourcePartition);
+          HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, 
sourcePartition, disableAvroCheck);
           hiveWorkUnit.setWatermarkInterval(new 
WatermarkInterval(lowWatermark, expectedPartitionHighWatermark));
 
           EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, 
hiveDataset.getTable(), sourcePartition, updateTime,
@@ -354,9 +356,9 @@ public class HiveSource implements Source {
     }
   }
 
-  protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, 
Partition partition) throws IOException {
+  protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, 
Partition partition, boolean disableAvroCheck) throws IOException {
     HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
-    if (isAvro(hiveDataset.getTable())) {
+    if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
       
hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
       
hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
     }
@@ -410,7 +412,7 @@ public class HiveSource implements Source {
    */
   @VisibleForTesting
   public boolean isOlderThanLookback(Partition partition) {
-     return new 
DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
+    return new 
DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
   }
 
   @VisibleForTesting

Reply via email to