[
https://issues.apache.org/jira/browse/GOBBLIN-772?focusedWorklogId=244125&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-244125
]
ASF GitHub Bot logged work on GOBBLIN-772:
------------------------------------------
Author: ASF GitHub Bot
Created on: 17/May/19 16:30
Start Date: 17/May/19 16:30
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2637:
[GOBBLIN-772] Implement Schema Comparison Strategy during Distcp
URL: https://github.com/apache/incubator-gobblin/pull/2637#discussion_r285198718
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
##########
@@ -39,32 +42,109 @@
* check if the schema matches the expected schema. If not it will abort the
job.
*/
-public class FileAwareInputStreamExtractorWithCheckSchema extends
FileAwareInputStreamExtractor{
+public class FileAwareInputStreamExtractorWithCheckSchema extends
FileAwareInputStreamExtractor {
- public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs,
CopyableFile file, WorkUnitState state)
- {
+ public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs,
CopyableFile file, WorkUnitState state) {
super(fs, file, state);
}
- public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs,
CopyableFile file)
- {
+
+ public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs,
CopyableFile file) {
this(fs, file, null);
}
@Override
- protected FileAwareInputStream buildStream(FileSystem fsFromFile)
- throws DataRecordException, IOException{
- if(!schemaChecking(fsFromFile))
- {
+ protected FileAwareInputStream buildStream(FileSystem fsFromFile) throws
DataRecordException, IOException {
+ if (!schemaChecking(fsFromFile)) {
throw new DataRecordException("Schema does not match the expected
schema");
}
return super.buildStream(fsFromFile);
}
- protected boolean schemaChecking(FileSystem fsFromFile)
- throws IOException {
+ protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
- DataFileReader<GenericRecord> dataFileReader = new DataFileReader(new
FsInput(this.file.getFileStatus().getPath(),fsFromFile), datumReader);
+ DataFileReader<GenericRecord> dataFileReader =
+ new DataFileReader(new FsInput(this.file.getFileStatus().getPath(),
fsFromFile), datumReader);
Schema schema = dataFileReader.getSchema();
- return
schema.toString().equals(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+ Schema expectedSchema = new
Schema.Parser().parse(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+
+ return compare(schema, expectedSchema);
+ }
+
+ private boolean compare(Schema toValidate, Schema expected) {
+ if (toValidate.getType() != expected.getType() ||
!toValidate.getName().equals(expected.getName())) {return false;}
+ else {
+ switch (toValidate.getType()) {
+ case NULL:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BYTES:
+ case STRING: {
+ return true;
+ }
+ case ARRAY: {
+ return compare(toValidate.getElementType(),
expected.getElementType());
+ }
+ case MAP: {
+ return compare(toValidate.getValueType(), expected.getValueType());
+ }
+ case FIXED: {
+ // fixed size and name must match:
+ if (toValidate.getFixedSize() != expected.getFixedSize()) {
+ return false;
+ }
+ }
+ case ENUM: {
+ // expected symbols must contain all toValidate symbols:
+ final Set<String> expectedSymbols = new
HashSet<String>(expected.getEnumSymbols());
+ final Set<String> toValidateSymbols = new
HashSet<String>(toValidate.getEnumSymbols());
+ if (expectedSymbols.size() != toValidateSymbols.size()) {
+ return false;
+ }
+ if (!expectedSymbols.containsAll(toValidateSymbols)) {
+ return false;
+ }
+ }
+
+ case RECORD: {
+ // Check that each field of toValidate schema is in expected schema
+ if(toValidate.getFields().size() != expected.getFields().size())
{return false;}
+ for (final Schema.Field expectedFiled : expected.getFields()) {
+ final Schema.Field toValidateField =
toValidate.getField(expectedFiled.name());
+ if (toValidateField == null) {
+ // expected field does not correspond to any field in the
toValidate record schema
+ return false;
+ } else {
+ if (!compare(toValidateField.schema(), expectedFiled.schema())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ case UNION: {
Review comment:
Just want to confirm: do you care about the ordering of fields?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 244125)
Time Spent: 40m (was: 0.5h)
> Implement Schema Comparison Strategy during Distcp
> --------------------------------------------------
>
> Key: GOBBLIN-772
> URL: https://issues.apache.org/jira/browse/GOBBLIN-772
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> We need a schema comparison strategy to make sure the real schema and the
> expected schema have matching field names and types.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)