This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 778a863  [GOBBLIN-1257] Fix the handling of collection field types 
during ORC schema up-conversion in compaction
778a863 is described below

commit 778a8634978981e63d3a096af336b14f79aebeca
Author: sv2000 <[email protected]>
AuthorDate: Tue Sep 8 11:26:14 2020 -0700

    [GOBBLIN-1257] Fix the handling of collection field types during ORC schema 
up-conversion in compaction
    
    Closes #3097 from sv2000/orcCompaction
---
 gobblin-compaction/build.gradle                    |   1 +
 .../gobblin/compaction/mapreduce/orc/OrcUtils.java |  13 +-
 .../test/TestCompactionOrcJobConfigurator.java     |  45 +++++
 .../mapreduce/test/TestCompactionTaskUtils.java    |  59 +++++++
 .../mapreduce/test/TestOrcCompactionTask.java      | 142 ++++++++++++++++
 .../main/resources/orcCompactionTest/data1.json    |   1 +
 .../main/resources/orcCompactionTest/data1.schema  |   1 +
 .../mapreduce/AvroCompactionTaskTest.java          |  46 ++----
 .../mapreduce/OrcCompactionTaskTest.java           |  49 ++----
 .../compaction/mapreduce/orc/OrcUtilsTest.java     | 184 +++++++++++++--------
 gradle/scripts/dependencyDefinitions.gradle        |   1 +
 11 files changed, 398 insertions(+), 144 deletions(-)

diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index 846f788..4e607ad 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -31,6 +31,7 @@ dependencies {
  // with hive-exec-core in older version(1.0.1), we need to shadow 
orc-mapreduce's transitive deps.
   // and include direct orc-mapreduce library just as a compileOnly dependency
   compileOnly externalDependency.orcMapreduce
+  compile externalDependency.orcTools
   testCompileOnly externalDependency.orcMapreduce
   compile project(path: ":gobblin-modules:gobblin-orc-dep", 
configuration:"shadow")
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 4dabf53..f3da03b 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -208,11 +207,10 @@ public class OrcUtils {
       OrcList castedList = (OrcList) w;
       OrcList targetList = (OrcList) v;
       TypeDescription elementType = targetSchema.getChildren().get(0);
-      WritableComparable targetListRecordContainer =
-          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : 
createValueRecursively(elementType, 0);
       targetList.clear();
 
       for (int i = 0; i < castedList.size(); i++) {
+        WritableComparable targetListRecordContainer = 
createValueRecursively(elementType, 0);
         targetList.add(i,
             structConversionHelper((WritableComparable) castedList.get(i), 
targetListRecordContainer, elementType));
       }
@@ -220,19 +218,12 @@ public class OrcUtils {
       OrcMap castedMap = (OrcMap) w;
       OrcMap targetMap = (OrcMap) v;
       TypeDescription valueSchema = targetSchema.getChildren().get(1);
-
-      // Create recordContainer with the schema of value.
-      Iterator targetMapEntries = targetMap.values().iterator();
-      WritableComparable targetMapRecordContainer =
-          targetMapEntries.hasNext() ? (WritableComparable) 
targetMapEntries.next()
-              : createValueRecursively(valueSchema);
-
       targetMap.clear();
 
       for (Object entry : castedMap.entrySet()) {
         Map.Entry<WritableComparable, WritableComparable> castedEntry =
             (Map.Entry<WritableComparable, WritableComparable>) entry;
-
+        WritableComparable targetMapRecordContainer = 
createValueRecursively(valueSchema);
         targetMapRecordContainer =
             structConversionHelper(castedEntry.getValue(), 
targetMapRecordContainer, valueSchema);
         targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionOrcJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionOrcJobConfigurator.java
new file mode 100644
index 0000000..9470ba3
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionOrcJobConfigurator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.test;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator;
+import org.apache.gobblin.configuration.State;
+
+
/**
 * A {@link CompactionOrcJobConfigurator} for tests that pins the compaction MR job to a single
 * reducer, so the job's output is produced by exactly one reduce task.
 */
public class TestCompactionOrcJobConfigurator extends CompactionOrcJobConfigurator {
  /**
   * Factory that produces {@link TestCompactionOrcJobConfigurator} instances; tests select it
   * via the job-configurator-factory class-name configuration key.
   */
  public static class Factory implements CompactionJobConfigurator.ConfiguratorFactory {
    @Override
    public TestCompactionOrcJobConfigurator createConfigurator(State state) throws IOException {
      return new TestCompactionOrcJobConfigurator(state);
    }
  }

  // Force a single reducer so test output lands in one reduce partition.
  @Override
  protected void setNumberOfReducers(Job job) {
    job.setNumReduceTasks(1);
  }

  public TestCompactionOrcJobConfigurator(State state) throws IOException {
    super(state);
  }
}
\ No newline at end of file
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionTaskUtils.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionTaskUtils.java
new file mode 100644
index 0000000..92ca2bc
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionTaskUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.compaction.mapreduce.test;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
+public class TestCompactionTaskUtils {
+  public static final String PATH_SEPARATOR = "/";
+
+  public static final String DEFAULT_INPUT_SUBDIR_TYPE = "minutely";
+  public static EmbeddedGobblin createEmbeddedGobblinCompactionJob(String 
name, String basePath) {
+    return createEmbeddedGobblinCompactionJob(name, basePath, 
DEFAULT_INPUT_SUBDIR_TYPE);
+  }
+
+  public static EmbeddedGobblin createEmbeddedGobblinCompactionJob(String 
name, String basePath, String inputSubdirType) {
+    String pattern;
+    String outputSubdirType;
+    if (inputSubdirType.equals(DEFAULT_INPUT_SUBDIR_TYPE)) {
+      pattern = new Path(basePath, "*/*/minutely/*/*/*/*").toString();
+      outputSubdirType = "hourly";
+    } else {
+      pattern = new Path(basePath, "*/*/hourly/*/*/*").toString();
+      outputSubdirType = "daily";
+    }
+
+    return new EmbeddedGobblin(name)
+        .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, 
CompactionSource.class.getName())
+        
.setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, 
pattern)
+        .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR, basePath)
+        .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubdirType)
+        .setConfiguration(MRCompactor.COMPACTION_DEST_DIR, basePath)
+        .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, outputSubdirType)
+        .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR, 
"/tmp/compaction/" + name)
+        
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
 "3000d")
+        
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
 "1d")
+        .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestOrcCompactionTask.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestOrcCompactionTask.java
new file mode 100644
index 0000000..c07f7ac
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestOrcCompactionTask.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Calendar;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.tools.convert.ConvertTool;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
+import static 
org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils.PATH_SEPARATOR;
+
+/**
+ * A convenience class for running ORC compaction locally. Particularly useful 
for local debugging. The method takes as
+ * input a resource folder containing Json files containing input records, and 
an ORC data1.schema file, and generates
+ * input ORC files in hourly folders under ${HOME_DIRECTORY}/data using the 
ORC {@link ConvertTool}. The ORC input data
+ * for compaction job is written under: 
${HOME_DIRECTORY}/data/tracking/testTopic/hourly/YYYY/MM/DD/HH.
+ * The output of the compaction job is written under: 
${HOME_DIRECTORY}/data/tracking/testTopic/daily/YYYY/MM/DD.
+ * The year, month, day is derived from the header.time field in the input 
records.
+ *
+ * Assumptions:
+ * <ul>
+ *   <li>The input data has the header.time field, which is assumed to be the 
epoch time in millis</li>
+ *   <li>Associated with each json file, the must be a corresponding schema 
file containing the ORC schema definition. The schema file
+ *   must have the same filename (without extension) as the corresponding json 
file. See the orcCompactionTest resource folder for
+ *   an example. </li>
+ * </ul>
+ *
+ * When running the main() method in your IDE, make sure to remove the 
hive-exec, log4j-over-slf4j and xerces jars from
+ * the Project's External Libraries.
+ *
+ */
+public class TestOrcCompactionTask {
+  private static final JsonParser PARSER = new JsonParser();
+  private static final String HOURLY_SUBDIR = "tracking/testTopic/hourly";
+  private static final String JSON_FILE_EXTENSION = "json";
+  private static final String TEST_RESOURCE_FOLDER_NAME = "orcCompactionTest";
+
+  public static void main(String[] args) throws Exception {
+    File basePath = new File(System.getProperty("user.home"), 
TEST_RESOURCE_FOLDER_NAME);
+    if (basePath.exists()) {
+      FileUtils.deleteDirectory(basePath);
+    }
+    boolean mkdirs = basePath.mkdirs();
+    Preconditions.checkArgument(mkdirs, "Unable to create: " + 
basePath.getAbsolutePath());
+    URL resourceURL = 
TestOrcCompactionTask.class.getClassLoader().getResource(TEST_RESOURCE_FOLDER_NAME);
+    Preconditions.checkArgument(resourceURL != null, "Could not find resource: 
" + TEST_RESOURCE_FOLDER_NAME);
+    File resourceDirectory = new File(resourceURL.getFile());
+
+    for (File file: resourceDirectory.listFiles()) {
+      if(isJsonFile(file)) {
+        createOrcFile(file, basePath.getAbsolutePath());
+      }
+    }
+    EmbeddedGobblin embeddedGobblin =
+        TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob("basic", 
basePath.getAbsolutePath(), "hourly")
+            
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+                TestCompactionOrcJobConfigurator.Factory.class.getName());
+    embeddedGobblin.run();
+  }
+
+  private static void createOrcFile(File file, String basePath)
+      throws IOException, ParseException {
+    JsonElement jsonElement;
+    try (Reader reader = new InputStreamReader(new FileInputStream(file), 
Charset.defaultCharset())) {
+      jsonElement = PARSER.parse(reader);
+    }
+
+    //Get header.time
+    long timestamp = 
jsonElement.getAsJsonObject().get("header").getAsJsonObject().get("time").getAsLong();
+    File hourlyPath = new File(getPath(basePath, timestamp));
+    if (!hourlyPath.exists()) {
+      boolean result = hourlyPath.mkdirs();
+      Preconditions.checkArgument(result, "Unable to create: " + 
hourlyPath.getAbsolutePath());
+    }
+    String fileNameWithoutExtensions = 
Files.getNameWithoutExtension(file.getName());
+    File schemaFile = new File(file.getParent(), fileNameWithoutExtensions + 
".schema");
+    String orcSchema = FileUtils.readFileToString(schemaFile, 
Charset.defaultCharset());
+    String orcFileName = hourlyPath.getAbsolutePath() + PATH_SEPARATOR + 
fileNameWithoutExtensions + ".orc";
+    File orcFile = new File(orcFileName);
+    //Delete if file already exists
+    if (orcFile.exists()) {
+      boolean result = orcFile.delete();
+      Preconditions.checkArgument(result, "Unable to delete: " + 
orcFile.getAbsolutePath());
+    }
+    //Convert to ORC using the corresponding schema
+    String[] convertToolArgs = new String[]{"-s", orcSchema, 
file.getAbsolutePath(), "-o", orcFileName};
+    ConvertTool.main(new Configuration(), convertToolArgs);
+  }
+
+  /**
+   * A helper method that returns the absolute path of the hourly folder given 
a timestamp and a basePath.
+   * @param basePath e.g. /Users/foo/orcCompactionTaskTest
+   * @param timestamp the unix timestamp in milliseconds
+   * @return the output path of the hourly folder e.g. 
/Users/foo/orcCompactionTaskTest/hourly/2020/08/20/12
+   */
+  private static String getPath(String basePath, Long timestamp) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTimeInMillis(timestamp);
+    String year = Integer.toString(calendar.get(Calendar.YEAR));
+    String month = String.format("%02d", calendar.get(Calendar.MONTH));
+    String day = String.format("%02d", calendar.get(Calendar.DAY_OF_MONTH));
+    String hour = String.format("%02d", calendar.get(Calendar.HOUR_OF_DAY));
+    return Joiner.on(PATH_SEPARATOR).join(basePath, HOURLY_SUBDIR, year, 
month, day, hour);
+  }
+
+  private static boolean isJsonFile(File file) {
+    return Files.getFileExtension(file.getName()).equals(JSON_FILE_EXTENSION);
+  }
+}
diff --git a/gobblin-compaction/src/main/resources/orcCompactionTest/data1.json 
b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.json
new file mode 100644
index 0000000..6ad0c76
--- /dev/null
+++ b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.json
@@ -0,0 +1 @@
+{"header": { "memberId": 1, "time" : 1599171293000}, "name": "Alyssa"}
\ No newline at end of file
diff --git 
a/gobblin-compaction/src/main/resources/orcCompactionTest/data1.schema 
b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.schema
new file mode 100644
index 0000000..cb10052
--- /dev/null
+++ b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.schema
@@ -0,0 +1 @@
+struct<header:struct<memberId:int,time:bigint>,name:string>
\ No newline at end of file
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 145ecf1..223fc57 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -28,8 +28,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
-import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,12 +41,14 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
 import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import 
org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
 import org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder;
@@ -56,6 +56,8 @@ import 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobData
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
 
+import static 
org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob;
+
 
 @Slf4j
 @Test(groups = {"gobblin.compaction"})
@@ -70,7 +72,6 @@ public class AvroCompactionTaskTest {
 
   @Test
   public void testDedup() throws Exception {
-
     File basePath = Files.createTempDir();
     basePath.deleteOnExit();
 
@@ -85,7 +86,7 @@ public class AvroCompactionTaskTest {
     File newestFile = writeFileWithContent(jobDir, "file3", r3, 10, 
r3.getSchema());
     newestFile.setLastModified(Long.MAX_VALUE);
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("dedup", 
basePath.getAbsolutePath().toString());
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("dedup", basePath.getAbsolutePath());
     JobExecutionResult result = embeddedGobblin.run();
     Assert.assertTrue(result.isSuccessful());
   }
@@ -103,7 +104,7 @@ public class AvroCompactionTaskTest {
     writeFileWithContent(jobDir, "file1", r1, 20);
     writeFileWithContent(jobDir, "file2", r2, 18);
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("non-dedup", 
basePath.getAbsolutePath().toString());
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("non-dedup", 
basePath.getAbsolutePath().toString());
     JobExecutionResult result = embeddedGobblin.run();
     Assert.assertTrue(result.isSuccessful());
   }
@@ -155,7 +156,7 @@ public class AvroCompactionTaskTest {
     GenericRecord r1 = createRandomRecord();
     writeFileWithContent(jobDir, "file1", r1, 20);
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin 
("Recompaction-First", basePath);
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("Recompaction-First", basePath);
     JobExecutionResult result = embeddedGobblin.run();
     long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path 
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
     Assert.assertTrue(result.isSuccessful());
@@ -163,7 +164,7 @@ public class AvroCompactionTaskTest {
 
     // Now write more avro files to input dir
     writeFileWithContent(jobDir, "file2", r1, 22);
-    EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin 
("Recompaction-Second", basePath);
+    EmbeddedGobblin embeddedGobblin_2 = 
createEmbeddedGobblinCompactionJob("Recompaction-Second", basePath);
     embeddedGobblin_2.run();
     Assert.assertTrue(result.isSuccessful());
 
@@ -185,7 +186,7 @@ public class AvroCompactionTaskTest {
     GenericRecord r1 = createRandomRecord();
     writeFileWithContent(jobDir, "file1", r1, 20);
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin 
("Recompaction-First", basePath);
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("Recompaction-First", basePath);
     
embeddedGobblin.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
 "true");
     JobExecutionResult result = embeddedGobblin.run();
     long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path 
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
@@ -194,7 +195,7 @@ public class AvroCompactionTaskTest {
 
     // Now write more avro files to input dir
     writeFileWithContent(jobDir, "file2", r1, 22);
-    EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin 
("Recompaction-Second", basePath);
+    EmbeddedGobblin embeddedGobblin_2 = 
createEmbeddedGobblinCompactionJob("Recompaction-Second", basePath);
     
embeddedGobblin_2.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
 "true");
     embeddedGobblin_2.run();
     Assert.assertTrue(result.isSuccessful());
@@ -218,7 +219,7 @@ public class AvroCompactionTaskTest {
     GenericRecord r1 = createRandomRecord();
     writeFileWithContent(jobDir, "file1", r1, 20);
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin 
("Recompaction-First", basePath);
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("Recompaction-First", basePath);
     JobExecutionResult result = embeddedGobblin.run();
     long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path 
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
     Assert.assertTrue(result.isSuccessful());
@@ -226,7 +227,7 @@ public class AvroCompactionTaskTest {
 
     // Now write more avro files to input dir
     writeFileWithContent(jobDir, "file2", r1, 22);
-    EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin 
("Recompaction-Second", basePath);
+    EmbeddedGobblin embeddedGobblin_2 = 
createEmbeddedGobblinCompactionJob("Recompaction-Second", basePath);
     
embeddedGobblin_2.setConfiguration(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION,
 "8h");
     embeddedGobblin_2.run();
     Assert.assertTrue(result.isSuccessful());
@@ -304,24 +305,9 @@ public class AvroCompactionTaskTest {
       writer.close();
   }
 
-  static EmbeddedGobblin createEmbeddedGobblin (String name, String basePath) {
-    String pattern = new Path(basePath, "*/*/minutely/*/*/*/*").toString();
-
-    return new EmbeddedGobblin(name)
-            .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, 
CompactionSource.class.getName())
-            
.setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, 
pattern)
-            .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR, 
basePath.toString())
-            .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, "minutely")
-            .setConfiguration(MRCompactor.COMPACTION_DEST_DIR, 
basePath.toString())
-            .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, "hourly")
-            .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR, 
"/tmp/compaction/" + name)
-            
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
 "3000d")
-            
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
 "1d")
-            .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
-  }
 
   private EmbeddedGobblin createEmbeddedGobblinForAllFailures (String name, 
String basePath) {
-    return createEmbeddedGobblin(name, basePath)
+    return createEmbeddedGobblinCompactionJob(name, basePath)
         .setConfiguration(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY, 
"KafkaAuditCountHttpClientFactory")
         .setConfiguration(CompactionAuditCountVerifier.GOBBLIN_TIER, "dummy")
         .setConfiguration(CompactionAuditCountVerifier.ORIGIN_TIER, "dummy")
@@ -330,12 +316,12 @@ public class AvroCompactionTaskTest {
   }
 
   private EmbeddedGobblin createEmbeddedGobblinForHiveRegistrationFailure 
(String name, String basePath) {
-    return createEmbeddedGobblin(name, basePath)
+    return createEmbeddedGobblinCompactionJob(name, basePath)
         .setConfiguration(ConfigurationKeys.COMPACTION_SUITE_FACTORY, 
"HiveRegistrationFailureFactory");
   }
 
   private EmbeddedGobblin createEmbeddedGobblinWithPriority (String name, 
String basePath) {
-    return createEmbeddedGobblin(name, basePath)
+    return createEmbeddedGobblinCompactionJob(name, basePath)
         .setConfiguration(ConfigurationKeys.COMPACTION_PRIORITIZER_ALIAS, 
"TieredDatasets")
         .setConfiguration(SimpleDatasetHierarchicalPrioritizer.TIER_KEY + 
".0", "Identity")
         .setConfiguration(SimpleDatasetHierarchicalPrioritizer.TIER_KEY + 
".1", "EVG")
@@ -356,7 +342,7 @@ public class AvroCompactionTaskTest {
        writeFileWithContent(jobDir, "file_random", r1, 20);
      }
 
-     EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblin("workunit_stream", basePath.getAbsolutePath().toString());
+     EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("workunit_stream", 
basePath.getAbsolutePath().toString());
      JobExecutionResult result = embeddedGobblin.run();
 
      Assert.assertTrue(result.isSuccessful());
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index 33191b8..b9c4152 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -17,19 +17,13 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+
 import org.apache.commons.io.FilenameUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,7 +32,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.orc.OrcFile;
@@ -52,11 +45,21 @@ import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
+import 
org.apache.gobblin.compaction.mapreduce.test.TestCompactionOrcJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
 import static 
org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator.ORC_MAPPER_SHUFFLE_KEY_SCHEMA;
-import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
 import static 
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
 import static 
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
+import static 
org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob;
 
 @Test(groups = {"gobblin.compaction"})
 public class OrcCompactionTaskTest {
@@ -103,7 +106,7 @@ public class OrcCompactionTaskTest {
     // Testing data is schema'ed with "struct<i:int,j:int>"
     createTestingData(jobDir);
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
+    EmbeddedGobblin embeddedGobblin = 
TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob("basic", 
basePath.getAbsolutePath())
         
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
             TestCompactionOrcJobConfigurator.Factory.class.getName())
         .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
@@ -157,7 +160,7 @@ public class OrcCompactionTaskTest {
 
     // Verify execution
     // Overwrite the job configurator factory key.
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("basic", basePath.getAbsolutePath())
         
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
         TestCompactionOrcJobConfigurator.Factory.class.getName())
         .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
@@ -239,7 +242,7 @@ public class OrcCompactionTaskTest {
     writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), nestedSchema, 
ImmutableList.of(nested_struct_1,
         nested_struct_2, nested_struct_3, nested_struct_4, nested_struct_5));
 
-    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
+    EmbeddedGobblin embeddedGobblin = 
createEmbeddedGobblinCompactionJob("basic", 
basePath.getAbsolutePath().toString())
         
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
             TestCompactionOrcJobConfigurator.Factory.class.getName())
         .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
@@ -287,7 +290,7 @@ public class OrcCompactionTaskTest {
 
     createTestingData(jobDir);
 
-    EmbeddedGobblin embeddedGobblin_nondedup = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
+    EmbeddedGobblin embeddedGobblin_nondedup = 
createEmbeddedGobblinCompactionJob("basic", 
basePath.getAbsolutePath().toString())
         
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
             TestCompactionOrcJobConfigurator.Factory.class.getName())
         .setConfiguration(COMPACTION_OUTPUT_EXTENSION, "orc")
@@ -361,22 +364,4 @@ public class OrcCompactionTaskTest {
     }
     recordWriter.close(new TaskAttemptContextImpl(configuration, new 
TaskAttemptID()));
   }
-
-  private static class TestCompactionOrcJobConfigurator extends 
CompactionOrcJobConfigurator {
-    public static class Factory implements 
CompactionJobConfigurator.ConfiguratorFactory {
-      @Override
-      public TestCompactionOrcJobConfigurator createConfigurator(State state) 
throws IOException {
-        return new TestCompactionOrcJobConfigurator(state);
-      }
-    }
-
-    @Override
-    protected void setNumberOfReducers(Job job) throws IOException {
-      job.setNumReduceTasks(1);
-    }
-
-    public TestCompactionOrcJobConfigurator(State state) throws IOException {
-      super(state);
-    }
-  }
 }
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
index 83e91bd..01487bc 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
@@ -32,9 +32,12 @@ import org.testng.annotations.Test;
 
 @Test(groups = {"gobblin.compaction"})
 public class OrcUtilsTest {
-
-  final int intValue = 10;
-  final String stringValue = "testString";
+  final int intValue1 = 10;
+  final String stringValue1 = "testString1";
+  final int intValue2 = 20;
+  final String stringValue2 = "testString2";
+  final int intValue3 = 30;
+  final String stringValue3 = "testString3";
   final boolean boolValue = true;
 
   @Test
@@ -108,97 +111,136 @@ public class OrcUtilsTest {
   }
 
   @Test
-  public void testUpConvertOrcStruct() {
-
+  public void testUpConvertSimpleOrcStruct() {
     // Basic case, all primitives, newly added value will be set to null
     TypeDescription baseStructSchema = 
TypeDescription.fromString("struct<a:int,b:string>");
     // This would be re-used in the following tests as the actual record using 
the schema.
     OrcStruct baseStruct = (OrcStruct) OrcStruct.createValue(baseStructSchema);
     // Fill in the baseStruct with specified value.
-    OrcTestUtils.fillOrcStructWithFixedValue(baseStruct, baseStructSchema, 
intValue, stringValue, boolValue);
+    OrcTestUtils.fillOrcStructWithFixedValue(baseStruct, baseStructSchema, 
intValue1, stringValue1, boolValue);
     TypeDescription evolved_baseStructSchema = 
TypeDescription.fromString("struct<a:int,b:string,c:int>");
     OrcStruct evolvedStruct = (OrcStruct) 
OrcStruct.createValue(evolved_baseStructSchema);
     // This should be equivalent to 
deserialize(baseStruct).serialize(evolvedStruct, evolvedSchema);
     OrcUtils.upConvertOrcStruct(baseStruct, evolvedStruct, 
evolved_baseStructSchema);
     // Check if all value in baseStruct is populated and newly created column 
in evolvedStruct is filled with null.
-    Assert.assertEquals(((IntWritable) 
evolvedStruct.getFieldValue("a")).get(), intValue);
-    Assert.assertEquals(((Text) evolvedStruct.getFieldValue("b")).toString(), 
stringValue);
+    Assert.assertEquals(((IntWritable) 
evolvedStruct.getFieldValue("a")).get(), intValue1);
+    Assert.assertEquals(evolvedStruct.getFieldValue("b").toString(), 
stringValue1);
     Assert.assertNull(evolvedStruct.getFieldValue("c"));
 
     // Base case: Reverse direction, which is column projection on top-level 
columns.
     OrcStruct baseStruct_shadow = (OrcStruct) 
OrcStruct.createValue(baseStructSchema);
     OrcUtils.upConvertOrcStruct(evolvedStruct, baseStruct_shadow, 
baseStructSchema);
     Assert.assertEquals(baseStruct, baseStruct_shadow);
+  }
 
-    // Simple Nested: List/Map/Union within Struct.
+  @Test
+  public void testUpConvertOrcStructOfList() {
+    // Simple Nested: List within Struct.
     // The element type of list contains a new field.
     // Prepare two ListInStructs with different size ( the list field contains 
different number of members)
-    TypeDescription listInStructSchema = 
TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
-    OrcStruct listInStruct = (OrcStruct) 
OrcUtils.createValueRecursively(listInStructSchema);
-    OrcTestUtils.fillOrcStructWithFixedValue(listInStruct, listInStructSchema, 
intValue, stringValue, boolValue);
-    TypeDescription evolved_listInStructSchema =
+    TypeDescription structOfListSchema = 
TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
+    OrcStruct structOfList = (OrcStruct) 
OrcUtils.createValueRecursively(structOfListSchema);
+
+    //Create an OrcList instance with two entries
+    TypeDescription innerStructSchema = 
TypeDescription.createStruct().addField("a", TypeDescription.createInt())
+        .addField("b", TypeDescription.createString());
+    OrcStruct innerStruct1 = new OrcStruct(innerStructSchema);
+    innerStruct1.setFieldValue("a", new IntWritable(intValue1));
+    innerStruct1.setFieldValue("b", new Text(stringValue1));
+
+    OrcStruct innerStruct2 = new OrcStruct(innerStructSchema);
+    innerStruct2.setFieldValue("a", new IntWritable(intValue2));
+    innerStruct2.setFieldValue("b", new Text(stringValue2));
+
+    TypeDescription listSchema = TypeDescription.createList(innerStructSchema);
+    OrcList orcList = new OrcList(listSchema);
+    orcList.add(innerStruct1);
+    orcList.add(innerStruct2);
+    structOfList.setFieldValue("a", orcList);
+
+    TypeDescription evolvedStructOfListSchema =
         
TypeDescription.fromString("struct<a:array<struct<a:int,b:string,c:int>>>");
-    OrcStruct evolved_listInStruct = (OrcStruct) 
OrcUtils.createValueRecursively(evolved_listInStructSchema);
+    OrcStruct evolvedStructOfList = (OrcStruct) 
OrcUtils.createValueRecursively(evolvedStructOfListSchema);
     // Convert and verify contents.
-    OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct, 
evolved_listInStructSchema);
+    OrcUtils.upConvertOrcStruct(structOfList, evolvedStructOfList, 
evolvedStructOfListSchema);
+    Assert.assertEquals(
+        ((IntWritable) ((OrcStruct) ((OrcList) 
evolvedStructOfList.getFieldValue("a")).get(0)).getFieldValue("a"))
+            .get(), intValue1);
     Assert.assertEquals(
-        ((IntWritable) ((OrcStruct) ((OrcList) 
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("a"))
-            .get(), intValue);
+        ((OrcStruct) ((OrcList) 
evolvedStructOfList.getFieldValue("a")).get(0)).getFieldValue("b").toString(),
+        stringValue1);
+    Assert.assertNull((((OrcStruct) ((OrcList) 
evolvedStructOfList.getFieldValue("a")).get(0)).getFieldValue("c")));
     Assert.assertEquals(
-        ((Text) ((OrcStruct) ((OrcList) 
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("b")).toString(),
-        stringValue);
-    Assert.assertNull((((OrcStruct) ((OrcList) 
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("c")));
-    // Add cases when original OrcStruct has its list member having different 
number of elements then the destination OrcStruct.
-    // original has list.size() = 2, target has list.size() = 1
-    listInStruct = (OrcStruct) 
OrcUtils.createValueRecursively(listInStructSchema, 2);
-    OrcTestUtils.fillOrcStructWithFixedValue(listInStruct, listInStructSchema, 
intValue, stringValue, boolValue);
-    Assert.assertNotEquals(((OrcList)listInStruct.getFieldValue("a")).size(),
-        ((OrcList)evolved_listInStruct.getFieldValue("a")).size());
-    OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct, 
evolved_listInStructSchema);
-    Assert.assertEquals(((OrcList) 
evolved_listInStruct.getFieldValue("a")).size(), 2);
-    // Original has lise.size()=0, target has list.size() = 1
-    ((OrcList)listInStruct.getFieldValue("a")).clear();
-    OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct, 
evolved_listInStructSchema);
-    Assert.assertEquals(((OrcList) 
evolved_listInStruct.getFieldValue("a")).size(), 0);
+        ((IntWritable) ((OrcStruct) ((OrcList) 
evolvedStructOfList.getFieldValue("a")).get(1)).getFieldValue("a"))
+            .get(), intValue2);
+    Assert.assertEquals(
+        ((OrcStruct) ((OrcList) 
evolvedStructOfList.getFieldValue("a")).get(1)).getFieldValue("b").toString(),
+        stringValue2);
+    Assert.assertNull((((OrcStruct) ((OrcList) 
evolvedStructOfList.getFieldValue("a")).get(1)).getFieldValue("c")));
+
+    //Create a list in source OrcStruct with 3 elements
+    structOfList = (OrcStruct) 
OrcUtils.createValueRecursively(structOfListSchema, 3);
+    OrcTestUtils.fillOrcStructWithFixedValue(structOfList, structOfListSchema, 
intValue1, stringValue1, boolValue);
+    Assert.assertNotEquals(((OrcList) structOfList.getFieldValue("a")).size(),
+        ((OrcList) evolvedStructOfList.getFieldValue("a")).size());
+    OrcUtils.upConvertOrcStruct(structOfList, evolvedStructOfList, 
evolvedStructOfListSchema);
+    Assert.assertEquals(((OrcList) 
evolvedStructOfList.getFieldValue("a")).size(), 3);
+    // Original has list.size()=0, target has list.size() = 1
+    ((OrcList) structOfList.getFieldValue("a")).clear();
+    OrcUtils.upConvertOrcStruct(structOfList, evolvedStructOfList, 
evolvedStructOfListSchema);
+    Assert.assertEquals(((OrcList) 
evolvedStructOfList.getFieldValue("a")).size(), 0);
+  }
 
+  @Test
+  public void testUpConvertOrcStructOfMap() {
     // Map within Struct, contains a type-widening in the map-value type.
-    TypeDescription mapInStructSchema = 
TypeDescription.fromString("struct<a:map<string,int>>");
-    OrcStruct mapInStruct = (OrcStruct) 
OrcStruct.createValue(mapInStructSchema);
+    TypeDescription structOfMapSchema = 
TypeDescription.fromString("struct<a:map<string,int>>");
+    OrcStruct structOfMap = (OrcStruct) 
OrcStruct.createValue(structOfMapSchema);
     TypeDescription mapSchema = 
TypeDescription.createMap(TypeDescription.createString(), 
TypeDescription.createInt());
-    OrcMap mapEntry = new OrcMap(mapSchema);
-    mapEntry.put(new Text(""), new IntWritable());
-    mapInStruct.setFieldValue("a", mapEntry);
-    OrcTestUtils.fillOrcStructWithFixedValue(mapEntry, mapSchema, intValue, 
stringValue, boolValue);
+    OrcMap testMap = new OrcMap(mapSchema);
+    //Add dummy entries to initialize the testMap. The actual keys and values 
will be set later.
+    testMap.put(new Text(stringValue1), new IntWritable(intValue1));
+    testMap.put(new Text(stringValue2), new IntWritable(intValue2));
+    structOfMap.setFieldValue("a", testMap);
     // Create the target struct with evolved schema
-    TypeDescription evolved_mapInStructSchema = 
TypeDescription.fromString("struct<a:map<string,bigint>>");
-    OrcStruct evolved_mapInStruct = (OrcStruct) 
OrcStruct.createValue(evolved_mapInStructSchema);
-    OrcMap evolvedMapEntry =
+    TypeDescription evolvedStructOfMapSchema = 
TypeDescription.fromString("struct<a:map<string,bigint>>");
+    OrcStruct evolvedStructOfMap = (OrcStruct) 
OrcStruct.createValue(evolvedStructOfMapSchema);
+    OrcMap evolvedMap =
         new OrcMap(TypeDescription.createMap(TypeDescription.createString(), 
TypeDescription.createInt()));
-    evolvedMapEntry.put(new Text(""), new LongWritable(2L));
-    evolvedMapEntry.put(new Text(""), new LongWritable(3L));
-    evolved_mapInStruct.setFieldValue("a", evolvedMapEntry);
+    //Initialize a map
+    evolvedMap.put(new Text(""), new LongWritable());
+    evolvedStructOfMap.setFieldValue("a", evolvedMap);
     // convert and verify: Type-widening is correct, and size of output file 
is correct.
-    OrcUtils.upConvertOrcStruct(mapInStruct, evolved_mapInStruct, 
evolved_mapInStructSchema);
+    OrcUtils.upConvertOrcStruct(structOfMap, evolvedStructOfMap, 
evolvedStructOfMapSchema);
 
-    Assert.assertEquals(((OrcMap) 
evolved_mapInStruct.getFieldValue("a")).get(new Text(stringValue)),
-        new LongWritable(intValue));
-    Assert.assertEquals(((OrcMap) 
evolved_mapInStruct.getFieldValue("a")).size(), 1);
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue1)),
+        new LongWritable(intValue1));
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue2)),
+        new LongWritable(intValue2));
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).size(), 2);
     // re-use the same object but the source struct has fewer member in the 
map entry.
-    mapEntry.put(new Text(""), new IntWritable(1));
+    testMap.put(new Text(stringValue3), new IntWritable(intValue3));
     // sanity check
-    Assert.assertEquals(((OrcMap) mapInStruct.getFieldValue("a")).size(), 2);
-    OrcUtils.upConvertOrcStruct(mapInStruct, evolved_mapInStruct, 
evolved_mapInStructSchema);
-    Assert.assertEquals(((OrcMap) 
evolved_mapInStruct.getFieldValue("a")).size(), 2);
-    Assert.assertEquals(((OrcMap) 
evolved_mapInStruct.getFieldValue("a")).get(new Text(stringValue)),
-        new LongWritable(intValue));
+    Assert.assertEquals(((OrcMap) structOfMap.getFieldValue("a")).size(), 3);
+    OrcUtils.upConvertOrcStruct(structOfMap, evolvedStructOfMap, 
evolvedStructOfMapSchema);
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).size(), 3);
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue1)),
+        new LongWritable(intValue1));
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue2)),
+        new LongWritable(intValue2));
+    Assert.assertEquals(((OrcMap) 
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue3)),
+        new LongWritable(intValue3));
+  }
 
+  @Test
+  public void testUpConvertOrcStructOfUnion() {
     // Union in struct, type widening within the union's member field.
     TypeDescription unionInStructSchema = 
TypeDescription.fromString("struct<a:uniontype<int,string>>");
     OrcStruct unionInStruct = (OrcStruct) 
OrcStruct.createValue(unionInStructSchema);
     OrcUnion placeHolderUnion = new 
OrcUnion(TypeDescription.fromString("uniontype<int,string>"));
     placeHolderUnion.set(0, new IntWritable(1));
     unionInStruct.setFieldValue("a", placeHolderUnion);
-    OrcTestUtils.fillOrcStructWithFixedValue(unionInStruct, 
unionInStructSchema, intValue, stringValue, boolValue);
+    OrcTestUtils.fillOrcStructWithFixedValue(unionInStruct, 
unionInStructSchema, intValue1, stringValue1, boolValue);
     // Create new structWithUnion
     TypeDescription evolved_unionInStructSchema = 
TypeDescription.fromString("struct<a:uniontype<bigint,string>>");
     OrcStruct evolvedUnionInStruct = (OrcStruct) 
OrcStruct.createValue(evolved_unionInStructSchema);
@@ -208,42 +250,42 @@ public class OrcUtilsTest {
     OrcUtils.upConvertOrcStruct(unionInStruct, evolvedUnionInStruct, 
evolved_unionInStructSchema);
     // Check in the tag 0(Default from value-filler) within 
evolvedUnionInStruct, the value is becoming type-widened with correct value.
     Assert.assertEquals(((OrcUnion) 
evolvedUnionInStruct.getFieldValue("a")).getTag(), 0);
-    Assert.assertEquals(((OrcUnion) 
evolvedUnionInStruct.getFieldValue("a")).getObject(), new 
LongWritable(intValue));
+    Assert.assertEquals(((OrcUnion) 
evolvedUnionInStruct.getFieldValue("a")).getObject(), new 
LongWritable(intValue1));
     // Check the case when union field is created in different tag.
 
     // Complex: List<Struct> within struct among others and evolution happens 
on multiple places, also type-widening in deeply nested level.
     TypeDescription complexOrcSchema =
         
TypeDescription.fromString("struct<a:array<struct<a:string,b:int>>,b:struct<a:uniontype<int,string>>>");
     OrcStruct complexOrcStruct = (OrcStruct) 
OrcUtils.createValueRecursively(complexOrcSchema);
-    OrcTestUtils.fillOrcStructWithFixedValue(complexOrcStruct, 
complexOrcSchema, intValue, stringValue, boolValue);
+    OrcTestUtils.fillOrcStructWithFixedValue(complexOrcStruct, 
complexOrcSchema, intValue1, stringValue1, boolValue);
     TypeDescription evolvedComplexOrcSchema = TypeDescription
         
.fromString("struct<a:array<struct<a:string,b:bigint,c:string>>,b:struct<a:uniontype<bigint,string>,b:int>>");
     OrcStruct evolvedComplexStruct = (OrcStruct) 
OrcUtils.createValueRecursively(evolvedComplexOrcSchema);
     OrcTestUtils
-        .fillOrcStructWithFixedValue(evolvedComplexStruct, 
evolvedComplexOrcSchema, intValue, stringValue, boolValue);
+        .fillOrcStructWithFixedValue(evolvedComplexStruct, 
evolvedComplexOrcSchema, intValue1, stringValue1, boolValue);
     // Check if new columns are assigned with null value and type widening is 
working fine.
     OrcUtils.upConvertOrcStruct(complexOrcStruct, evolvedComplexStruct, 
evolvedComplexOrcSchema);
     Assert
-        
.assertEquals(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("b"),
 new LongWritable(intValue));
+        
.assertEquals(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("b"),
 new LongWritable(intValue1));
     
Assert.assertNull(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("c"));
-    Assert.assertEquals(((OrcUnion) 
((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("a")).getObject(),
 new LongWritable(intValue));
+    Assert.assertEquals(((OrcUnion) 
((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("a")).getObject(),
 new LongWritable(intValue1));
     
Assert.assertNull(((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("b"));
   }
 
   @Test
-  public void testNestedWithinUnionWithDiffTag() throws Exception {
+  public void testNestedWithinUnionWithDiffTag() {
     // Construct union type with different tag for the src object dest object, 
check if up-convert happens correctly.
     TypeDescription structInUnionAsStruct = 
TypeDescription.fromString("struct<a:uniontype<struct<a:int,b:string>,int>>");
     OrcStruct structInUnionAsStructObject = (OrcStruct) 
OrcUtils.createValueRecursively(structInUnionAsStruct);
     OrcTestUtils
-        .fillOrcStructWithFixedValue(structInUnionAsStructObject, 
structInUnionAsStruct, 0, intValue, stringValue, boolValue);
+        .fillOrcStructWithFixedValue(structInUnionAsStructObject, 
structInUnionAsStruct, 0, intValue1, stringValue1, boolValue);
     
Assert.assertEquals(((OrcStruct)((OrcUnion)structInUnionAsStructObject.getFieldValue("a")).getObject())
-        .getFieldValue("a"), new IntWritable(intValue));
+        .getFieldValue("a"), new IntWritable(intValue1));
 
     OrcStruct structInUnionAsStructObject_2 = (OrcStruct) 
OrcUtils.createValueRecursively(structInUnionAsStruct);
     OrcTestUtils
-        .fillOrcStructWithFixedValue(structInUnionAsStructObject_2, 
structInUnionAsStruct, 1, intValue, stringValue, boolValue);
-    
Assert.assertEquals(((OrcUnion)structInUnionAsStructObject_2.getFieldValue("a")).getObject(),
 new IntWritable(intValue));
+        .fillOrcStructWithFixedValue(structInUnionAsStructObject_2, 
structInUnionAsStruct, 1, intValue1, stringValue1, boolValue);
+    
Assert.assertEquals(((OrcUnion)structInUnionAsStructObject_2.getFieldValue("a")).getObject(),
 new IntWritable(intValue1));
 
     // Create a new record container, do up-convert twice and check if the 
value is propagated properly.
     OrcStruct container = (OrcStruct) 
OrcUtils.createValueRecursively(structInUnionAsStruct);
@@ -260,7 +302,7 @@ public class OrcUtilsTest {
    * field a was set to null by one call of "upConvertOrcStruct", but the 
subsequent call should still have the nested
    * field filled.
    */
-  public void testNestedFieldSequenceSet() throws Exception {
+  public void testNestedFieldSequenceSet() {
     TypeDescription schema = 
TypeDescription.fromString("struct<a:array<struct<a:int,b:int>>>");
     OrcStruct struct = (OrcStruct) OrcUtils.createValueRecursively(schema);
     OrcTestUtils.fillOrcStructWithFixedValue(struct, schema, 1, "test", true);
@@ -280,22 +322,22 @@ public class OrcUtilsTest {
    * Just a sanity test for column project, should be no difference from other 
cases when provided reader schema.
    */
   @Test
-  public void testOrcStructProjection() throws Exception {
+  public void testOrcStructProjection() {
     TypeDescription originalSchema = 
TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
     OrcStruct originalStruct = (OrcStruct) 
OrcUtils.createValueRecursively(originalSchema);
-    OrcTestUtils.fillOrcStructWithFixedValue(originalStruct, originalSchema, 
intValue, stringValue, boolValue);
+    OrcTestUtils.fillOrcStructWithFixedValue(originalStruct, originalSchema, 
intValue1, stringValue1, boolValue);
 
     TypeDescription projectedSchema = 
TypeDescription.fromString("struct<a:struct<b:int>,b:struct<c:int>>");
     OrcStruct projectedStructExpectedValue = (OrcStruct) 
OrcUtils.createValueRecursively(projectedSchema);
     OrcTestUtils
-        .fillOrcStructWithFixedValue(projectedStructExpectedValue, 
projectedSchema, intValue, stringValue, boolValue);
+        .fillOrcStructWithFixedValue(projectedStructExpectedValue, 
projectedSchema, intValue1, stringValue1, boolValue);
     OrcStruct projectColumnStruct = (OrcStruct) 
OrcUtils.createValueRecursively(projectedSchema);
     OrcUtils.upConvertOrcStruct(originalStruct, projectColumnStruct, 
projectedSchema);
     Assert.assertEquals(projectColumnStruct, projectedStructExpectedValue);
   }
 
   @Test
-  public void complexTypeEligibilityCheck() throws Exception {
+  public void complexTypeEligibilityCheck() {
     TypeDescription struct_array_0 = 
TypeDescription.fromString("struct<first:array<int>,second:int>");
     TypeDescription struct_array_1 = 
TypeDescription.fromString("struct<first:array<int>,second:int>");
     Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_array_0, 
struct_array_1));
@@ -311,7 +353,7 @@ public class OrcUtilsTest {
     Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_map_0, 
struct_map_3));
   }
 
-  public void testSchemaContains() throws Exception {
+  public void testSchemaContains() {
     // Simple case.
     TypeDescription struct_0 = 
TypeDescription.fromString("struct<a:int,b:int>");
     TypeDescription struct_1 = TypeDescription.fromString("struct<a:int>");
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 8a12f44..7c8931f 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -171,6 +171,7 @@ ext.externalDependency = [
     "grok": "io.thekraken:grok:0.1.5",
     "hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
     "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.3",
+    "orcTools":"org.apache.orc:orc-tools:1.6.3",
     'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
     'parquetAvro': 'org.apache.parquet:parquet-avro:1.10.1',
     'parquetProto': 'org.apache.parquet:parquet-protobuf:1.10.1',

Reply via email to