This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 11ecc32  [GOBBLIN-884] Support ORC schema evolution across mappers in 
MR mode
11ecc32 is described below

commit 11ecc32439478474a7c72d1f2a950eb13c0bd7f9
Author: autumnust <[email protected]>
AuthorDate: Thu Sep 19 09:40:48 2019 -0700

    [GOBBLIN-884] Support ORC schema evolution across mappers in MR mode
    
    Closes #2737 from
    autumnust/orcMapperSchemaResolving
---
 .../compaction/mapreduce/orc/OrcValueMapper.java   | 143 ++++++++++++++++++++-
 .../mapreduce/AvroCompactionTaskTest.java          |  34 ++++-
 .../mapreduce/OrcCompactionTaskTest.java           |  27 +++-
 .../mapreduce/orc/OrcValueMapperTest.java          |  37 ++++++
 4 files changed, 229 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index 0dd3b5c..a023977 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -18,9 +18,17 @@
 package org.apache.gobblin.compaction.mapreduce.orc;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ConvertTreeReaderFactory;
+import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.mapred.OrcKey;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
@@ -36,28 +44,153 @@ public class OrcValueMapper extends 
RecordKeyMapperBase<NullWritable, OrcStruct,
 
   private OrcKey outKey;
   private OrcValue outValue;
+  private TypeDescription mapperSchema;
 
   @Override
-  protected void setup(Context context) throws IOException, 
InterruptedException {
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
     super.setup(context);
     this.outKey = new OrcKey();
     this.outValue = new OrcValue();
+    this.mapperSchema = 
TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
   }
 
   @Override
-  protected void map(NullWritable key, OrcStruct orcStruct, Context context) 
throws IOException, InterruptedException {
+  protected void map(NullWritable key, OrcStruct orcStruct, Context context)
+      throws IOException, InterruptedException {
+    OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, context);
     if (context.getNumReduceTasks() == 0) {
-      this.outKey.key = orcStruct;
+      this.outKey.key = upConvertedStruct;
       context.write(this.outKey, NullWritable.get());
     } else {
-      this.outValue.value = orcStruct;
-      context.write(getDedupKey(orcStruct), this.outValue);
+      this.outValue.value = upConvertedStruct;
+      context.write(getDedupKey(upConvertedStruct), this.outValue);
     }
 
     context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
   }
 
   /**
+   * If a {@link OrcStruct}'s schema differs from the newest schema obtained when creating MR jobs (which is the
+   * newest schema seen by the MR job), all other ORC objects will need to be up-converted.
+   */
+  OrcStruct upConvertOrcStruct(OrcStruct orcStruct, Context context) {
+    // For an ORC schema, differing schema objects mean the schemas themselves differ, while for Avro,
+    // differences in documentation or attributes can make schema objects differ even when the schemas match.
+    if (!orcStruct.getSchema().equals(mapperSchema)) {
+      OrcStruct newStruct = new OrcStruct(mapperSchema);
+
+      int indexInNewSchema = 0;
+      List<String> oldSchemaFieldNames = orcStruct.getSchema().getFieldNames();
+      List<TypeDescription> oldSchemaTypes = 
orcStruct.getSchema().getChildren();
+      List<TypeDescription> newSchemaTypes = mapperSchema.getChildren();
+
+      for (String field : mapperSchema.getFieldNames()) {
+        if (oldSchemaFieldNames.contains(field)) {
+          int fieldIndex = oldSchemaFieldNames.indexOf(field);
+
+          TypeDescription fileType = oldSchemaTypes.get(fieldIndex);
+          TypeDescription readerType = newSchemaTypes.get(indexInNewSchema);
+
+          if (isEvolutionValid(fileType, readerType)) {
+            newStruct.setFieldValue(field, orcStruct.getFieldValue(field));
+          } else {
+            throw new SchemaEvolution.IllegalEvolutionException(String
+                .format("ORC does not support type conversion from file" + " 
type %s to reader type %s ",
+                    fileType.toString(), readerType.toString()));
+          }
+        } else {
+          newStruct.setFieldValue(field, null);
+        }
+
+        indexInNewSchema++;
+      }
+
+      return newStruct;
+    } else {
+      return orcStruct;
+    }
+  }
+
+  /**
+   * Determine if two types are following valid evolution.
+   * Implementation stolen and manipulated from {@link SchemaEvolution} as 
that was package-private.
+   */
+  static boolean isEvolutionValid(TypeDescription fileType, TypeDescription 
readerType) {
+    boolean isOk = true;
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          break;
+        case DECIMAL:
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // these must be an exact match
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for (int i = 0; i < fileChildren.size(); ++i) {
+              isOk &= isEvolutionValid(fileChildren.get(i), 
readerChildren.get(i));
+            }
+            return isOk;
+          } else {
+            return false;
+          }
+        }
+        case STRUCT: {
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          List<TypeDescription> fileChildren = fileType.getChildren();
+
+          List<String> readerFieldNames = readerType.getFieldNames();
+          List<String> fileFieldNames = fileType.getFieldNames();
+
+          final Map<String, TypeDescription> fileTypesIdx = new HashMap();
+          for (int i = 0; i < fileFieldNames.size(); i++) {
+            final String fileFieldName = fileFieldNames.get(i);
+            fileTypesIdx.put(fileFieldName, fileChildren.get(i));
+          }
+
+          for (int i = 0; i < readerFieldNames.size(); i++) {
+            final String readerFieldName = readerFieldNames.get(i);
+            TypeDescription readerField = readerChildren.get(i);
+            TypeDescription fileField = fileTypesIdx.get(readerFieldName);
+            if (fileField == null) {
+              continue;
+            }
+
+            isOk &= isEvolutionValid(fileField, readerField);
+          }
+          return isOk;
+        }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
+      }
+      return isOk;
+    } else {
+      /*
+       * Check for the few cases where will not convert....
+       */
+      return ConvertTreeReaderFactory.canConvert(fileType, readerType);
+    }
+  }
+
+  /**
    * By default, dedup key contains the whole ORC record, except MAP since 
{@link org.apache.orc.mapred.OrcMap} is
    * an implementation of {@link java.util.TreeMap} which doesn't accept 
difference of records within the map in comparison.
    */
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 909d97c..a39658e 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.io.Files;
 
 import lombok.extern.slf4j.Slf4j;
@@ -72,8 +74,11 @@ public class AvroCompactionTaskTest {
 
     GenericRecord r1 = createRandomRecord();
     GenericRecord r2 = createRandomRecord();
+    GenericRecord r3= createEvolvedSchemaRecord();
     writeFileWithContent(jobDir, "file1", r1, 20);
     writeFileWithContent(jobDir, "file2", r2, 18);
+    File newestFile = writeFileWithContent(jobDir, "file3", r3, 10, 
r3.getSchema());
+    newestFile.setLastModified(Long.MAX_VALUE);
 
     EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("dedup", 
basePath.getAbsolutePath().toString());
     JobExecutionResult result = embeddedGobblin.run();
@@ -128,10 +133,19 @@ public class AvroCompactionTaskTest {
     Assert.assertTrue(fs.exists(new Path (basePath, 
"Identity/MemberAccount/hourly/2017/04/03/10")));
   }
 
-  private void writeFileWithContent(File dir, String fileName, GenericRecord 
r, int count) throws IOException {
+  // Returning the file handle for setting modification time.
+  private File writeFileWithContent(File dir, String fileName, GenericRecord 
r, int count) throws IOException {
     File file = new File(dir, fileName + "." + count + ".avro");
     Assert.assertTrue(file.createNewFile());
-    this.createAvroFileWithRepeatingRecords(file, r, count);
+    this.createAvroFileWithRepeatingRecords(file, r, count, Optional.absent());
+    return file;
+  }
+
+  private File writeFileWithContent(File dir, String fileName, GenericRecord 
r, int count, Schema schema) throws IOException {
+    File file = new File(dir, fileName + "." + count + ".avro");
+    Assert.assertTrue(file.createNewFile());
+    this.createAvroFileWithRepeatingRecords(file, r, count, 
Optional.of(schema));
+    return file;
   }
 
   public Schema getSchema() {
@@ -154,9 +168,21 @@ public class AvroCompactionTaskTest {
     return record;
   }
 
-  public void createAvroFileWithRepeatingRecords(File file, GenericRecord r, 
int count) throws IOException {
+  public GenericRecord createEvolvedSchemaRecord() {
+    Schema evolvedSchema =
+        SchemaBuilder.record("evolved").fields()
+            
.requiredLong("partitionKey").requiredString("environment").requiredString("subKey").optionalString("oppo").endRecord();
+    GenericRecordBuilder keyRecordBuilder = new 
GenericRecordBuilder(evolvedSchema);
+    keyRecordBuilder.set("partitionKey", new Long(1));
+    keyRecordBuilder.set("environment", "test");
+    keyRecordBuilder.set("subKey", "2");
+    keyRecordBuilder.set("oppo", "poop");
+    return keyRecordBuilder.build();
+  }
+
+  public void createAvroFileWithRepeatingRecords(File file, GenericRecord r, 
int count, Optional<Schema> schema) throws IOException {
       DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new 
GenericDatumWriter<GenericRecord>());
-      writer.create(getSchema(), new FileOutputStream(file));
+      writer.create(schema.isPresent() ? schema.get() : getSchema(), new 
FileOutputStream(file));
       for (int i = 0; i < count; ++i) {
         writer.append(r);
       }
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index 2d76549..da90579 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -82,10 +82,21 @@ public class OrcCompactionTaskTest {
     orcStruct_3.setFieldValue("i", new IntWritable(4));
     orcStruct_3.setFieldValue("j", new IntWritable(5));
 
+    // Writing a file with evolved schema.
+    TypeDescription evolvedSchema = 
TypeDescription.fromString("struct<i:int,j:int,k:int>");
+    OrcStruct orcStruct_4 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
+    orcStruct_4.setFieldValue("i", new IntWritable(5));
+    orcStruct_4.setFieldValue("j", new IntWritable(6));
+    orcStruct_4.setFieldValue("k", new IntWritable(7));
+
     File file_0 = new File(jobDir, "file_0");
     File file_1 = new File(jobDir, "file_1");
+    File file_2 = new File(jobDir, "file_2");
     writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0, orcStruct_2));
     writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1, orcStruct_3));
+    writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema, 
ImmutableList.of(orcStruct_4));
+    // Make sure this is the newest.
+    file_2.setLastModified(Long.MAX_VALUE);
 
     // Verify execution
 
@@ -113,13 +124,19 @@ public class OrcCompactionTaskTest {
 
     Assert.assertTrue(statuses.size() == 1);
     List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
-    Assert.assertEquals(result.size(), 3);
+    Assert.assertEquals(result.size(), 4);
     Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
     Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+    Assert.assertNull(result.get(0).getFieldValue("k"));
     Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(2));
     Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
+    Assert.assertNull(result.get(1).getFieldValue("k"));
     Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
     Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
+    Assert.assertNull(result.get(2).getFieldValue("k"));
+    Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(5));
+    Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(6));
+    Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7));
   }
 
   /**
@@ -143,8 +160,12 @@ public class OrcCompactionTaskTest {
   private OrcStruct copyIntOrcStruct(OrcStruct record) {
     OrcStruct result = new OrcStruct(record.getSchema());
     for (int i = 0 ; i < record.getNumFields() ; i ++ ) {
-      IntWritable newCopy = new IntWritable(((IntWritable) 
record.getFieldValue(i)).get());
-      result.setFieldValue(i, newCopy);
+      if (record.getFieldValue(i) != null) {
+        IntWritable newCopy = new IntWritable(((IntWritable) 
record.getFieldValue(i)).get());
+        result.setFieldValue(i, newCopy);
+      } else {
+        result.setFieldValue(i, null);
+      }
     }
     return result;
   }
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
new file mode 100644
index 0000000..2304bdc
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+
+public class OrcValueMapperTest {
+  @Test
+  public void testIsEvolutionValid() {
+    TypeDescription schema_1 = 
TypeDescription.fromString("struct<i:int,j:int,k:int>");
+    TypeDescription schema_2 = 
TypeDescription.fromString("struct<i:int,j:int,k:bigint>");
+    TypeDescription schema_3 = 
TypeDescription.fromString("struct<i:int,j:int,k:tinyint>");
+    TypeDescription schema_4 = 
TypeDescription.fromString("struct<i:int,j:int>");
+    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_2));
+    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_3));
+    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_4));
+    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_4, schema_1));
+  }
+}
\ No newline at end of file

Reply via email to