This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f5db96b  [GOBBLIN-899] Add config in replication config to determine whether schema check ena…
f5db96b is described below

commit f5db96b75b8298105a25e9e84c6d2aa196aed2e4
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 11 13:45:40 2019 -0700

    [GOBBLIN-899] Add config in replication config to determine whether schema check ena…
    
    Closes #2753 from ZihanLi58/GOBBLIN-899
---
 .../apache/gobblin/data/management/copy/CopySource.java  |  9 +++++++--
 .../FileAwareInputStreamExtractorWithCheckSchema.java    | 10 ++++++++--
 .../management/copy/replication/ConfigBasedDataset.java  |  2 ++
 .../copy/replication/ReplicationConfiguration.java       | 16 ++++++++++++++++
 .../runtime/embedded/EmbeddedGobblinDistcpTest.java      |  2 ++
 5 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 3d1f2c8..e7d2fe9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -129,6 +129,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
   public static final String FILESET_NAME = "fileset.name";
   public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
   public static final String FILESET_TOTAL_SIZE_IN_BYTES = "fileset.total.size";
+  public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
+  public final static boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
 
   private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX + ".workUnitWeight";
   private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
@@ -358,8 +360,11 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
 
           WorkUnit workUnit = new WorkUnit(extract);
           workUnit.addAll(this.state);
-          if(this.copyableDataset instanceof ConfigBasedDataset && ((ConfigBasedDataset)this.copyableDataset).getExpectedSchema() != null) {
-            workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, ((ConfigBasedDataset)this.copyableDataset).getExpectedSchema());
+          if(this.copyableDataset instanceof ConfigBasedDataset && ((ConfigBasedDataset)this.copyableDataset).schemaCheckEnabled()) {
+            workUnit.setProp(SCHEMA_CHECK_ENABLED, true);
+            if (((ConfigBasedDataset) this.copyableDataset).getExpectedSchema() != null) {
+              workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, ((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
+            }
           }
           serializeCopyEntity(workUnit, copyEntity);
           serializeCopyableDataset(workUnit, metadata);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
index eb4f627..a5c7c8d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.data.management.copy.CopySource;
 import org.apache.gobblin.util.schema_check.AvroSchemaCheckStrategy;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -66,14 +67,19 @@ public class FileAwareInputStreamExtractorWithCheckSchema extends FileAwareInput
    * @throws IOException
    */
   protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
+    if( !this.state.getPropAsBoolean(CopySource.SCHEMA_CHECK_ENABLED, CopySource.DEFAULT_SCHEMA_CHECK_ENABLED) ) {
+      return true;
+    }
     DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
     DataFileReader<GenericRecord> dataFileReader =
         new DataFileReader(new FsInput(this.file.getFileStatus().getPath(), new Configuration()), datumReader);
     Schema schema = dataFileReader.getSchema();
+    if(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA) == null) {
+      throw new IOException("Expected schema is not set properly");
+    }
     Schema expectedSchema = new Schema.Parser().parse(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
     AvroSchemaCheckStrategy strategy = AvroSchemaCheckStrategy.AvroSchemaCheckStrategyFactory.create(this.state);
-    if(strategy == null)
-    {
+    if(strategy == null) {
       throw new IOException("schema check strategy cannot be initialized");
     }
     return strategy.compare(expectedSchema,schema);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 4d99e3a..57f952f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -164,6 +164,8 @@ public class ConfigBasedDataset implements CopyableDataset {
     }
   }
 
+  public boolean schemaCheckEnabled() { return this.rc.isSchemaCheckEnabled(); }
+
   @Override
   public String datasetURN() {
     return this.datasetURN;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index be07d50..2d7abc1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -32,6 +32,7 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 
 import lombok.Getter;
@@ -71,6 +72,10 @@ public class ReplicationConfiguration {
   public static final String REPLICATION_DATA_CATETORY_TYPE = "replicationDataCategoryType";
   public static final String REPLICATION_DATA_FINITE_INSTANCE = "replicationDataFiniteInstance";
 
+  public static final String COPY_SCHEMA_CHECK_ENABLED = "gobblin.copy.schemaCheck.enabled";
+
+  public static final boolean DEFAULT_COPY_SCHEMA_CHECK_ENABLED = true;
+
   //copy route generator
   public static final String DELETE_TARGET_IFNOT_ON_SOURCE = "deleteTarget";
 
@@ -98,6 +103,9 @@ public class ReplicationConfiguration {
   private final ReplicationCopyMode copyMode;
 
   @Getter
+  private final boolean schemaCheckEnabled;
+
+  @Getter
   private final Config selectionConfig;
 
   @Getter
@@ -142,6 +150,7 @@ public class ReplicationConfiguration {
         .withDeleteTarget(config)
         .withVersionStrategyFromConfigStore(config)
         .withEnforceFileSizeMatchFromConfigStore(config)
+        .withSchemaCheckEnabled(config)
         .build();
   }
 
@@ -156,9 +165,11 @@ public class ReplicationConfiguration {
     this.deleteTargetIfNotExistOnSource = builder.deleteTargetIfNotExistOnSource;
     this.versionStrategyFromConfigStore = builder.versionStrategyFromConfigStore;
     this.enforceFileSizeMatchFromConfigStore = builder.enforceFileMatchFromConfigStore;
+    this.schemaCheckEnabled = builder.schemaCheckEnabled;
   }
 
   private static class Builder {
+    private boolean schemaCheckEnabled;
 
     private ReplicationMetaData metaData;
 
@@ -193,6 +204,11 @@ public class ReplicationConfiguration {
       return this;
     }
 
+    public Builder withSchemaCheckEnabled(Config config) {
+      this.schemaCheckEnabled = ConfigUtils.getBoolean(config, COPY_SCHEMA_CHECK_ENABLED, DEFAULT_COPY_SCHEMA_CHECK_ENABLED);
+      return this;
+    }
+
     public Builder withVersionStrategyFromConfigStore(Config config) {
       this.versionStrategyFromConfigStore = config.hasPath(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY)?
           Optional.of(config.getString(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY)) :
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 60d93a5..18da5e8 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopySource;
 import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.util.PathUtils;
@@ -115,6 +116,7 @@ public class EmbeddedGobblinDistcpTest {
 
     EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new Path(tmpSource.getAbsolutePath()),
         new Path(tmpTarget.getAbsolutePath()));
+    embedded.setConfiguration(CopySource.SCHEMA_CHECK_ENABLED, "true");
     embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
     embedded.setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, SchemaCheckedCopySource.class.getName());
     embedded.setConfiguration(ConfigurationKeys.AVRO_SCHEMA_CHECK_STRATEGY, "org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy");

Reply via email to