This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f5db96b [GOBBLIN-899] Add config in replication config to determine
whether the schema check is enabled
f5db96b is described below
commit f5db96b75b8298105a25e9e84c6d2aa196aed2e4
Author: Zihan Li <[email protected]>
AuthorDate: Fri Oct 11 13:45:40 2019 -0700
[GOBBLIN-899] Add config in replication config to determine whether the
schema check is enabled
Closes #2753 from ZihanLi58/GOBBLIN-899
---
.../apache/gobblin/data/management/copy/CopySource.java | 9 +++++++--
.../FileAwareInputStreamExtractorWithCheckSchema.java | 10 ++++++++--
.../management/copy/replication/ConfigBasedDataset.java | 2 ++
.../copy/replication/ReplicationConfiguration.java | 16 ++++++++++++++++
.../runtime/embedded/EmbeddedGobblinDistcpTest.java | 2 ++
5 files changed, 35 insertions(+), 4 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 3d1f2c8..e7d2fe9 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -129,6 +129,8 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
public static final String FILESET_NAME = "fileset.name";
public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
public static final String FILESET_TOTAL_SIZE_IN_BYTES =
"fileset.total.size";
+ public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
+ public final static boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX
+ ".workUnitWeight";
private final WorkUnitWeighter weighter = new
FieldWeighter(WORK_UNIT_WEIGHT);
@@ -358,8 +360,11 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
WorkUnit workUnit = new WorkUnit(extract);
workUnit.addAll(this.state);
- if(this.copyableDataset instanceof ConfigBasedDataset &&
((ConfigBasedDataset)this.copyableDataset).getExpectedSchema() != null) {
- workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset)this.copyableDataset).getExpectedSchema());
+ if(this.copyableDataset instanceof ConfigBasedDataset &&
((ConfigBasedDataset)this.copyableDataset).schemaCheckEnabled()) {
+ workUnit.setProp(SCHEMA_CHECK_ENABLED, true);
+ if (((ConfigBasedDataset)
this.copyableDataset).getExpectedSchema() != null) {
+ workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
+ }
}
serializeCopyEntity(workUnit, copyEntity);
serializeCopyableDataset(workUnit, metadata);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
index eb4f627..a5c7c8d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.util.schema_check.AvroSchemaCheckStrategy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -66,14 +67,19 @@ public class FileAwareInputStreamExtractorWithCheckSchema
extends FileAwareInput
* @throws IOException
*/
protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
+ if( !this.state.getPropAsBoolean(CopySource.SCHEMA_CHECK_ENABLED,
CopySource.DEFAULT_SCHEMA_CHECK_ENABLED) ) {
+ return true;
+ }
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
DataFileReader<GenericRecord> dataFileReader =
new DataFileReader(new FsInput(this.file.getFileStatus().getPath(),
new Configuration()), datumReader);
Schema schema = dataFileReader.getSchema();
+ if(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA) == null) {
+ throw new IOException("Expected schema is not set properly");
+ }
Schema expectedSchema = new
Schema.Parser().parse(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
AvroSchemaCheckStrategy strategy =
AvroSchemaCheckStrategy.AvroSchemaCheckStrategyFactory.create(this.state);
- if(strategy == null)
- {
+ if(strategy == null) {
throw new IOException("schema check strategy cannot be initialized");
}
return strategy.compare(expectedSchema,schema);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 4d99e3a..57f952f 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -164,6 +164,8 @@ public class ConfigBasedDataset implements CopyableDataset {
}
}
+ public boolean schemaCheckEnabled() { return this.rc.isSchemaCheckEnabled();
}
+
@Override
public String datasetURN() {
return this.datasetURN;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index be07d50..2d7abc1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -32,6 +32,7 @@ import com.typesafe.config.Config;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
import lombok.Getter;
@@ -71,6 +72,10 @@ public class ReplicationConfiguration {
public static final String REPLICATION_DATA_CATETORY_TYPE =
"replicationDataCategoryType";
public static final String REPLICATION_DATA_FINITE_INSTANCE =
"replicationDataFiniteInstance";
+ public static final String COPY_SCHEMA_CHECK_ENABLED =
"gobblin.copy.schemaCheck.enabled";
+
+ public static final boolean DEFAULT_COPY_SCHEMA_CHECK_ENABLED = true;
+
//copy route generator
public static final String DELETE_TARGET_IFNOT_ON_SOURCE = "deleteTarget";
@@ -98,6 +103,9 @@ public class ReplicationConfiguration {
private final ReplicationCopyMode copyMode;
@Getter
+ private final boolean schemaCheckEnabled;
+
+ @Getter
private final Config selectionConfig;
@Getter
@@ -142,6 +150,7 @@ public class ReplicationConfiguration {
.withDeleteTarget(config)
.withVersionStrategyFromConfigStore(config)
.withEnforceFileSizeMatchFromConfigStore(config)
+ .withSchemaCheckEnabled(config)
.build();
}
@@ -156,9 +165,11 @@ public class ReplicationConfiguration {
this.deleteTargetIfNotExistOnSource =
builder.deleteTargetIfNotExistOnSource;
this.versionStrategyFromConfigStore =
builder.versionStrategyFromConfigStore;
this.enforceFileSizeMatchFromConfigStore =
builder.enforceFileMatchFromConfigStore;
+ this.schemaCheckEnabled = builder.schemaCheckEnabled;
}
private static class Builder {
+ private boolean schemaCheckEnabled;
private ReplicationMetaData metaData;
@@ -193,6 +204,11 @@ public class ReplicationConfiguration {
return this;
}
+ public Builder withSchemaCheckEnabled(Config config) {
+ this.schemaCheckEnabled = ConfigUtils.getBoolean(config,
COPY_SCHEMA_CHECK_ENABLED, DEFAULT_COPY_SCHEMA_CHECK_ENABLED);
+ return this;
+ }
+
public Builder withVersionStrategyFromConfigStore(Config config) {
this.versionStrategyFromConfigStore =
config.hasPath(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY)?
Optional.of(config.getString(DataFileVersionStrategy.DATA_FILE_VERSION_STRATEGY_KEY))
:
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 60d93a5..18da5e8 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.util.PathUtils;
@@ -115,6 +116,7 @@ public class EmbeddedGobblinDistcpTest {
EmbeddedGobblinDistcp embedded = new EmbeddedGobblinDistcp(new
Path(tmpSource.getAbsolutePath()),
new Path(tmpTarget.getAbsolutePath()));
+ embedded.setConfiguration(CopySource.SCHEMA_CHECK_ENABLED, "true");
embedded.setLaunchTimeout(30, TimeUnit.SECONDS);
embedded.setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY,
SchemaCheckedCopySource.class.getName());
embedded.setConfiguration(ConfigurationKeys.AVRO_SCHEMA_CHECK_STRATEGY,
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy");