This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 11ecc32 [GOBBLIN-884] Support ORC schema evolution across mappers in
MR mode
11ecc32 is described below
commit 11ecc32439478474a7c72d1f2a950eb13c0bd7f9
Author: autumnust <[email protected]>
AuthorDate: Thu Sep 19 09:40:48 2019 -0700
[GOBBLIN-884] Support ORC schema evolution across mappers in MR mode
Closes #2737 from
autumnust/orcMapperSchemaResolving
---
.../compaction/mapreduce/orc/OrcValueMapper.java | 143 ++++++++++++++++++++-
.../mapreduce/AvroCompactionTaskTest.java | 34 ++++-
.../mapreduce/OrcCompactionTaskTest.java | 27 +++-
.../mapreduce/orc/OrcValueMapperTest.java | 37 ++++++
4 files changed, 229 insertions(+), 12 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index 0dd3b5c..a023977 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -18,9 +18,17 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ConvertTreeReaderFactory;
+import org.apache.orc.impl.SchemaEvolution;
import org.apache.orc.mapred.OrcKey;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;
@@ -36,28 +44,153 @@ public class OrcValueMapper extends
RecordKeyMapperBase<NullWritable, OrcStruct,
private OrcKey outKey;
private OrcValue outValue;
+ private TypeDescription mapperSchema;
@Override
- protected void setup(Context context) throws IOException,
InterruptedException {
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
super.setup(context);
this.outKey = new OrcKey();
this.outValue = new OrcValue();
+ this.mapperSchema =
TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
}
@Override
- protected void map(NullWritable key, OrcStruct orcStruct, Context context)
throws IOException, InterruptedException {
+ protected void map(NullWritable key, OrcStruct orcStruct, Context context)
+ throws IOException, InterruptedException {
+ OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, context);
if (context.getNumReduceTasks() == 0) {
- this.outKey.key = orcStruct;
+ this.outKey.key = upConvertedStruct;
context.write(this.outKey, NullWritable.get());
} else {
- this.outValue.value = orcStruct;
- context.write(getDedupKey(orcStruct), this.outValue);
+ this.outValue.value = upConvertedStruct;
+ context.write(getDedupKey(upConvertedStruct), this.outValue);
}
context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
}
/**
+ * If an {@link OrcStruct}'s schema differs from the newest schema obtained when
creating MR jobs (which is the
+ * newest schema seen by the MR job), all other ORC objects will need to
be up-converted.
+ */
+ OrcStruct upConvertOrcStruct(OrcStruct orcStruct, Context context) {
+ // For ORC, a difference between schema objects means the schemas themselves
differ, while for Avro,
+ // differences in documentation or attributes alone can make the schema
objects differ.
+ if (!orcStruct.getSchema().equals(mapperSchema)) {
+ OrcStruct newStruct = new OrcStruct(mapperSchema);
+
+ int indexInNewSchema = 0;
+ List<String> oldSchemaFieldNames = orcStruct.getSchema().getFieldNames();
+ List<TypeDescription> oldSchemaTypes =
orcStruct.getSchema().getChildren();
+ List<TypeDescription> newSchemaTypes = mapperSchema.getChildren();
+
+ for (String field : mapperSchema.getFieldNames()) {
+ if (oldSchemaFieldNames.contains(field)) {
+ int fieldIndex = oldSchemaFieldNames.indexOf(field);
+
+ TypeDescription fileType = oldSchemaTypes.get(fieldIndex);
+ TypeDescription readerType = newSchemaTypes.get(indexInNewSchema);
+
+ if (isEvolutionValid(fileType, readerType)) {
+ newStruct.setFieldValue(field, orcStruct.getFieldValue(field));
+ } else {
+ throw new SchemaEvolution.IllegalEvolutionException(String
+ .format("ORC does not support type conversion from file" + "
type %s to reader type %s ",
+ fileType.toString(), readerType.toString()));
+ }
+ } else {
+ newStruct.setFieldValue(field, null);
+ }
+
+ indexInNewSchema++;
+ }
+
+ return newStruct;
+ } else {
+ return orcStruct;
+ }
+ }
+
+ /**
+ * Determine whether two types follow a valid evolution path.
+ * Implementation adapted from {@link SchemaEvolution}, whose equivalent
logic is package-private.
+ */
+ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription
readerType) {
+ boolean isOk = true;
+ if (fileType.getCategory() == readerType.getCategory()) {
+ switch (readerType.getCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case DOUBLE:
+ case FLOAT:
+ case STRING:
+ case TIMESTAMP:
+ case BINARY:
+ case DATE:
+ // these are always a match
+ break;
+ case CHAR:
+ case VARCHAR:
+ break;
+ case DECIMAL:
+ break;
+ case UNION:
+ case MAP:
+ case LIST: {
+ // these must be an exact match
+ List<TypeDescription> fileChildren = fileType.getChildren();
+ List<TypeDescription> readerChildren = readerType.getChildren();
+ if (fileChildren.size() == readerChildren.size()) {
+ for (int i = 0; i < fileChildren.size(); ++i) {
+ isOk &= isEvolutionValid(fileChildren.get(i),
readerChildren.get(i));
+ }
+ return isOk;
+ } else {
+ return false;
+ }
+ }
+ case STRUCT: {
+ List<TypeDescription> readerChildren = readerType.getChildren();
+ List<TypeDescription> fileChildren = fileType.getChildren();
+
+ List<String> readerFieldNames = readerType.getFieldNames();
+ List<String> fileFieldNames = fileType.getFieldNames();
+
+ final Map<String, TypeDescription> fileTypesIdx = new HashMap();
+ for (int i = 0; i < fileFieldNames.size(); i++) {
+ final String fileFieldName = fileFieldNames.get(i);
+ fileTypesIdx.put(fileFieldName, fileChildren.get(i));
+ }
+
+ for (int i = 0; i < readerFieldNames.size(); i++) {
+ final String readerFieldName = readerFieldNames.get(i);
+ TypeDescription readerField = readerChildren.get(i);
+ TypeDescription fileField = fileTypesIdx.get(readerFieldName);
+ if (fileField == null) {
+ continue;
+ }
+
+ isOk &= isEvolutionValid(fileField, readerField);
+ }
+ return isOk;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown type " + readerType);
+ }
+ return isOk;
+ } else {
+ /*
+ * Check for the few cases where we will not convert.
+ */
+ return ConvertTreeReaderFactory.canConvert(fileType, readerType);
+ }
+ }
+
+ /**
* By default, dedup key contains the whole ORC record, except MAP since
{@link org.apache.orc.mapred.OrcMap} is
* an implementation of {@link java.util.TreeMap} which doesn't accept
difference of records within the map in comparison.
*/
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 909d97c..a39658e 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Optional;
import com.google.common.io.Files;
import lombok.extern.slf4j.Slf4j;
@@ -72,8 +74,11 @@ public class AvroCompactionTaskTest {
GenericRecord r1 = createRandomRecord();
GenericRecord r2 = createRandomRecord();
+ GenericRecord r3= createEvolvedSchemaRecord();
writeFileWithContent(jobDir, "file1", r1, 20);
writeFileWithContent(jobDir, "file2", r2, 18);
+ File newestFile = writeFileWithContent(jobDir, "file3", r3, 10,
r3.getSchema());
+ newestFile.setLastModified(Long.MAX_VALUE);
EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("dedup",
basePath.getAbsolutePath().toString());
JobExecutionResult result = embeddedGobblin.run();
@@ -128,10 +133,19 @@ public class AvroCompactionTaskTest {
Assert.assertTrue(fs.exists(new Path (basePath,
"Identity/MemberAccount/hourly/2017/04/03/10")));
}
- private void writeFileWithContent(File dir, String fileName, GenericRecord
r, int count) throws IOException {
+ // Returns the file handle so callers can set its modification time.
+ private File writeFileWithContent(File dir, String fileName, GenericRecord
r, int count) throws IOException {
File file = new File(dir, fileName + "." + count + ".avro");
Assert.assertTrue(file.createNewFile());
- this.createAvroFileWithRepeatingRecords(file, r, count);
+ this.createAvroFileWithRepeatingRecords(file, r, count, Optional.absent());
+ return file;
+ }
+
+ private File writeFileWithContent(File dir, String fileName, GenericRecord
r, int count, Schema schema) throws IOException {
+ File file = new File(dir, fileName + "." + count + ".avro");
+ Assert.assertTrue(file.createNewFile());
+ this.createAvroFileWithRepeatingRecords(file, r, count,
Optional.of(schema));
+ return file;
}
public Schema getSchema() {
@@ -154,9 +168,21 @@ public class AvroCompactionTaskTest {
return record;
}
- public void createAvroFileWithRepeatingRecords(File file, GenericRecord r,
int count) throws IOException {
+ public GenericRecord createEvolvedSchemaRecord() {
+ Schema evolvedSchema =
+ SchemaBuilder.record("evolved").fields()
+
.requiredLong("partitionKey").requiredString("environment").requiredString("subKey").optionalString("oppo").endRecord();
+ GenericRecordBuilder keyRecordBuilder = new
GenericRecordBuilder(evolvedSchema);
+ keyRecordBuilder.set("partitionKey", new Long(1));
+ keyRecordBuilder.set("environment", "test");
+ keyRecordBuilder.set("subKey", "2");
+ keyRecordBuilder.set("oppo", "poop");
+ return keyRecordBuilder.build();
+ }
+
+ public void createAvroFileWithRepeatingRecords(File file, GenericRecord r,
int count, Optional<Schema> schema) throws IOException {
DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new
GenericDatumWriter<GenericRecord>());
- writer.create(getSchema(), new FileOutputStream(file));
+ writer.create(schema.isPresent() ? schema.get() : getSchema(), new
FileOutputStream(file));
for (int i = 0; i < count; ++i) {
writer.append(r);
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index 2d76549..da90579 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -82,10 +82,21 @@ public class OrcCompactionTaskTest {
orcStruct_3.setFieldValue("i", new IntWritable(4));
orcStruct_3.setFieldValue("j", new IntWritable(5));
+ // Write a file with an evolved schema.
+ TypeDescription evolvedSchema =
TypeDescription.fromString("struct<i:int,j:int,k:int>");
+ OrcStruct orcStruct_4 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
+ orcStruct_4.setFieldValue("i", new IntWritable(5));
+ orcStruct_4.setFieldValue("j", new IntWritable(6));
+ orcStruct_4.setFieldValue("k", new IntWritable(7));
+
File file_0 = new File(jobDir, "file_0");
File file_1 = new File(jobDir, "file_1");
+ File file_2 = new File(jobDir, "file_2");
writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_0, orcStruct_2));
writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_1, orcStruct_3));
+ writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema,
ImmutableList.of(orcStruct_4));
+ // Make this the newest file.
+ file_2.setLastModified(Long.MAX_VALUE);
// Verify execution
@@ -113,13 +124,19 @@ public class OrcCompactionTaskTest {
Assert.assertTrue(statuses.size() == 1);
List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
- Assert.assertEquals(result.size(), 3);
+ Assert.assertEquals(result.size(), 4);
Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+ Assert.assertNull(result.get(0).getFieldValue("k"));
Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(2));
Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
+ Assert.assertNull(result.get(1).getFieldValue("k"));
Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
+ Assert.assertNull(result.get(2).getFieldValue("k"));
+ Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(5));
+ Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(6));
+ Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7));
}
/**
@@ -143,8 +160,12 @@ public class OrcCompactionTaskTest {
private OrcStruct copyIntOrcStruct(OrcStruct record) {
OrcStruct result = new OrcStruct(record.getSchema());
for (int i = 0 ; i < record.getNumFields() ; i ++ ) {
- IntWritable newCopy = new IntWritable(((IntWritable)
record.getFieldValue(i)).get());
- result.setFieldValue(i, newCopy);
+ if (record.getFieldValue(i) != null) {
+ IntWritable newCopy = new IntWritable(((IntWritable)
record.getFieldValue(i)).get());
+ result.setFieldValue(i, newCopy);
+ } else {
+ result.setFieldValue(i, null);
+ }
}
return result;
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
new file mode 100644
index 0000000..2304bdc
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+
+public class OrcValueMapperTest {
+ @Test
+ public void testIsEvolutionValid() {
+ TypeDescription schema_1 =
TypeDescription.fromString("struct<i:int,j:int,k:int>");
+ TypeDescription schema_2 =
TypeDescription.fromString("struct<i:int,j:int,k:bigint>");
+ TypeDescription schema_3 =
TypeDescription.fromString("struct<i:int,j:int,k:tinyint>");
+ TypeDescription schema_4 =
TypeDescription.fromString("struct<i:int,j:int>");
+ Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_2));
+ Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_3));
+ Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_4));
+ Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_4, schema_1));
+ }
+}
\ No newline at end of file