autumnust commented on a change in pull request #2637: [GOBBLIN-772]Implement 
Schema Comparison Strategy during Distcp
URL: https://github.com/apache/incubator-gobblin/pull/2637#discussion_r285199219
 
 

 ##########
 File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
 ##########
 @@ -39,32 +42,109 @@
  * check if the schema matches the expected schema. If not it will abort the 
job.
  */
 
-public class FileAwareInputStreamExtractorWithCheckSchema extends 
FileAwareInputStreamExtractor{
+public class FileAwareInputStreamExtractorWithCheckSchema extends 
FileAwareInputStreamExtractor {
 
-  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file, WorkUnitState state)
-  {
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file, WorkUnitState state) {
     super(fs, file, state);
   }
-  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file)
-  {
+
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, 
CopyableFile file) {
     this(fs, file, null);
   }
 
   @Override
-  protected FileAwareInputStream buildStream(FileSystem fsFromFile)
-      throws DataRecordException, IOException{
-    if(!schemaChecking(fsFromFile))
-    {
+  protected FileAwareInputStream buildStream(FileSystem fsFromFile) throws 
DataRecordException, IOException {
+    if (!schemaChecking(fsFromFile)) {
       throw new DataRecordException("Schema does not match the expected 
schema");
     }
     return super.buildStream(fsFromFile);
   }
 
-  protected boolean schemaChecking(FileSystem fsFromFile)
-      throws IOException {
+  protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
     DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-    DataFileReader<GenericRecord> dataFileReader = new DataFileReader(new 
FsInput(this.file.getFileStatus().getPath(),fsFromFile), datumReader);
+    DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader(new FsInput(this.file.getFileStatus().getPath(), 
fsFromFile), datumReader);
     Schema schema = dataFileReader.getSchema();
-    return 
schema.toString().equals(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+    Schema expectedSchema = new 
Schema.Parser().parse(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+
+    return compare(schema, expectedSchema);
+  }
+
+  private boolean compare(Schema toValidate, Schema expected) {
 
 Review comment:
   Depending on the scope of your schema checking, this method could be reused 
for other purposes. Maybe make it a static method, or move it into a utility 
class?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to