This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1c89e85 [GOBBLIN-1126] Make ORC compaction shuffle key configurable
1c89e85 is described below
commit 1c89e85fedf90187a5a4ee4752df9464c76907e0
Author: Lei Sun <[email protected]>
AuthorDate: Tue May 12 22:43:19 2020 -0700
[GOBBLIN-1126] Make ORC compaction shuffle key configurable
Create Orc-Schema auto-filler;
Adding unit test for that;
Trying to re-use the object when upconvert is
required, and trying to reuse that for Orc record
projection, need to finish unit test for reuse
part first
Refactoring many pieces and pass all unit tests
for nested schema up-conversion
Remove Junit library in unit tests
Reorder the method in OrcUtils to improve
readability
Fix unit tests
Fix union and add tests for column projection
Add reducer-side OrcStruct comparator
Add unit tests for Reducer side of dedup for ORC
Make unit test check record content after
compaction
Edit gitignore file to make ignore vs-code related
configuration files
Fix unit tests
Add test for multi-key on reducer side
Remove excessive log in upConvertOrcStruct
Add helper to reflect problematic file during
compaction to help debug
Catch all types of exception in map method
Address reviewer's comments
Address reviewer's comments
Enhance unit test: Add union into reducer-side
dedup's schema
Add one more test for OrcUtils and separate the
testing workload in Travis for the compaction job
Closes #2966 from autumnust/orc-compaction-
compare-key-configurable
---
.gitignore | 3 +
.../mapreduce/CompactionOrcJobConfigurator.java | 13 +-
.../mapreduce/RecordKeyDedupReducerBase.java | 37 ++-
.../mapreduce/orc/OrcKeyDedupReducer.java | 41 ++-
.../gobblin/compaction/mapreduce/orc/OrcUtils.java | 335 ++++++++++++++++++++-
.../compaction/mapreduce/orc/OrcValueMapper.java | 155 +++-------
.../mapreduce/AvroCompactionTaskTest.java | 1 +
.../compaction/mapreduce/KeyDedupReducerTest.java | 1 +
.../mapreduce/OrcCompactionTaskTest.java | 87 ++++--
.../compaction/mapreduce/orc/OrcTestUtils.java | 136 +++++++++
.../compaction/mapreduce/orc/OrcUtilsTest.java | 296 ++++++++++++++++++
.../mapreduce/orc/OrcValueMapperTest.java | 56 ----
travis/test-groups.inc | 2 +-
13 files changed, 962 insertions(+), 201 deletions(-)
diff --git a/.gitignore b/.gitignore
index ca034f5..d07dada 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,9 @@
**/*.ipr
.shelf/
+# VS Code related
+.vscode
+
**/.classpath
**/.project
**/.settings
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
index ba56047..40a5742 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
@@ -36,6 +38,13 @@ import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*
public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
+
+ /**
+ * The key schema for the shuffle output.
+ */
+ public static final String ORC_MAPPER_SHUFFLE_KEY_SCHEMA =
"orcMapperShuffleSchema";
+ private String orcMapperShuffleSchemaString;
+
public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
@Override
public CompactionJobConfigurator createConfigurator(State state) throws
IOException {
@@ -45,6 +54,7 @@ public class CompactionOrcJobConfigurator extends
CompactionJobConfigurator {
public CompactionOrcJobConfigurator(State state) throws IOException {
super(state);
+ this.orcMapperShuffleSchemaString =
state.getProp(ORC_MAPPER_SHUFFLE_KEY_SCHEMA, StringUtils.EMPTY);
}
@Override
@@ -56,7 +66,8 @@ public class CompactionOrcJobConfigurator extends
CompactionJobConfigurator {
TypeDescription schema = OrcUtils.getNewestSchemaFromSource(job, this.fs);
job.getConfiguration().set(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute(),
schema.toString());
-
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
schema.toString());
+
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
+ orcMapperShuffleSchemaString.isEmpty() ? schema.toString() :
orcMapperShuffleSchemaString);
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.getAttribute(),
schema.toString());
job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(),
schema.toString());
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
index adcb603..27974fd 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
@@ -18,9 +18,12 @@
package org.apache.gobblin.compaction.mapreduce;
import com.google.common.base.Optional;
+
import java.io.IOException;
import java.util.Comparator;
+
import lombok.Getter;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
@@ -31,9 +34,7 @@ import org.apache.hadoop.mapreduce.Reducer;
*/
public abstract class RecordKeyDedupReducerBase<KI, VI, KO, VO> extends
Reducer<KI, VI, KO, VO> {
public enum EVENT_COUNTER {
- MORE_THAN_1,
- DEDUPED,
- RECORD_COUNT
+ MORE_THAN_1, DEDUPED, RECORD_COUNT
}
/**
@@ -45,10 +46,8 @@ public abstract class RecordKeyDedupReducerBase<KI, VI, KO,
VO> extends Reducer<
@Getter
protected VO outValue;
-
protected Optional<Comparator<VI>> deltaComparatorOptional;
-
protected abstract void initReusableObject();
/**
@@ -65,7 +64,6 @@ public abstract class RecordKeyDedupReducerBase<KI, VI, KO,
VO> extends Reducer<
protected abstract void initDeltaComparator(Configuration conf);
-
@Override
protected void setup(Context context) {
initReusableObject();
@@ -79,6 +77,7 @@ public abstract class RecordKeyDedupReducerBase<KI, VI, KO,
VO> extends Reducer<
VI valueToRetain = null;
+ // Preserve only one value among all duplicates.
for (VI value : values) {
if (valueToRetain == null) {
valueToRetain = value;
@@ -88,16 +87,15 @@ public abstract class RecordKeyDedupReducerBase<KI, VI, KO,
VO> extends Reducer<
numVals++;
}
+ writeRetainedValue(valueToRetain, context);
+ updateCounters(numVals, context);
+ }
+
+ protected void writeRetainedValue(VI valueToRetain, Context context)
+ throws IOException, InterruptedException {
setOutKey(valueToRetain);
setOutValue(valueToRetain);
- if (numVals > 1) {
- context.getCounter(EVENT_COUNTER.MORE_THAN_1).increment(1);
- context.getCounter(EVENT_COUNTER.DEDUPED).increment(numVals - 1);
- }
-
- context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
-
// Safety check
if (outKey == null || outValue == null) {
throw new IllegalStateException("Either outKey or outValue is not being
properly initialized");
@@ -105,4 +103,17 @@ public abstract class RecordKeyDedupReducerBase<KI, VI,
KO, VO> extends Reducer<
context.write(this.outKey, this.outValue);
}
+
+ /**
+ * Update the MR counter based on input {@param numDuplicates}, which
indicates the times of duplication of a
+ * record seen in a reducer call.
+ */
+ protected void updateCounters(int numDuplicates, Context context) {
+ if (numDuplicates > 1) {
+ context.getCounter(EVENT_COUNTER.MORE_THAN_1).increment(1);
+ context.getCounter(EVENT_COUNTER.DEDUPED).increment(numDuplicates - 1);
+ }
+
+ context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
+ }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
index ad5c4dd..8539a99 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
@@ -17,15 +17,30 @@
package org.apache.gobblin.compaction.mapreduce.orc;
-import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+/**
+ * Check record duplicates in reducer-side.
+ */
public class OrcKeyDedupReducer extends RecordKeyDedupReducerBase<OrcKey,
OrcValue, NullWritable, OrcValue> {
+ @VisibleForTesting
+ public static final String ORC_DELTA_SCHEMA_PROVIDER =
+ "org.apache.gobblin.compaction." +
OrcKeyDedupReducer.class.getSimpleName() + ".deltaFieldsProvider";
+ public static final String USING_WHOLE_RECORD_FOR_COMPARE =
"usingWholeRecordForCompareInReducer";
+
@Override
protected void setOutValue(OrcValue valueToRetain) {
// Better to copy instead reassigning reference.
@@ -38,6 +53,30 @@ public class OrcKeyDedupReducer extends
RecordKeyDedupReducerBase<OrcKey, OrcVal
}
@Override
+ protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context)
+ throws IOException, InterruptedException {
+
+ /* Map from hash of value(Typed in OrcStruct) object to its times of
duplication*/
+ Map<Integer, Integer> valuesToRetain = new HashMap<>();
+ int valueHash = 0;
+
+ for (OrcValue value : values) {
+ valueHash = ((OrcStruct) value.value).hashCode();
+ if (valuesToRetain.containsKey(valueHash)) {
+ valuesToRetain.put(valueHash, valuesToRetain.get(valueHash) + 1);
+ } else {
+ valuesToRetain.put(valueHash, 1);
+ writeRetainedValue(value, context);
+ }
+ }
+
+ /* At this point, the keyset of valuesToRetain should contain all different
OrcValue. */
+ for (Map.Entry<Integer, Integer> entry : valuesToRetain.entrySet()) {
+ updateCounters(entry.getValue(), context);
+ }
+ }
+
+ @Override
protected void initDeltaComparator(Configuration conf) {
deltaComparatorOptional = Optional.absent();
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 6d053d2..36ed95a 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -22,8 +22,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
import org.apache.gobblin.util.FileListUtils;
@@ -31,6 +35,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.orc.OrcFile;
@@ -38,23 +54,37 @@ import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.ConvertTreeReaderFactory;
import org.apache.orc.impl.SchemaEvolution;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcTimestamp;
+import org.apache.orc.mapred.OrcUnion;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
public class OrcUtils {
// For Util class to prevent initialization
private OrcUtils() {
}
- public static TypeDescription getTypeDescriptionFromFile(Configuration conf,
Path orcFilePath) throws IOException {
+ public static TypeDescription getTypeDescriptionFromFile(Configuration conf,
Path orcFilePath)
+ throws IOException {
return getRecordReaderFromFile(conf, orcFilePath).getSchema();
}
- public static Reader getRecordReaderFromFile(Configuration conf, Path
orcFilePath) throws IOException {
+ public static Reader getRecordReaderFromFile(Configuration conf, Path
orcFilePath)
+ throws IOException {
return OrcFile.createReader(orcFilePath, new OrcFile.ReaderOptions(conf));
}
- public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem
fs) throws IOException {
+ public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem
fs)
+ throws IOException {
Path[] sourceDirs = FileInputFormat.getInputPaths(job);
if (sourceDirs.length == 0) {
throw new IllegalStateException("There should be at least one directory
specified for the MR job");
@@ -75,13 +105,14 @@ public class OrcUtils {
}
}
- throw new IllegalStateException(
- String.format("There's no file carrying orc file schema in the list of
directories: %s", Arrays.toString(sourceDirs)));
+ throw new IllegalStateException(String
+ .format("There's no file carrying orc file schema in the list of
directories: %s",
+ Arrays.toString(sourceDirs)));
}
/**
* Determine if two types are following valid evolution.
- * Implementation stolen and manipulated from {@link SchemaEvolution} as
that was package-private.
+ * Implementation taken and manipulated from {@link SchemaEvolution} as that
was package-private.
*/
static boolean isEvolutionValid(TypeDescription fileType, TypeDescription
readerType) {
boolean isOk = true;
@@ -156,4 +187,296 @@ public class OrcUtils {
return ConvertTreeReaderFactory.canConvert(fileType, readerType);
}
}
+
+ /**
+ * This method copies value in object {@param w} into object {@param v}
recursively even if the schema of w and v
+ * differs in a compatible way, meaning if there's a field existing in v but
not in w, the null value will be filled.
+ * It served as a helper method for {@link #upConvertOrcStruct(OrcStruct,
OrcStruct, TypeDescription)} when OrcStruct
+ * contains nested structure as a member.
+ *
+ * Suppress the warning of type checking: All casts are clearly valid as
they are all (sub)elements Orc types.
+ * Check failure will trigger Cast exception and blow up the process.
+ */
+ @SuppressWarnings("unchecked")
+ private static WritableComparable structConversionHelper(WritableComparable
w, WritableComparable v,
+ TypeDescription targetSchema) {
+
+ if (w instanceof OrcStruct) {
+ upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+ } else if (w instanceof OrcList) {
+ OrcList castedList = (OrcList) w;
+ OrcList targetList = (OrcList) v;
+ TypeDescription elementType = targetSchema.getChildren().get(0);
+ WritableComparable targetListRecordContainer =
+ targetList.size() > 0 ? (WritableComparable) targetList.get(0) :
createValueRecursively(elementType, 0);
+ targetList.clear();
+
+ for (int i = 0; i < castedList.size(); i++) {
+ targetList.add(i,
+ structConversionHelper((WritableComparable) castedList.get(i),
targetListRecordContainer, elementType));
+ }
+ } else if (w instanceof OrcMap) {
+ OrcMap castedMap = (OrcMap) w;
+ OrcMap targetMap = (OrcMap) v;
+ TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+ // Create recordContainer with the schema of value.
+ Iterator targetMapEntries = targetMap.values().iterator();
+ WritableComparable targetMapRecordContainer =
+ targetMapEntries.hasNext() ? (WritableComparable)
targetMapEntries.next()
+ : createValueRecursively(valueSchema);
+
+ targetMap.clear();
+
+ for (Object entry : castedMap.entrySet()) {
+ Map.Entry<WritableComparable, WritableComparable> castedEntry =
+ (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+ targetMapRecordContainer =
+ structConversionHelper(castedEntry.getValue(),
targetMapRecordContainer, valueSchema);
+ targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+ }
+ } else if (w instanceof OrcUnion) {
+ OrcUnion castedUnion = (OrcUnion) w;
+ OrcUnion targetUnion = (OrcUnion) v;
+ byte tag = castedUnion.getTag();
+
+ // ORC doesn't support Union type widening
+ // Avro doesn't allow it either, reference:
https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+ // As a result, member schema within source and target should be
identical.
+ TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+ targetUnion.set(tag, structConversionHelper((WritableComparable)
castedUnion.getObject(),
+ (WritableComparable)
OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+ } else {
+ // Regardless of whether type-widening is happening or not, this method
copies the value of w into v.
+ handlePrimitiveWritableComparable(w, v);
+ }
+
+ // If non-primitive or type-widening is required, v should already be
populated by w's value recursively.
+ return v;
+ }
+
+ /**
+ * Recursively convert the {@param oldStruct} into {@param newStruct} whose
schema is {@param targetSchema}.
+ * This serves similar purpose like GenericDatumReader for Avro, which
accepts an reader schema and writer schema
+ * to allow users convert bytes into reader's schema in a compatible
approach.
+ * Calling this method SHALL NOT cause any side-effect for {@param
oldStruct}, also it will copy value of each fields
+ * in {@param oldStruct} into {@param newStruct} recursively. Please ensure
avoiding unnecessary call as it could
+ * be pretty expensive if the struct schema is complicated, or contains
container objects like array/map.
+ *
+ * Note that if newStruct containing things like List/Map (container-type),
the up-conversion is doing two things:
+ * 1. Clear all elements in original containers.
+ * 2. Make sure the value of container elements in {@param oldStruct} is populated
into {@param newStruct} with element-type
+ * in {@param newStruct} if compatible.
+ *
+ * Limitation:
+ * 1. Does not support up-conversion of key types in Maps. The underlying
reasoning is because of the primary format
+ * from upstream is Avro, which enforces key-type to be string only.
+ * 2. Conversion from a field A to field B only happens if
+ *
org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B)
return true.
+ */
+ @VisibleForTesting
+ public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct
newStruct, TypeDescription targetSchema) {
+
+ // If target schema is not equal to newStruct's schema, it is an illegal
state and doesn't make sense to work through.
+ Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+ int indexInNewSchema = 0;
+ List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+ /* Construct a fieldName -> Index map to efficient access within the loop
below. */
+ Map<String, Integer> oldSchemaIndex = IntStream.range(0,
oldSchemaFieldNames.size()).boxed()
+ .collect(Collectors.toMap(oldSchemaFieldNames::get,
Function.identity()));
+ List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+ List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+ for (String fieldName : targetSchema.getFieldNames()) {
+ if (oldSchemaFieldNames.contains(fieldName) &&
oldStruct.getFieldValue(fieldName) != null) {
+ int fieldIndex = oldSchemaIndex.get(fieldName);
+
+ TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex);
+ TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema);
+
+ if (isEvolutionValid(oldFieldSchema, newFieldSchema)) {
+ WritableComparable oldField = oldStruct.getFieldValue(fieldName);
+ WritableComparable newField = newStruct.getFieldValue(fieldName);
+ newField = (newField == null) ?
OrcUtils.createValueRecursively(newFieldSchema) : newField;
+ newStruct.setFieldValue(fieldName, structConversionHelper(oldField,
newField, newFieldSchema));
+ } else {
+ throw new SchemaEvolution.IllegalEvolutionException(String
+ .format("ORC does not support type conversion from file" + "
type %s to reader type %s ",
+ oldFieldSchema.toString(), newFieldSchema.toString()));
+ }
+ } else {
+ newStruct.setFieldValue(fieldName, null);
+ }
+
+ indexInNewSchema++;
+ }
+ }
+
+ /**
+ * Copy the value of {@param from} object into {@param to} with supporting
of type-widening that ORC allowed.
+ */
+ public static void handlePrimitiveWritableComparable(WritableComparable
from, WritableComparable to) {
+ if (from instanceof ByteWritable) {
+ if (to instanceof ByteWritable) {
+ ((ByteWritable) to).set(((ByteWritable) from).get());
+ return;
+ } else if (to instanceof ShortWritable) {
+ ((ShortWritable) to).set(((ByteWritable) from).get());
+ return;
+ } else if (to instanceof IntWritable) {
+ ((IntWritable) to).set(((ByteWritable) from).get());
+ return;
+ } else if (to instanceof LongWritable) {
+ ((LongWritable) to).set(((ByteWritable) from).get());
+ return;
+ } else if (to instanceof DoubleWritable) {
+ ((DoubleWritable) to).set(((ByteWritable) from).get());
+ return;
+ }
+ } else if (from instanceof ShortWritable) {
+ if (to instanceof ShortWritable) {
+ ((ShortWritable) to).set(((ShortWritable) from).get());
+ return;
+ } else if (to instanceof IntWritable) {
+ ((IntWritable) to).set(((ShortWritable) from).get());
+ return;
+ } else if (to instanceof LongWritable) {
+ ((LongWritable) to).set(((ShortWritable) from).get());
+ return;
+ } else if (to instanceof DoubleWritable) {
+ ((DoubleWritable) to).set(((ShortWritable) from).get());
+ return;
+ }
+ } else if (from instanceof IntWritable) {
+ if (to instanceof IntWritable) {
+ ((IntWritable) to).set(((IntWritable) from).get());
+ return;
+ } else if (to instanceof LongWritable) {
+ ((LongWritable) to).set(((IntWritable) from).get());
+ return;
+ } else if (to instanceof DoubleWritable) {
+ ((DoubleWritable) to).set(((IntWritable) from).get());
+ return;
+ }
+ } else if (from instanceof LongWritable) {
+ if (to instanceof LongWritable) {
+ ((LongWritable) to).set(((LongWritable) from).get());
+ return;
+ } else if (to instanceof DoubleWritable) {
+ ((DoubleWritable) to).set(((LongWritable) from).get());
+ return;
+ }
+ // Following from this branch, type-widening is not allowed and only
value-copy will happen.
+ } else if (from instanceof DoubleWritable) {
+ if (to instanceof DoubleWritable) {
+ ((DoubleWritable) to).set(((DoubleWritable) from).get());
+ return;
+ }
+ } else if (from instanceof BytesWritable) {
+ if (to instanceof BytesWritable) {
+ ((BytesWritable) to).set((BytesWritable) from);
+ return;
+ }
+ } else if (from instanceof FloatWritable) {
+ if (to instanceof FloatWritable) {
+ ((FloatWritable) to).set(((FloatWritable) from).get());
+ return;
+ }
+ } else if (from instanceof Text) {
+ if (to instanceof Text) {
+ ((Text) to).set((Text) from);
+ return;
+ }
+ } else if (from instanceof DateWritable) {
+ if (to instanceof DateWritable) {
+ ((DateWritable) to).set(((DateWritable) from).get());
+ return;
+ }
+ } else if (from instanceof OrcTimestamp && to instanceof OrcTimestamp) {
+ ((OrcTimestamp) to).set(((OrcTimestamp) from).toString());
+ return;
+ } else if (from instanceof HiveDecimalWritable && to instanceof
HiveDecimalWritable) {
+ ((HiveDecimalWritable) to).set(((HiveDecimalWritable)
from).getHiveDecimal());
+ return;
+ }
+ throw new UnsupportedOperationException(String
+ .format("The conversion of primitive-type WritableComparable object
from %s to %s is not supported",
+ from.getClass(), to.getClass()));
+ }
+
+ /**
+ * For nested structure like struct<a:array<struct<int,string>>>, calling
OrcStruct.createValue doesn't create entry for the inner
+ * list, which would be required to assign a value if the entry-type has
nested structure, or it just cannot see the
+ * entry's nested structure.
+ *
+ * This function should be fed back to open-source ORC.
+ */
+ public static WritableComparable createValueRecursively(TypeDescription
schema, int elemNum) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanWritable();
+ case BYTE:
+ return new ByteWritable();
+ case SHORT:
+ return new ShortWritable();
+ case INT:
+ return new IntWritable();
+ case LONG:
+ return new LongWritable();
+ case FLOAT:
+ return new FloatWritable();
+ case DOUBLE:
+ return new DoubleWritable();
+ case BINARY:
+ return new BytesWritable();
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return new Text();
+ case DATE:
+ return new DateWritable();
+ case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
+ return new OrcTimestamp();
+ case DECIMAL:
+ return new HiveDecimalWritable();
+ case STRUCT: {
+ OrcStruct result = new OrcStruct(schema);
+ int c = 0;
+ for (TypeDescription child : schema.getChildren()) {
+ result.setFieldValue(c++, createValueRecursively(child, elemNum));
+ }
+ return result;
+ }
+ case UNION: {
+ // For union, there's no way to determine which tag's object type to
create with only schema.
+ // It can be determined in the cases when a OrcUnion's value needs to
be copied to another object recursively,
+ // and the source OrcUnion can provide this information.
+ return new OrcUnion(schema);
+ }
+ case LIST: {
+ OrcList result = new OrcList(schema);
+ for (int i = 0; i < elemNum; i++) {
+ result.add(createValueRecursively(schema.getChildren().get(0),
elemNum));
+ }
+ return result;
+ }
+ case MAP: {
+ OrcMap result = new OrcMap(schema);
+ for (int i = 0; i < elemNum; i++) {
+ result.put(createValueRecursively(schema.getChildren().get(0),
elemNum),
+ createValueRecursively(schema.getChildren().get(1), elemNum));
+ }
+ return result;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown type " + schema);
+ }
+ }
+
+ public static WritableComparable createValueRecursively(TypeDescription
schema) {
+ return createValueRecursively(schema, 1);
+ }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index ab9b065..b6bcdcd 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -18,26 +18,24 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+import java.lang.reflect.Field;
+import java.util.Arrays;
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.SchemaEvolution;
import org.apache.orc.mapred.OrcKey;
-import org.apache.orc.mapred.OrcList;
-import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcUnion;
import org.apache.orc.mapred.OrcValue;
import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
-import com.google.common.annotations.VisibleForTesting;
-
import lombok.extern.slf4j.Slf4j;
@@ -49,8 +47,12 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OrcValueMapper extends RecordKeyMapperBase<NullWritable,
OrcStruct, Object, OrcValue> {
+ // This key will only be initialized lazily when dedup is enabled.
+ private OrcKey outKey;
private OrcValue outValue;
- private TypeDescription mapperSchema;
+ private TypeDescription mrOutputSchema;
+ private TypeDescription shuffleKeySchema;
+ private JobConf jobConf;
// This is added mostly for debuggability.
private static int writeCount = 0;
@@ -59,127 +61,68 @@ public class OrcValueMapper extends
RecordKeyMapperBase<NullWritable, OrcStruct,
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
+ this.jobConf = new JobConf(context.getConfiguration());
+ this.outKey = new OrcKey();
+ this.outKey.configure(jobConf);
this.outValue = new OrcValue();
- this.mapperSchema =
+ this.outValue.configure(jobConf);
+ this.mrOutputSchema =
TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
+ this.shuffleKeySchema =
+
TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
}
@Override
protected void map(NullWritable key, OrcStruct orcStruct, Context context)
throws IOException, InterruptedException {
- OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, mapperSchema);
- this.outValue.value = upConvertedStruct;
+
+ // Up-convert OrcStruct only if schema differs
+ if (!orcStruct.getSchema().equals(this.mrOutputSchema)) {
+ // Note that outValue.value is being re-used.
+ log.info("There's a schema difference between output schema and input
schema");
+ OrcUtils.upConvertOrcStruct(orcStruct, (OrcStruct) outValue.value,
mrOutputSchema);
+ } else {
+ this.outValue.value = orcStruct;
+ }
+
try {
if (context.getNumReduceTasks() == 0) {
context.write(NullWritable.get(), this.outValue);
} else {
- context.write(getDedupKey(upConvertedStruct), this.outValue);
+ fillDedupKey(orcStruct);
+ context.write(this.outKey, this.outValue);
}
} catch (Exception e) {
- throw new RuntimeException("Failure in write record no." + writeCount,
e);
+ String inputPathInString = getInputsplitHelper(context);
+ throw new RuntimeException("Failure in write record no." + writeCount +
" the processing split is:" + inputPathInString, e);
}
writeCount += 1;
context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
}
- /**
- * Recursively up-convert the {@link OrcStruct} into {@link #mapperSchema}
- * Limitation:
- * 1. Does not support up-conversion of key types in Maps
- * 2. Conversion only happens if
org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid
return true.
- */
- @VisibleForTesting
- OrcStruct upConvertOrcStruct(OrcStruct orcStruct, TypeDescription
mapperSchema) {
- // For ORC schema, if schema object differs that means schema itself is
different while for Avro,
- // there are chances that documentation or attributes' difference lead to
the schema object difference.
- if (!orcStruct.getSchema().equals(mapperSchema)) {
- log.info("There's schema mismatch identified from reader's schema and
writer's schema");
- OrcStruct newStruct = new OrcStruct(mapperSchema);
-
- int indexInNewSchema = 0;
- List<String> oldSchemaFieldNames = orcStruct.getSchema().getFieldNames();
- List<TypeDescription> oldSchemaTypes =
orcStruct.getSchema().getChildren();
- List<TypeDescription> newSchemaTypes = mapperSchema.getChildren();
-
- for (String field : mapperSchema.getFieldNames()) {
- if (oldSchemaFieldNames.contains(field)) {
- int fieldIndex = oldSchemaFieldNames.indexOf(field);
-
- TypeDescription fileType = oldSchemaTypes.get(fieldIndex);
- TypeDescription readerType = newSchemaTypes.get(indexInNewSchema);
-
- if (OrcUtils.isEvolutionValid(fileType, readerType)) {
- WritableComparable oldField = orcStruct.getFieldValue(field);
- oldField = structConversionHelper(oldField,
mapperSchema.getChildren().get(fieldIndex));
- newStruct.setFieldValue(field, oldField);
- } else {
- throw new SchemaEvolution.IllegalEvolutionException(String
- .format("ORC does not support type conversion from file" + "
type %s to reader type %s ",
- fileType.toString(), readerType.toString()));
- }
- } else {
- newStruct.setFieldValue(field, null);
- }
-
- indexInNewSchema++;
- }
-
- return newStruct;
- } else {
- return orcStruct;
- }
- }
-
- /**
- * Suppress the warning of type checking: All casts are clearly valid as
they are all (sub)elements Orc types.
- * Check failure will trigger Cast exception and blow up the process.
- */
- @SuppressWarnings("unchecked")
- private WritableComparable structConversionHelper(WritableComparable w,
TypeDescription mapperSchema) {
- if (w instanceof OrcStruct) {
- return upConvertOrcStruct((OrcStruct) w, mapperSchema);
- } else if (w instanceof OrcList) {
- OrcList castedList = (OrcList) w;
- TypeDescription elementType = mapperSchema.getChildren().get(0);
- for (int i = 0; i < castedList.size(); i++) {
- castedList.set(i, structConversionHelper((WritableComparable)
castedList.get(i), elementType));
- }
- } else if (w instanceof OrcMap) {
- OrcMap castedMap = (OrcMap) w;
- for (Object entry : castedMap.entrySet()) {
- Map.Entry<WritableComparable, WritableComparable> castedEntry =
- (Map.Entry<WritableComparable, WritableComparable>) entry;
- castedMap.put(castedEntry.getKey(),
- structConversionHelper(castedEntry.getValue(),
mapperSchema.getChildren().get(1)));
- }
- return castedMap;
- } else if (w instanceof OrcUnion) {
- OrcUnion castedUnion = (OrcUnion) w;
- byte tag = castedUnion.getTag();
- castedUnion.set(tag,
- structConversionHelper((WritableComparable) castedUnion.getObject(),
mapperSchema.getChildren().get(tag)));
+ private String getInputsplitHelper(Context context) {
+ try {
+ Field mapContextField =
WrappedMapper.Context.class.getDeclaredField("mapContext");
+ mapContextField.setAccessible(true);
+ Path[] inputPaths = ((CombineFileSplit) ((MapContextImpl)
mapContextField.get((WrappedMapper.Context) context))
+ .getInputSplit()).getPaths();
+ return Arrays.toString(inputPaths);
+ } catch (NoSuchFieldException | IllegalAccessException ie) {
+ throw new RuntimeException(ie);
}
-
- // Directly return if primitive object.
- return w;
}
/**
* By default, dedup key contains the whole ORC record, except MAP since
{@link org.apache.orc.mapred.OrcMap} is
* an implementation of {@link java.util.TreeMap} which doesn't accept
difference of records within the map in comparison.
+ * Note: This method should have no side-effect on input record.
*/
- protected OrcKey getDedupKey(OrcStruct originalRecord) {
- return convertOrcStructToOrcKey(originalRecord);
- }
-
- /**
- * The output key of mapper needs to be comparable. In the scenarios that we
need the orc record itself
- * to be the output key, this conversion will be necessary.
- */
- protected OrcKey convertOrcStructToOrcKey(OrcStruct struct) {
- OrcKey orcKey = new OrcKey();
- orcKey.key = struct;
- return orcKey;
+ private void fillDedupKey(OrcStruct originalRecord) {
+ if (!originalRecord.getSchema().equals(this.shuffleKeySchema)) {
+ OrcUtils.upConvertOrcStruct(originalRecord, (OrcStruct) this.outKey.key,
this.shuffleKeySchema);
+ } else {
+ this.outKey.key = originalRecord;
+ }
}
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 2a04d68..048fe47 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
@Slf4j
+@Test(groups = {"gobblin.compaction"})
public class AvroCompactionTaskTest {
protected FileSystem getFileSystem()
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
index 6183f35..5671378 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/KeyDedupReducerTest.java
@@ -47,6 +47,7 @@ import static org.mockito.Mockito.when;
* Test class for {@link
org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase}.
* Will have test separately in both avro and orc.
*/
+@Test(groups = {"gobblin.compaction"})
public class KeyDedupReducerTest {
private static final String AVRO_KEY_SCHEMA =
"{ \"type\" : \"record\", \"name\" : \"etl\",\"namespace\" :
\"reducerTest\", \"fields\" : [ { \"name\" : "
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index e6a3fab..b58fc36 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.io.FilenameUtils;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
@@ -35,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -50,11 +53,12 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
+import static
org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator.ORC_MAPPER_SHUFFLE_KEY_SCHEMA;
import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
-
+@Test(groups = {"gobblin.compaction"})
public class OrcCompactionTaskTest {
final String extensionName = "orc";
@@ -86,7 +90,7 @@ public class OrcCompactionTaskTest {
}
@Test
- public void basicTestWithRecompaction() throws Exception {
+ public void basicTestWithRecompactionAndBasicSchemaEvolution() throws
Exception {
File basePath = Files.createTempDir();
basePath.deleteOnExit();
@@ -98,7 +102,7 @@ public class OrcCompactionTaskTest {
// Writing some basic ORC files
createTestingData(jobDir);
- // Writing an additional file with evolved schema.
+ // Writing an additional file with ** evolved schema **.
TypeDescription evolvedSchema =
TypeDescription.fromString("struct<i:int,j:int,k:int>");
OrcStruct orcStruct_4 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
orcStruct_4.setFieldValue("i", new IntWritable(5));
@@ -159,6 +163,64 @@ public class OrcCompactionTaskTest {
Assert.assertEquals(result.size(), 4 + 1);
}
+ @Test
+ public void testReducerSideDedup() throws Exception {
+ File basePath = Files.createTempDir();
+ basePath.deleteOnExit();
+
+ String minutelyPath =
"Identity/MemberAccount/minutely/2020/04/03/10/20_30/run_2020-04-03-10-20";
+ String hourlyPath = "Identity/MemberAccount/hourly/2020/04/03/10/";
+ File jobDir = new File(basePath, minutelyPath);
+ Assert.assertTrue(jobDir.mkdirs());
+
+ TypeDescription nestedSchema =
TypeDescription.fromString("struct<a:struct<a:int,b:string,c:int>,b:string,c:uniontype<int,string>>");
+ // Create three records with same value except "b" column in the top-level.
+ OrcStruct nested_struct_1 = (OrcStruct)
OrcUtils.createValueRecursively(nestedSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(nested_struct_1, nestedSchema, 1,
"test1", true);
+ ((OrcStruct)nested_struct_1).setFieldValue("b", new Text("uno"));
+ OrcStruct nested_struct_2 = (OrcStruct)
OrcUtils.createValueRecursively(nestedSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(nested_struct_2, nestedSchema, 1,
"test2", true);
+ ((OrcStruct)nested_struct_2).setFieldValue("b", new Text("dos"));
+ OrcStruct nested_struct_3 = (OrcStruct)
OrcUtils.createValueRecursively(nestedSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(nested_struct_3, nestedSchema, 1,
"test3", true);
+ ((OrcStruct)nested_struct_3).setFieldValue("b", new Text("tres"));
+ // Create another two records with different value from the above three,
and these two differs in column b as well.
+ OrcStruct nested_struct_4 = (OrcStruct)
OrcUtils.createValueRecursively(nestedSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(nested_struct_4, nestedSchema, 2,
"test2", false);
+ ((OrcStruct)nested_struct_4).setFieldValue("b", new Text("uno"));
+ // This record will be considered a duplicate of nested_struct_4
+ OrcStruct nested_struct_5 = (OrcStruct)
OrcUtils.createValueRecursively(nestedSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(nested_struct_5, nestedSchema, 2,
"test2", false);
+ ((OrcStruct)nested_struct_5).setFieldValue("b", new Text("uno"));
+
+ // Following pattern: FILENAME.RECORDCOUNT.EXTENSION
+ File file_0 = new File(jobDir, "file_0.5." + extensionName);
+ writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), nestedSchema,
ImmutableList.of(nested_struct_1,
+ nested_struct_2, nested_struct_3, nested_struct_4, nested_struct_5));
+
+ EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+ TestCompactionOrcJobConfigurator.Factory.class.getName())
+ .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
+ .setConfiguration(ORC_MAPPER_SHUFFLE_KEY_SCHEMA,
"struct<a:struct<a:int,c:int>>");
+ JobExecutionResult execution = embeddedGobblin.run();
+ Assert.assertTrue(execution.isSuccessful());
+
+ // Verifying result: Reducer should catch all the false-duplicates
+ File outputDir = new File(basePath, hourlyPath);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ List<FileStatus> statuses = new ArrayList<>();
+ reloadFolder(statuses, outputDir, fs);
+ Assert.assertEquals(statuses.size(), 1);
+ List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+ // Should still contain the original 3 records since they have different
values in columns not included in the shuffle key.
+ Assert.assertEquals(result.size(), 4);
+ Assert.assertTrue(result.contains(nested_struct_1));
+ Assert.assertTrue(result.contains(nested_struct_2));
+ Assert.assertTrue(result.contains(nested_struct_3));
+ Assert.assertTrue(result.contains(nested_struct_4));
+ }
+
// A helper method to load all files in the output directory for
compaction-result inspection.
private void reloadFolder(List<FileStatus> statuses, File outputDir,
FileSystem fs) throws IOException {
statuses.clear();
@@ -227,6 +289,7 @@ public class OrcCompactionTaskTest {
/**
* Read a output ORC compacted file into memory.
+ * This only works if fields are int values.
*/
public List<OrcStruct> readOrcFile(Path orcFilePath)
throws IOException, InterruptedException {
@@ -236,26 +299,16 @@ public class OrcCompactionTaskTest {
OrcMapreduceRecordReader recordReader = new
OrcMapreduceRecordReader(orcReader, options);
List<OrcStruct> result = new ArrayList<>();
+ OrcStruct recordContainer;
while (recordReader.nextKeyValue()) {
- result.add(copyIntOrcStruct((OrcStruct) recordReader.getCurrentValue()));
+ recordContainer = (OrcStruct)
OrcUtils.createValueRecursively(orcReader.getSchema());
+ OrcUtils.upConvertOrcStruct((OrcStruct) recordReader.getCurrentValue(),
recordContainer, orcReader.getSchema());
+ result.add(recordContainer);
}
return result;
}
- private OrcStruct copyIntOrcStruct(OrcStruct record) {
- OrcStruct result = new OrcStruct(record.getSchema());
- for (int i = 0 ; i < record.getNumFields() ; i ++ ) {
- if (record.getFieldValue(i) != null) {
- IntWritable newCopy = new IntWritable(((IntWritable)
record.getFieldValue(i)).get());
- result.setFieldValue(i, newCopy);
- } else {
- result.setFieldValue(i, null);
- }
- }
- return result;
- }
-
public void writeOrcRecordsInFile(Path path, TypeDescription schema,
List<OrcStruct> orcStructs) throws Exception {
Configuration configuration = new Configuration();
OrcFile.WriterOptions options =
OrcFile.writerOptions(configuration).setSchema(schema);
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcTestUtils.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcTestUtils.java
new file mode 100644
index 0000000..53efa17
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcTestUtils.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+
+
+public class OrcTestUtils {
+ /**
+ * Fill in value in OrcStruct with given schema, assuming {@param w}
contains the same schema as {@param schema}.
+ * {@param schema} is still necessary even though {@param w} does contain
schema information itself, because the
+ * actual value type is only available in {@link TypeDescription} but not
{@link org.apache.orc.mapred.OrcValue}.
+ *
+ * For simplicity here are some assumptions:
+ * - We only give 3 primitive values and use them to construct compound
values. To make it work for different types that
+ * can be widened or shrunk to each other, please use values within a small
range.
+ * - For List, Map or Union, make sure there's at least one entry within the
record-container.
+ * you may want to try createValueRecursively(TypeDescription) instead of
{@link OrcStruct#createValue(TypeDescription)}
+ */
+ public static void fillOrcStructWithFixedValue(WritableComparable w,
TypeDescription schema, int unionTag,
+ int intValue, String stringValue, boolean booleanValue) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ ((BooleanWritable) w).set(booleanValue);
+ break;
+ case BYTE:
+ ((ByteWritable) w).set((byte) intValue);
+ break;
+ case SHORT:
+ ((ShortWritable) w).set((short) intValue);
+ break;
+ case INT:
+ ((IntWritable) w).set(intValue);
+ break;
+ case LONG:
+ ((LongWritable) w).set(intValue);
+ break;
+ case FLOAT:
+ ((FloatWritable) w).set(intValue * 1.0f);
+ break;
+ case DOUBLE:
+ ((DoubleWritable) w).set(intValue * 1.0);
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ ((Text) w).set(stringValue);
+ break;
+ case BINARY:
+ throw new UnsupportedOperationException("Binary type is not supported
in random orc data filler");
+ case DECIMAL:
+ throw new UnsupportedOperationException("Decimal type is not supported
in random orc data filler");
+ case DATE:
+ case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
+ throw new UnsupportedOperationException(
+ "Timestamp and its derived types is not supported in random orc
data filler");
+ case LIST:
+ OrcList castedList = (OrcList) w;
+ // Here it is not trivial to create a typed object for the element type. So
this method expects the value container
+ // to at least contain one element, or the traversing within the list
will be skipped.
+ for (Object i : castedList) {
+ fillOrcStructWithFixedValue((WritableComparable) i,
schema.getChildren().get(0), unionTag, intValue,
+ stringValue, booleanValue);
+ }
+ break;
+ case MAP:
+ OrcMap castedMap = (OrcMap) w;
+ for (Object entry : castedMap.entrySet()) {
+ Map.Entry<WritableComparable, WritableComparable> castedEntry =
+ (Map.Entry<WritableComparable, WritableComparable>) entry;
+ fillOrcStructWithFixedValue(castedEntry.getKey(),
schema.getChildren().get(0), unionTag, intValue,
+ stringValue, booleanValue);
+ fillOrcStructWithFixedValue(castedEntry.getValue(),
schema.getChildren().get(1), unionTag, intValue,
+ stringValue, booleanValue);
+ }
+ break;
+ case STRUCT:
+ OrcStruct castedStruct = (OrcStruct) w;
+ int fieldIdx = 0;
+ for (TypeDescription child : schema.getChildren()) {
+ fillOrcStructWithFixedValue(castedStruct.getFieldValue(fieldIdx),
child, unionTag, intValue, stringValue,
+ booleanValue);
+ fieldIdx += 1;
+ }
+ break;
+ case UNION:
+ OrcUnion castedUnion = (OrcUnion) w;
+ TypeDescription targetMemberSchema =
schema.getChildren().get(unionTag);
+ castedUnion.set(unionTag,
OrcUtils.createValueRecursively(targetMemberSchema));
+ fillOrcStructWithFixedValue((WritableComparable)
castedUnion.getObject(), targetMemberSchema, unionTag,
+ intValue, stringValue, booleanValue);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type " +
schema.toString());
+ }
+ }
+
+ /**
+ * The simple API: Union tag by default set to 0.
+ */
+ public static void fillOrcStructWithFixedValue(WritableComparable w,
TypeDescription schema, int intValue,
+ String stringValue, boolean booleanValue) {
+ fillOrcStructWithFixedValue(w, schema, 0, intValue, stringValue,
booleanValue);
+ }
+}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
new file mode 100644
index 0000000..54527f0
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.orc;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(groups = {"gobblin.compaction"})
+public class OrcUtilsTest {
+
+ final int intValue = 10;
+ final String stringValue = "testString";
+ final boolean boolValue = true;
+
+ @Test
+ public void testRandomFillOrcStructWithAnySchema() {
+ // 1. Basic case
+ TypeDescription schema_1 =
TypeDescription.fromString("struct<i:int,j:int,k:int>");
+ OrcStruct expectedStruct = (OrcStruct) OrcStruct.createValue(schema_1);
+ expectedStruct.setFieldValue("i", new IntWritable(3));
+ expectedStruct.setFieldValue("j", new IntWritable(3));
+ expectedStruct.setFieldValue("k", new IntWritable(3));
+
+ OrcStruct actualStruct = (OrcStruct) OrcStruct.createValue(schema_1);
+ OrcTestUtils.fillOrcStructWithFixedValue(actualStruct, schema_1, 3, "",
false);
+ Assert.assertEquals(actualStruct, expectedStruct);
+
+ TypeDescription schema_2 =
TypeDescription.fromString("struct<i:boolean,j:int,k:string>");
+ expectedStruct = (OrcStruct) OrcStruct.createValue(schema_2);
+ expectedStruct.setFieldValue("i", new BooleanWritable(false));
+ expectedStruct.setFieldValue("j", new IntWritable(3));
+ expectedStruct.setFieldValue("k", new Text(""));
+ actualStruct = (OrcStruct) OrcStruct.createValue(schema_2);
+
+ OrcTestUtils.fillOrcStructWithFixedValue(actualStruct, schema_2, 3, "",
false);
+ Assert.assertEquals(actualStruct, expectedStruct);
+
+ // 2. Some simple nested cases: struct within struct
+ TypeDescription schema_3 =
TypeDescription.fromString("struct<i:boolean,j:struct<i:boolean,j:int,k:string>>");
+ OrcStruct expectedStruct_nested_1 = (OrcStruct)
OrcStruct.createValue(schema_3);
+ expectedStruct_nested_1.setFieldValue("i", new BooleanWritable(false));
+ expectedStruct_nested_1.setFieldValue("j", expectedStruct);
+ actualStruct = (OrcStruct) OrcStruct.createValue(schema_3);
+
+ OrcTestUtils.fillOrcStructWithFixedValue(actualStruct, schema_3, 3, "",
false);
+ Assert.assertEquals(actualStruct, expectedStruct_nested_1);
+
+ // 3. array of struct within struct
+ TypeDescription schema_4 =
TypeDescription.fromString("struct<i:boolean,j:array<struct<i:boolean,j:int,k:string>>>");
+ // Note that this will not create any elements in the array.
+ expectedStruct_nested_1 = (OrcStruct) OrcStruct.createValue(schema_4);
+ expectedStruct_nested_1.setFieldValue("i", new BooleanWritable(false));
+ OrcList list = new OrcList(schema_2, 1);
+ list.add(expectedStruct);
+ expectedStruct_nested_1.setFieldValue("j", list);
+
+ // Constructing actualStruct: make sure the list is non-empty. There isn't
any meaningful value within the placeholder struct.
+ actualStruct = (OrcStruct) OrcStruct.createValue(schema_4);
+ OrcList placeHolderList = new OrcList(schema_2, 1);
+ OrcStruct placeHolderStruct = (OrcStruct) OrcStruct.createValue(schema_2);
+ placeHolderList.add(placeHolderStruct);
+ actualStruct.setFieldValue("j", placeHolderList);
+
+ OrcTestUtils.fillOrcStructWithFixedValue(actualStruct, schema_4, 3, "",
false);
+ Assert.assertEquals(actualStruct, expectedStruct_nested_1);
+
+ // 4. union of struct within struct
+ TypeDescription schema_5 =
TypeDescription.fromString("struct<i:boolean,j:uniontype<struct<i:boolean,j:int,k:string>>>");
+ expectedStruct_nested_1 = (OrcStruct) OrcStruct.createValue(schema_5);
+ expectedStruct_nested_1.setFieldValue("i", new BooleanWritable(false));
+ OrcUnion union = new OrcUnion(schema_2);
+ union.set(0, expectedStruct);
+ expectedStruct_nested_1.setFieldValue("j", union);
+
+ // Construct actualStruct: make sure there's a struct-placeholder within
the union.
+ actualStruct = (OrcStruct) OrcStruct.createValue(schema_5);
+ OrcUnion placeHolderUnion = new OrcUnion(schema_2);
+ placeHolderUnion.set(0, placeHolderStruct);
+ actualStruct.setFieldValue("j", placeHolderUnion);
+
+ OrcTestUtils.fillOrcStructWithFixedValue(actualStruct, schema_5, 3, "",
false);
+ Assert.assertEquals(actualStruct, expectedStruct_nested_1);
+ }
+
+ @Test
+ public void testUpConvertOrcStruct() {
+
+ // Basic case, all primitives, newly added value will be set to null
+ TypeDescription baseStructSchema =
TypeDescription.fromString("struct<a:int,b:string>");
+ // This would be re-used in the following tests as the actual record using
the schema.
+ OrcStruct baseStruct = (OrcStruct) OrcStruct.createValue(baseStructSchema);
+ // Fill in the baseStruct with specified value.
+ OrcTestUtils.fillOrcStructWithFixedValue(baseStruct, baseStructSchema,
intValue, stringValue, boolValue);
+ TypeDescription evolved_baseStructSchema =
TypeDescription.fromString("struct<a:int,b:string,c:int>");
+ OrcStruct evolvedStruct = (OrcStruct)
OrcStruct.createValue(evolved_baseStructSchema);
+ // This should be equivalent to
deserialize(baseStruct).serialize(evolvedStruct, evolvedSchema);
+ OrcUtils.upConvertOrcStruct(baseStruct, evolvedStruct,
evolved_baseStructSchema);
+ // Check if all value in baseStruct is populated and newly created column
in evolvedStruct is filled with null.
+ Assert.assertEquals(((IntWritable)
evolvedStruct.getFieldValue("a")).get(), intValue);
+ Assert.assertEquals(((Text) evolvedStruct.getFieldValue("b")).toString(),
stringValue);
+ Assert.assertNull(evolvedStruct.getFieldValue("c"));
+
+ // Base case: Reverse direction, which is column projection on top-level
columns.
+ OrcStruct baseStruct_shadow = (OrcStruct)
OrcStruct.createValue(baseStructSchema);
+ OrcUtils.upConvertOrcStruct(evolvedStruct, baseStruct_shadow,
baseStructSchema);
+ Assert.assertEquals(baseStruct, baseStruct_shadow);
+
+ // Simple Nested: List/Map/Union within Struct.
+ // The element type of list contains a new field.
+ // Prepare two ListInStructs with different size ( the list field contains
different number of members)
+ TypeDescription listInStructSchema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
+ OrcStruct listInStruct = (OrcStruct)
OrcUtils.createValueRecursively(listInStructSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(listInStruct, listInStructSchema,
intValue, stringValue, boolValue);
+ TypeDescription evolved_listInStructSchema =
+
TypeDescription.fromString("struct<a:array<struct<a:int,b:string,c:int>>>");
+ OrcStruct evolved_listInStruct = (OrcStruct)
OrcUtils.createValueRecursively(evolved_listInStructSchema);
+ // Convert and verify contents.
+ OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct,
evolved_listInStructSchema);
+ Assert.assertEquals(
+ ((IntWritable) ((OrcStruct) ((OrcList)
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("a"))
+ .get(), intValue);
+ Assert.assertEquals(
+ ((Text) ((OrcStruct) ((OrcList)
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("b")).toString(),
+ stringValue);
+ Assert.assertNull((((OrcStruct) ((OrcList)
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("c")));
+ // Add cases when original OrcStruct has its list member having different
number of elements then the destination OrcStruct.
+ // original has list.size() = 2, target has list.size() = 1
+ listInStruct = (OrcStruct)
OrcUtils.createValueRecursively(listInStructSchema, 2);
+ OrcTestUtils.fillOrcStructWithFixedValue(listInStruct, listInStructSchema,
intValue, stringValue, boolValue);
+ Assert.assertNotEquals(((OrcList)listInStruct.getFieldValue("a")).size(),
+ ((OrcList)evolved_listInStruct.getFieldValue("a")).size());
+ OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct,
evolved_listInStructSchema);
+ Assert.assertEquals(((OrcList)
evolved_listInStruct.getFieldValue("a")).size(), 2);
+ // Original has list.size() = 0, target has list.size() = 1
+ ((OrcList)listInStruct.getFieldValue("a")).clear();
+ OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct,
evolved_listInStructSchema);
+ Assert.assertEquals(((OrcList)
evolved_listInStruct.getFieldValue("a")).size(), 0);
+
+ // Map within Struct, contains a type-widening in the map-value type.
+ TypeDescription mapInStructSchema =
TypeDescription.fromString("struct<a:map<string,int>>");
+ OrcStruct mapInStruct = (OrcStruct)
OrcStruct.createValue(mapInStructSchema);
+ TypeDescription mapSchema =
TypeDescription.createMap(TypeDescription.createString(),
TypeDescription.createInt());
+ OrcMap mapEntry = new OrcMap(mapSchema);
+ mapEntry.put(new Text(""), new IntWritable());
+ mapInStruct.setFieldValue("a", mapEntry);
+ OrcTestUtils.fillOrcStructWithFixedValue(mapEntry, mapSchema, intValue,
stringValue, boolValue);
+ // Create the target struct with evolved schema
+ TypeDescription evolved_mapInStructSchema =
TypeDescription.fromString("struct<a:map<string,bigint>>");
+ OrcStruct evolved_mapInStruct = (OrcStruct)
OrcStruct.createValue(evolved_mapInStructSchema);
+ OrcMap evolvedMapEntry =
+ new OrcMap(TypeDescription.createMap(TypeDescription.createString(),
TypeDescription.createInt()));
+ evolvedMapEntry.put(new Text(""), new LongWritable(2L));
+ evolvedMapEntry.put(new Text(""), new LongWritable(3L));
+ evolved_mapInStruct.setFieldValue("a", evolvedMapEntry);
+ // convert and verify: Type-widening is correct, and size of output file
is correct.
+ OrcUtils.upConvertOrcStruct(mapInStruct, evolved_mapInStruct,
evolved_mapInStructSchema);
+
+ Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).get(new Text(stringValue)),
+ new LongWritable(intValue));
+ Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).size(), 1);
+ // re-use the same object but the source struct has fewer member in the
map entry.
+ mapEntry.put(new Text(""), new IntWritable(1));
+ // sanity check
+ Assert.assertEquals(((OrcMap) mapInStruct.getFieldValue("a")).size(), 2);
+ OrcUtils.upConvertOrcStruct(mapInStruct, evolved_mapInStruct,
evolved_mapInStructSchema);
+ Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).size(), 2);
+ Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).get(new Text(stringValue)),
+ new LongWritable(intValue));
+
+ // Union in struct, type widening within the union's member field.
+ TypeDescription unionInStructSchema =
TypeDescription.fromString("struct<a:uniontype<int,string>>");
+ OrcStruct unionInStruct = (OrcStruct)
OrcStruct.createValue(unionInStructSchema);
+ OrcUnion placeHolderUnion = new
OrcUnion(TypeDescription.fromString("uniontype<int,string>"));
+ placeHolderUnion.set(0, new IntWritable(1));
+ unionInStruct.setFieldValue("a", placeHolderUnion);
+ OrcTestUtils.fillOrcStructWithFixedValue(unionInStruct,
unionInStructSchema, intValue, stringValue, boolValue);
+ // Create new structWithUnion
+ TypeDescription evolved_unionInStructSchema =
TypeDescription.fromString("struct<a:uniontype<bigint,string>>");
+ OrcStruct evolvedUnionInStruct = (OrcStruct)
OrcStruct.createValue(evolved_unionInStructSchema);
+ OrcUnion evolvedPlaceHolderUnion = new
OrcUnion(TypeDescription.fromString("uniontype<bigint,string>"));
+ evolvedPlaceHolderUnion.set(0, new LongWritable(1L));
+ evolvedUnionInStruct.setFieldValue("a", evolvedPlaceHolderUnion);
+ OrcUtils.upConvertOrcStruct(unionInStruct, evolvedUnionInStruct,
evolved_unionInStructSchema);
+ // Check in the tag 0(Default from value-filler) within
evolvedUnionInStruct, the value is becoming type-widened with correct value.
+ Assert.assertEquals(((OrcUnion)
evolvedUnionInStruct.getFieldValue("a")).getTag(), 0);
+ Assert.assertEquals(((OrcUnion)
evolvedUnionInStruct.getFieldValue("a")).getObject(), new
LongWritable(intValue));
+ // Check the case when union field is created in different tag.
+
+ // Complex: List<Struct> within struct among others and evolution happens
on multiple places, also type-widening in deeply nested level.
+ TypeDescription complexOrcSchema =
+
TypeDescription.fromString("struct<a:array<struct<a:string,b:int>>,b:struct<a:uniontype<int,string>>>");
+ OrcStruct complexOrcStruct = (OrcStruct)
OrcUtils.createValueRecursively(complexOrcSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(complexOrcStruct,
complexOrcSchema, intValue, stringValue, boolValue);
+ TypeDescription evolvedComplexOrcSchema = TypeDescription
+
.fromString("struct<a:array<struct<a:string,b:bigint,c:string>>,b:struct<a:uniontype<bigint,string>,b:int>>");
+ OrcStruct evolvedComplexStruct = (OrcStruct)
OrcUtils.createValueRecursively(evolvedComplexOrcSchema);
+ OrcTestUtils
+ .fillOrcStructWithFixedValue(evolvedComplexStruct,
evolvedComplexOrcSchema, intValue, stringValue, boolValue);
+ // Check that new columns are assigned null values and that type widening
works as expected.
+ OrcUtils.upConvertOrcStruct(complexOrcStruct, evolvedComplexStruct,
evolvedComplexOrcSchema);
+ Assert
+
.assertEquals(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("b"),
new LongWritable(intValue));
+
Assert.assertNull(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("c"));
+ Assert.assertEquals(((OrcUnion)
((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("a")).getObject(),
new LongWritable(intValue));
+
Assert.assertNull(((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("b"));
+ }
+
+ @Test
+ public void testNestedWithinUnionWithDiffTag() throws Exception {
+ // Construct union types with different tags for the src and dest objects,
and check that up-conversion happens correctly.
+ TypeDescription structInUnionAsStruct =
TypeDescription.fromString("struct<a:uniontype<struct<a:int,b:string>,int>>");
+ OrcStruct structInUnionAsStructObject = (OrcStruct)
OrcUtils.createValueRecursively(structInUnionAsStruct);
+ OrcTestUtils
+ .fillOrcStructWithFixedValue(structInUnionAsStructObject,
structInUnionAsStruct, 0, intValue, stringValue, boolValue);
+
Assert.assertEquals(((OrcStruct)((OrcUnion)structInUnionAsStructObject.getFieldValue("a")).getObject())
+ .getFieldValue("a"), new IntWritable(intValue));
+
+ OrcStruct structInUnionAsStructObject_2 = (OrcStruct)
OrcUtils.createValueRecursively(structInUnionAsStruct);
+ OrcTestUtils
+ .fillOrcStructWithFixedValue(structInUnionAsStructObject_2,
structInUnionAsStruct, 1, intValue, stringValue, boolValue);
+
Assert.assertEquals(((OrcUnion)structInUnionAsStructObject_2.getFieldValue("a")).getObject(),
new IntWritable(intValue));
+
+ // Create a new record container, do up-convert twice and check if the
value is propagated properly.
+ OrcStruct container = (OrcStruct)
OrcUtils.createValueRecursively(structInUnionAsStruct);
+ OrcUtils.upConvertOrcStruct(structInUnionAsStructObject, container,
structInUnionAsStruct);
+ Assert.assertEquals(structInUnionAsStructObject, container);
+
+ OrcUtils.upConvertOrcStruct(structInUnionAsStructObject_2, container,
structInUnionAsStruct);
+ Assert.assertEquals(structInUnionAsStructObject_2, container);
+ }
+
+ /**
+ * This test mainly targets the following case:
+ * Schema: struct<a:array<struct<a:int,b:int>>>
+ * field a was set to null by one call of "upConvertOrcStruct", but the
subsequent call should still have the nested
+ * field filled.
+ */
+ @Test
+ public void testNestedFieldSequenceSet() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:int>>>");
+ OrcStruct struct = (OrcStruct) OrcUtils.createValueRecursively(schema);
+ OrcTestUtils.fillOrcStructWithFixedValue(struct, schema, 1, "test", true);
+ OrcStruct structWithEmptyArray = (OrcStruct)
OrcUtils.createValueRecursively(schema);
+ OrcTestUtils.fillOrcStructWithFixedValue(structWithEmptyArray, schema, 1,
"test", true);
+ structWithEmptyArray.setFieldValue("a", null);
+ OrcUtils.upConvertOrcStruct(structWithEmptyArray, struct, schema);
+ Assert.assertEquals(struct, structWithEmptyArray);
+
+ OrcStruct struct_2 = (OrcStruct) OrcUtils.createValueRecursively(schema);
+ OrcTestUtils.fillOrcStructWithFixedValue(struct_2, schema, 2, "test",
true);
+ OrcUtils.upConvertOrcStruct(struct_2, struct, schema);
+ Assert.assertEquals(struct, struct_2);
+ }
+
+ /**
+ * Just a sanity test for column projection; there should be no difference
from the other cases when a reader schema is provided.
+ */
+ @Test
+ public void testOrcStructProjection() throws Exception {
+ TypeDescription originalSchema =
TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
+ OrcStruct originalStruct = (OrcStruct)
OrcUtils.createValueRecursively(originalSchema);
+ OrcTestUtils.fillOrcStructWithFixedValue(originalStruct, originalSchema,
intValue, stringValue, boolValue);
+
+ TypeDescription projectedSchema =
TypeDescription.fromString("struct<a:struct<b:int>,b:struct<c:int>>");
+ OrcStruct projectedStructExpectedValue = (OrcStruct)
OrcUtils.createValueRecursively(projectedSchema);
+ OrcTestUtils
+ .fillOrcStructWithFixedValue(projectedStructExpectedValue,
projectedSchema, intValue, stringValue, boolValue);
+ OrcStruct projectColumnStruct = (OrcStruct)
OrcUtils.createValueRecursively(projectedSchema);
+ OrcUtils.upConvertOrcStruct(originalStruct, projectColumnStruct,
projectedSchema);
+ Assert.assertEquals(projectColumnStruct, projectedStructExpectedValue);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
index 651c512..0e2e72c 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
@@ -18,8 +18,6 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.SchemaEvolution;
-import org.apache.orc.mapred.OrcStruct;
import org.junit.Assert;
import org.testng.annotations.Test;
@@ -36,58 +34,4 @@ public class OrcValueMapperTest {
Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_4));
Assert.assertTrue(OrcUtils.isEvolutionValid(schema_4, schema_1));
}
-
- @Test
- public void testUpConvertOrcStruct(){
- OrcValueMapper mapper = new OrcValueMapper();
-
- // Basic case.
- TypeDescription baseStructSchema =
TypeDescription.fromString("struct<a:int,b:string>");
- OrcStruct baseStruct = (OrcStruct) OrcStruct.createValue(baseStructSchema);
- TypeDescription evolved_baseStructSchema =
TypeDescription.fromString("struct<a:int,b:string,c:int>");
- OrcStruct evolvedStruct = (OrcStruct)
OrcStruct.createValue(evolved_baseStructSchema);
- OrcStruct resultStruct = mapper.upConvertOrcStruct(baseStruct,
evolved_baseStructSchema);
- Assert.assertEquals(resultStruct.getSchema(), evolved_baseStructSchema);
-
- // Base case: Reverse direction.
- resultStruct = mapper.upConvertOrcStruct(evolvedStruct, baseStructSchema);
- Assert.assertEquals(resultStruct.getSchema(), baseStructSchema);
-
- // Simple Nested: List/Map/Union/Struct within Struct.
- TypeDescription listInStructSchema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
- OrcStruct listInStruct = (OrcStruct)
OrcStruct.createValue(listInStructSchema);
- TypeDescription evolved_listInStructSchema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:string,c:string>>>");
- OrcStruct evolved_listInStruct = (OrcStruct)
OrcStruct.createValue(evolved_listInStructSchema);
- resultStruct = mapper.upConvertOrcStruct(listInStruct,
evolved_listInStructSchema);
- Assert.assertEquals(resultStruct.getSchema(), evolved_listInStructSchema);
- resultStruct = mapper.upConvertOrcStruct(evolved_listInStruct,
listInStructSchema);
- Assert.assertEquals(resultStruct.getSchema(), listInStructSchema);
-
- TypeDescription mapInStructSchema =
TypeDescription.fromString("struct<a:map<string,int>>");
- OrcStruct mapInStruct = (OrcStruct)
OrcStruct.createValue(mapInStructSchema);
- TypeDescription evolved_mapInStructSchema =
TypeDescription.fromString("struct<a:map<string,bigint>>");
- OrcStruct evolved_mapInStruct = (OrcStruct)
OrcStruct.createValue(evolved_mapInStructSchema);
- resultStruct = mapper.upConvertOrcStruct(mapInStruct,
evolved_mapInStructSchema);
- Assert.assertEquals(resultStruct.getSchema(), evolved_mapInStructSchema);
- resultStruct = mapper.upConvertOrcStruct(evolved_mapInStruct,
mapInStructSchema);
- // Evolution not valid, no up-conversion happened.
- try {
- resultStruct.getSchema().equals(evolved_mapInStructSchema);
- } catch (SchemaEvolution.IllegalEvolutionException ie) {
- Assert.assertTrue(true);
- }
-
- TypeDescription unionInStructSchema =
TypeDescription.fromString("struct<a:uniontype<int,string>>");
- OrcStruct unionInStruct = (OrcStruct)
OrcStruct.createValue(unionInStructSchema);
- TypeDescription evolved_unionInStructSchema =
TypeDescription.fromString("struct<a:uniontype<bigint,string>>");
- resultStruct = mapper.upConvertOrcStruct(unionInStruct,
evolved_unionInStructSchema);
- Assert.assertEquals(resultStruct.getSchema(), evolved_unionInStructSchema);
-
- // Complex: List<Struct> within struct among others and evolution happens
on multiple places.
- TypeDescription complex_1 =
TypeDescription.fromString("struct<a:array<struct<a:string,b:int>>,b:struct<a:uniontype<int,string>>>");
- OrcStruct complex_struct = (OrcStruct) OrcStruct.createValue(complex_1);
- TypeDescription evolved_complex_1 =
TypeDescription.fromString("struct<a:array<struct<a:string,b:int,c:string>>,b:struct<a:uniontype<bigint,string>,b:int>>");
- resultStruct = mapper.upConvertOrcStruct(complex_struct,
evolved_complex_1);
- Assert.assertEquals(resultStruct.getSchema(), evolved_complex_1);
- }
}
\ No newline at end of file
diff --git a/travis/test-groups.inc b/travis/test-groups.inc
index 89e4885..af6e517 100644
--- a/travis/test-groups.inc
+++ b/travis/test-groups.inc
@@ -1 +1 @@
-TEST_GROUP1=gobbin.yarn,gobblin.runtime
+TEST_GROUP1=gobblin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction