This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 778a863 [GOBBLIN-1257] Fix the handling of collection field types
during ORC schema up-conversion in compaction[]
778a863 is described below
commit 778a8634978981e63d3a096af336b14f79aebeca
Author: sv2000 <[email protected]>
AuthorDate: Tue Sep 8 11:26:14 2020 -0700
[GOBBLIN-1257] Fix the handling of collection field types during ORC schema
up-conversion in compaction[]
Closes #3097 from sv2000/orcCompaction
---
gobblin-compaction/build.gradle | 1 +
.../gobblin/compaction/mapreduce/orc/OrcUtils.java | 13 +-
.../test/TestCompactionOrcJobConfigurator.java | 45 +++++
.../mapreduce/test/TestCompactionTaskUtils.java | 59 +++++++
.../mapreduce/test/TestOrcCompactionTask.java | 142 ++++++++++++++++
.../main/resources/orcCompactionTest/data1.json | 1 +
.../main/resources/orcCompactionTest/data1.schema | 1 +
.../mapreduce/AvroCompactionTaskTest.java | 46 ++----
.../mapreduce/OrcCompactionTaskTest.java | 49 ++----
.../compaction/mapreduce/orc/OrcUtilsTest.java | 184 +++++++++++++--------
gradle/scripts/dependencyDefinitions.gradle | 1 +
11 files changed, 398 insertions(+), 144 deletions(-)
diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index 846f788..4e607ad 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -31,6 +31,7 @@ dependencies {
// with hive-exec-core in older version(1.0.1), we need to shadow
orc-mapreduce's transitive deps.
// and include direct orc-mapreduce library just as a compileOnly dependency
compileOnly externalDependency.orcMapreduce
+ compile externalDependency.orcTools
testCompileOnly externalDependency.orcMapreduce
compile project(path: ":gobblin-modules:gobblin-orc-dep",
configuration:"shadow")
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 4dabf53..f3da03b 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -208,11 +207,10 @@ public class OrcUtils {
OrcList castedList = (OrcList) w;
OrcList targetList = (OrcList) v;
TypeDescription elementType = targetSchema.getChildren().get(0);
- WritableComparable targetListRecordContainer =
- targetList.size() > 0 ? (WritableComparable) targetList.get(0) :
createValueRecursively(elementType, 0);
targetList.clear();
for (int i = 0; i < castedList.size(); i++) {
+ WritableComparable targetListRecordContainer =
createValueRecursively(elementType, 0);
targetList.add(i,
structConversionHelper((WritableComparable) castedList.get(i),
targetListRecordContainer, elementType));
}
@@ -220,19 +218,12 @@ public class OrcUtils {
OrcMap castedMap = (OrcMap) w;
OrcMap targetMap = (OrcMap) v;
TypeDescription valueSchema = targetSchema.getChildren().get(1);
-
- // Create recordContainer with the schema of value.
- Iterator targetMapEntries = targetMap.values().iterator();
- WritableComparable targetMapRecordContainer =
- targetMapEntries.hasNext() ? (WritableComparable)
targetMapEntries.next()
- : createValueRecursively(valueSchema);
-
targetMap.clear();
for (Object entry : castedMap.entrySet()) {
Map.Entry<WritableComparable, WritableComparable> castedEntry =
(Map.Entry<WritableComparable, WritableComparable>) entry;
-
+ WritableComparable targetMapRecordContainer =
createValueRecursively(valueSchema);
targetMapRecordContainer =
structConversionHelper(castedEntry.getValue(),
targetMapRecordContainer, valueSchema);
targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionOrcJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionOrcJobConfigurator.java
new file mode 100644
index 0000000..9470ba3
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionOrcJobConfigurator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.test;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator;
+import org.apache.gobblin.configuration.State;
+
+
+public class TestCompactionOrcJobConfigurator extends
CompactionOrcJobConfigurator {
+ public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
+ @Override
+ public TestCompactionOrcJobConfigurator createConfigurator(State state)
throws IOException {
+ return new TestCompactionOrcJobConfigurator(state);
+ }
+ }
+
+ @Override
+ protected void setNumberOfReducers(Job job) {
+ job.setNumReduceTasks(1);
+ }
+
+ public TestCompactionOrcJobConfigurator(State state) throws IOException {
+ super(state);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionTaskUtils.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionTaskUtils.java
new file mode 100644
index 0000000..92ca2bc
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestCompactionTaskUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.compaction.mapreduce.test;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
+public class TestCompactionTaskUtils {
+ public static final String PATH_SEPARATOR = "/";
+
+ public static final String DEFAULT_INPUT_SUBDIR_TYPE = "minutely";
+ public static EmbeddedGobblin createEmbeddedGobblinCompactionJob(String
name, String basePath) {
+ return createEmbeddedGobblinCompactionJob(name, basePath,
DEFAULT_INPUT_SUBDIR_TYPE);
+ }
+
+ public static EmbeddedGobblin createEmbeddedGobblinCompactionJob(String
name, String basePath, String inputSubdirType) {
+ String pattern;
+ String outputSubdirType;
+ if (inputSubdirType.equals(DEFAULT_INPUT_SUBDIR_TYPE)) {
+ pattern = new Path(basePath, "*/*/minutely/*/*/*/*").toString();
+ outputSubdirType = "hourly";
+ } else {
+ pattern = new Path(basePath, "*/*/hourly/*/*/*").toString();
+ outputSubdirType = "daily";
+ }
+
+ return new EmbeddedGobblin(name)
+ .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY,
CompactionSource.class.getName())
+
.setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY,
pattern)
+ .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR, basePath)
+ .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubdirType)
+ .setConfiguration(MRCompactor.COMPACTION_DEST_DIR, basePath)
+ .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, outputSubdirType)
+ .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR,
"/tmp/compaction/" + name)
+
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
"3000d")
+
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
"1d")
+ .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestOrcCompactionTask.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestOrcCompactionTask.java
new file mode 100644
index 0000000..c07f7ac
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/test/TestOrcCompactionTask.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce.test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Calendar;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.tools.convert.ConvertTool;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
+import static
org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils.PATH_SEPARATOR;
+
+/**
+ * A convenience class for running ORC compaction locally. Particularly useful
for local debugging. The method takes as
+ * input a resource folder of JSON files containing input records, and
an ORC data1.schema file, and generates
+ * input ORC files in hourly folders under ${HOME_DIRECTORY}/data using the
ORC {@link ConvertTool}. The ORC input data
+ * for the compaction job is written under:
${HOME_DIRECTORY}/data/tracking/testTopic/hourly/YYYY/MM/DD/HH.
+ * The output of the compaction job is written under:
${HOME_DIRECTORY}/data/tracking/testTopic/daily/YYYY/MM/DD.
+ * The year, month, day is derived from the header.time field in the input
records.
+ *
+ * Assumptions:
+ * <ul>
+ * <li>The input data has the header.time field, which is assumed to be the
epoch time in millis</li>
+ * <li>Associated with each json file, there must be a corresponding schema
file containing the ORC schema definition. The schema file
+ * must have the same filename (without extension) as the corresponding json
file. See the orcCompactionTest resource folder for
+ * an example. </li>
+ * </ul>
+ *
+ * When running the main() method in your IDE, make sure to remove the
hive-exec, log4j-over-slf4j and xerces jars from
+ * the Project's External Libraries.
+ *
+ */
+public class TestOrcCompactionTask {
+ private static final JsonParser PARSER = new JsonParser();
+ private static final String HOURLY_SUBDIR = "tracking/testTopic/hourly";
+ private static final String JSON_FILE_EXTENSION = "json";
+ private static final String TEST_RESOURCE_FOLDER_NAME = "orcCompactionTest";
+
+ public static void main(String[] args) throws Exception {
+ File basePath = new File(System.getProperty("user.home"),
TEST_RESOURCE_FOLDER_NAME);
+ if (basePath.exists()) {
+ FileUtils.deleteDirectory(basePath);
+ }
+ boolean mkdirs = basePath.mkdirs();
+ Preconditions.checkArgument(mkdirs, "Unable to create: " +
basePath.getAbsolutePath());
+ URL resourceURL =
TestOrcCompactionTask.class.getClassLoader().getResource(TEST_RESOURCE_FOLDER_NAME);
+ Preconditions.checkArgument(resourceURL != null, "Could not find resource:
" + TEST_RESOURCE_FOLDER_NAME);
+ File resourceDirectory = new File(resourceURL.getFile());
+
+ for (File file: resourceDirectory.listFiles()) {
+ if(isJsonFile(file)) {
+ createOrcFile(file, basePath.getAbsolutePath());
+ }
+ }
+ EmbeddedGobblin embeddedGobblin =
+ TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob("basic",
basePath.getAbsolutePath(), "hourly")
+
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+ TestCompactionOrcJobConfigurator.Factory.class.getName());
+ embeddedGobblin.run();
+ }
+
+ private static void createOrcFile(File file, String basePath)
+ throws IOException, ParseException {
+ JsonElement jsonElement;
+ try (Reader reader = new InputStreamReader(new FileInputStream(file),
Charset.defaultCharset())) {
+ jsonElement = PARSER.parse(reader);
+ }
+
+ //Get header.time
+ long timestamp =
jsonElement.getAsJsonObject().get("header").getAsJsonObject().get("time").getAsLong();
+ File hourlyPath = new File(getPath(basePath, timestamp));
+ if (!hourlyPath.exists()) {
+ boolean result = hourlyPath.mkdirs();
+ Preconditions.checkArgument(result, "Unable to create: " +
hourlyPath.getAbsolutePath());
+ }
+ String fileNameWithoutExtensions =
Files.getNameWithoutExtension(file.getName());
+ File schemaFile = new File(file.getParent(), fileNameWithoutExtensions +
".schema");
+ String orcSchema = FileUtils.readFileToString(schemaFile,
Charset.defaultCharset());
+ String orcFileName = hourlyPath.getAbsolutePath() + PATH_SEPARATOR +
fileNameWithoutExtensions + ".orc";
+ File orcFile = new File(orcFileName);
+ //Delete if file already exists
+ if (orcFile.exists()) {
+ boolean result = orcFile.delete();
+ Preconditions.checkArgument(result, "Unable to delete: " +
orcFile.getAbsolutePath());
+ }
+ //Convert to ORC using the corresponding schema
+ String[] convertToolArgs = new String[]{"-s", orcSchema,
file.getAbsolutePath(), "-o", orcFileName};
+ ConvertTool.main(new Configuration(), convertToolArgs);
+ }
+
+ /**
+ * A helper method that returns the absolute path of the hourly folder given
a timestamp and a basePath.
+ * @param basePath e.g. /Users/foo/orcCompactionTest
+ * @param timestamp the unix timestamp in milliseconds
+ * @return the output path of the hourly folder e.g.
/Users/foo/orcCompactionTest/tracking/testTopic/hourly/2020/09/20/12
+ * NOTE(review): Calendar.MONTH is zero-based, so the month component in the
returned path is one less than the calendar month — confirm this is intended.
+ */
+ private static String getPath(String basePath, Long timestamp) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(timestamp);
+ String year = Integer.toString(calendar.get(Calendar.YEAR));
+ String month = String.format("%02d", calendar.get(Calendar.MONTH));
+ String day = String.format("%02d", calendar.get(Calendar.DAY_OF_MONTH));
+ String hour = String.format("%02d", calendar.get(Calendar.HOUR_OF_DAY));
+ return Joiner.on(PATH_SEPARATOR).join(basePath, HOURLY_SUBDIR, year,
month, day, hour);
+ }
+
+ private static boolean isJsonFile(File file) {
+ return Files.getFileExtension(file.getName()).equals(JSON_FILE_EXTENSION);
+ }
+}
diff --git a/gobblin-compaction/src/main/resources/orcCompactionTest/data1.json
b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.json
new file mode 100644
index 0000000..6ad0c76
--- /dev/null
+++ b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.json
@@ -0,0 +1 @@
+{"header": { "memberId": 1, "time" : 1599171293000}, "name": "Alyssa"}
\ No newline at end of file
diff --git
a/gobblin-compaction/src/main/resources/orcCompactionTest/data1.schema
b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.schema
new file mode 100644
index 0000000..cb10052
--- /dev/null
+++ b/gobblin-compaction/src/main/resources/orcCompactionTest/data1.schema
@@ -0,0 +1 @@
+struct<header:struct<memberId:int,time:bigint>,name:string>
\ No newline at end of file
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 145ecf1..223fc57 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -28,8 +28,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
-import org.apache.gobblin.configuration.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,12 +41,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import
org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
import org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder;
@@ -56,6 +56,8 @@ import
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobData
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import static
org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob;
+
@Slf4j
@Test(groups = {"gobblin.compaction"})
@@ -70,7 +72,6 @@ public class AvroCompactionTaskTest {
@Test
public void testDedup() throws Exception {
-
File basePath = Files.createTempDir();
basePath.deleteOnExit();
@@ -85,7 +86,7 @@ public class AvroCompactionTaskTest {
File newestFile = writeFileWithContent(jobDir, "file3", r3, 10,
r3.getSchema());
newestFile.setLastModified(Long.MAX_VALUE);
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("dedup",
basePath.getAbsolutePath().toString());
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("dedup", basePath.getAbsolutePath());
JobExecutionResult result = embeddedGobblin.run();
Assert.assertTrue(result.isSuccessful());
}
@@ -103,7 +104,7 @@ public class AvroCompactionTaskTest {
writeFileWithContent(jobDir, "file1", r1, 20);
writeFileWithContent(jobDir, "file2", r2, 18);
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("non-dedup",
basePath.getAbsolutePath().toString());
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("non-dedup",
basePath.getAbsolutePath().toString());
JobExecutionResult result = embeddedGobblin.run();
Assert.assertTrue(result.isSuccessful());
}
@@ -155,7 +156,7 @@ public class AvroCompactionTaskTest {
GenericRecord r1 = createRandomRecord();
writeFileWithContent(jobDir, "file1", r1, 20);
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin
("Recompaction-First", basePath);
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("Recompaction-First", basePath);
JobExecutionResult result = embeddedGobblin.run();
long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
Assert.assertTrue(result.isSuccessful());
@@ -163,7 +164,7 @@ public class AvroCompactionTaskTest {
// Now write more avro files to input dir
writeFileWithContent(jobDir, "file2", r1, 22);
- EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin
("Recompaction-Second", basePath);
+ EmbeddedGobblin embeddedGobblin_2 =
createEmbeddedGobblinCompactionJob("Recompaction-Second", basePath);
embeddedGobblin_2.run();
Assert.assertTrue(result.isSuccessful());
@@ -185,7 +186,7 @@ public class AvroCompactionTaskTest {
GenericRecord r1 = createRandomRecord();
writeFileWithContent(jobDir, "file1", r1, 20);
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin
("Recompaction-First", basePath);
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("Recompaction-First", basePath);
embeddedGobblin.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
"true");
JobExecutionResult result = embeddedGobblin.run();
long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
@@ -194,7 +195,7 @@ public class AvroCompactionTaskTest {
// Now write more avro files to input dir
writeFileWithContent(jobDir, "file2", r1, 22);
- EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin
("Recompaction-Second", basePath);
+ EmbeddedGobblin embeddedGobblin_2 =
createEmbeddedGobblinCompactionJob("Recompaction-Second", basePath);
embeddedGobblin_2.setConfiguration(ConfigurationKeys.RECOMPACTION_WRITE_TO_NEW_FOLDER,
"true");
embeddedGobblin_2.run();
Assert.assertTrue(result.isSuccessful());
@@ -218,7 +219,7 @@ public class AvroCompactionTaskTest {
GenericRecord r1 = createRandomRecord();
writeFileWithContent(jobDir, "file1", r1, 20);
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin
("Recompaction-First", basePath);
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("Recompaction-First", basePath);
JobExecutionResult result = embeddedGobblin.run();
long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
Assert.assertTrue(result.isSuccessful());
@@ -226,7 +227,7 @@ public class AvroCompactionTaskTest {
// Now write more avro files to input dir
writeFileWithContent(jobDir, "file2", r1, 22);
- EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin
("Recompaction-Second", basePath);
+ EmbeddedGobblin embeddedGobblin_2 =
createEmbeddedGobblinCompactionJob("Recompaction-Second", basePath);
embeddedGobblin_2.setConfiguration(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION,
"8h");
embeddedGobblin_2.run();
Assert.assertTrue(result.isSuccessful());
@@ -304,24 +305,9 @@ public class AvroCompactionTaskTest {
writer.close();
}
- static EmbeddedGobblin createEmbeddedGobblin (String name, String basePath) {
- String pattern = new Path(basePath, "*/*/minutely/*/*/*/*").toString();
-
- return new EmbeddedGobblin(name)
- .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY,
CompactionSource.class.getName())
-
.setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY,
pattern)
- .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR,
basePath.toString())
- .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, "minutely")
- .setConfiguration(MRCompactor.COMPACTION_DEST_DIR,
basePath.toString())
- .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, "hourly")
- .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR,
"/tmp/compaction/" + name)
-
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
"3000d")
-
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
"1d")
- .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
- }
private EmbeddedGobblin createEmbeddedGobblinForAllFailures (String name,
String basePath) {
- return createEmbeddedGobblin(name, basePath)
+ return createEmbeddedGobblinCompactionJob(name, basePath)
.setConfiguration(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY,
"KafkaAuditCountHttpClientFactory")
.setConfiguration(CompactionAuditCountVerifier.GOBBLIN_TIER, "dummy")
.setConfiguration(CompactionAuditCountVerifier.ORIGIN_TIER, "dummy")
@@ -330,12 +316,12 @@ public class AvroCompactionTaskTest {
}
private EmbeddedGobblin createEmbeddedGobblinForHiveRegistrationFailure
(String name, String basePath) {
- return createEmbeddedGobblin(name, basePath)
+ return createEmbeddedGobblinCompactionJob(name, basePath)
.setConfiguration(ConfigurationKeys.COMPACTION_SUITE_FACTORY,
"HiveRegistrationFailureFactory");
}
private EmbeddedGobblin createEmbeddedGobblinWithPriority (String name,
String basePath) {
- return createEmbeddedGobblin(name, basePath)
+ return createEmbeddedGobblinCompactionJob(name, basePath)
.setConfiguration(ConfigurationKeys.COMPACTION_PRIORITIZER_ALIAS,
"TieredDatasets")
.setConfiguration(SimpleDatasetHierarchicalPrioritizer.TIER_KEY +
".0", "Identity")
.setConfiguration(SimpleDatasetHierarchicalPrioritizer.TIER_KEY +
".1", "EVG")
@@ -356,7 +342,7 @@ public class AvroCompactionTaskTest {
writeFileWithContent(jobDir, "file_random", r1, 20);
}
- EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblin("workunit_stream", basePath.getAbsolutePath().toString());
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("workunit_stream",
basePath.getAbsolutePath().toString());
JobExecutionResult result = embeddedGobblin.run();
Assert.assertTrue(result.isSuccessful());
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index 33191b8..b9c4152 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -17,19 +17,13 @@
package org.apache.gobblin.compaction.mapreduce;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+
import org.apache.commons.io.FilenameUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -38,7 +32,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.orc.OrcFile;
@@ -52,11 +45,21 @@ import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
+import
org.apache.gobblin.compaction.mapreduce.test.TestCompactionOrcJobConfigurator;
+import org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
import static
org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator.ORC_MAPPER_SHUFFLE_KEY_SCHEMA;
-import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
+import static
org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob;
@Test(groups = {"gobblin.compaction"})
public class OrcCompactionTaskTest {
@@ -103,7 +106,7 @@ public class OrcCompactionTaskTest {
// Testing data is schema'ed with "struct<i:int,j:int>"
createTestingData(jobDir);
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+ EmbeddedGobblin embeddedGobblin =
TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob("basic",
basePath.getAbsolutePath())
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
TestCompactionOrcJobConfigurator.Factory.class.getName())
.setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
@@ -157,7 +160,7 @@ public class OrcCompactionTaskTest {
// Verify execution
// Overwrite the job configurator factory key.
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("basic", basePath.getAbsolutePath())
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
TestCompactionOrcJobConfigurator.Factory.class.getName())
.setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
@@ -239,7 +242,7 @@ public class OrcCompactionTaskTest {
writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), nestedSchema,
ImmutableList.of(nested_struct_1,
nested_struct_2, nested_struct_3, nested_struct_4, nested_struct_5));
- EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+ EmbeddedGobblin embeddedGobblin =
createEmbeddedGobblinCompactionJob("basic",
basePath.getAbsolutePath().toString())
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
TestCompactionOrcJobConfigurator.Factory.class.getName())
.setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
@@ -287,7 +290,7 @@ public class OrcCompactionTaskTest {
createTestingData(jobDir);
- EmbeddedGobblin embeddedGobblin_nondedup = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
+ EmbeddedGobblin embeddedGobblin_nondedup =
createEmbeddedGobblinCompactionJob("basic",
basePath.getAbsolutePath().toString())
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
TestCompactionOrcJobConfigurator.Factory.class.getName())
.setConfiguration(COMPACTION_OUTPUT_EXTENSION, "orc")
@@ -361,22 +364,4 @@ public class OrcCompactionTaskTest {
}
recordWriter.close(new TaskAttemptContextImpl(configuration, new
TaskAttemptID()));
}
-
- private static class TestCompactionOrcJobConfigurator extends
CompactionOrcJobConfigurator {
- public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
- @Override
- public TestCompactionOrcJobConfigurator createConfigurator(State state)
throws IOException {
- return new TestCompactionOrcJobConfigurator(state);
- }
- }
-
- @Override
- protected void setNumberOfReducers(Job job) throws IOException {
- job.setNumReduceTasks(1);
- }
-
- public TestCompactionOrcJobConfigurator(State state) throws IOException {
- super(state);
- }
- }
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
index 83e91bd..01487bc 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
@@ -32,9 +32,12 @@ import org.testng.annotations.Test;
@Test(groups = {"gobblin.compaction"})
public class OrcUtilsTest {
-
- final int intValue = 10;
- final String stringValue = "testString";
+ final int intValue1 = 10;
+ final String stringValue1 = "testString1";
+ final int intValue2 = 20;
+ final String stringValue2 = "testString2";
+ final int intValue3 = 30;
+ final String stringValue3 = "testString3";
final boolean boolValue = true;
@Test
@@ -108,97 +111,136 @@ public class OrcUtilsTest {
}
@Test
- public void testUpConvertOrcStruct() {
-
+ public void testUpConvertSimpleOrcStruct() {
// Basic case, all primitives, newly added value will be set to null
TypeDescription baseStructSchema =
TypeDescription.fromString("struct<a:int,b:string>");
// This would be re-used in the following tests as the actual record using
the schema.
OrcStruct baseStruct = (OrcStruct) OrcStruct.createValue(baseStructSchema);
// Fill in the baseStruct with specified value.
- OrcTestUtils.fillOrcStructWithFixedValue(baseStruct, baseStructSchema,
intValue, stringValue, boolValue);
+ OrcTestUtils.fillOrcStructWithFixedValue(baseStruct, baseStructSchema,
intValue1, stringValue1, boolValue);
TypeDescription evolved_baseStructSchema =
TypeDescription.fromString("struct<a:int,b:string,c:int>");
OrcStruct evolvedStruct = (OrcStruct)
OrcStruct.createValue(evolved_baseStructSchema);
// This should be equivalent to
deserialize(baseStruct).serialize(evolvedStruct, evolvedSchema);
OrcUtils.upConvertOrcStruct(baseStruct, evolvedStruct,
evolved_baseStructSchema);
// Check if all value in baseStruct is populated and newly created column
in evolvedStruct is filled with null.
- Assert.assertEquals(((IntWritable)
evolvedStruct.getFieldValue("a")).get(), intValue);
- Assert.assertEquals(((Text) evolvedStruct.getFieldValue("b")).toString(),
stringValue);
+ Assert.assertEquals(((IntWritable)
evolvedStruct.getFieldValue("a")).get(), intValue1);
+ Assert.assertEquals(evolvedStruct.getFieldValue("b").toString(),
stringValue1);
Assert.assertNull(evolvedStruct.getFieldValue("c"));
// Base case: Reverse direction, which is column projection on top-level
columns.
OrcStruct baseStruct_shadow = (OrcStruct)
OrcStruct.createValue(baseStructSchema);
OrcUtils.upConvertOrcStruct(evolvedStruct, baseStruct_shadow,
baseStructSchema);
Assert.assertEquals(baseStruct, baseStruct_shadow);
+ }
- // Simple Nested: List/Map/Union within Struct.
+ @Test
+ public void testUpConvertOrcStructOfList() {
+ // Simple Nested: List within Struct.
// The element type of list contains a new field.
// Prepare two ListInStructs with different size ( the list field contains
different number of members)
- TypeDescription listInStructSchema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
- OrcStruct listInStruct = (OrcStruct)
OrcUtils.createValueRecursively(listInStructSchema);
- OrcTestUtils.fillOrcStructWithFixedValue(listInStruct, listInStructSchema,
intValue, stringValue, boolValue);
- TypeDescription evolved_listInStructSchema =
+ TypeDescription structOfListSchema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
+ OrcStruct structOfList = (OrcStruct)
OrcUtils.createValueRecursively(structOfListSchema);
+
+ //Create an OrcList instance with two entries
+ TypeDescription innerStructSchema =
TypeDescription.createStruct().addField("a", TypeDescription.createInt())
+ .addField("b", TypeDescription.createString());
+ OrcStruct innerStruct1 = new OrcStruct(innerStructSchema);
+ innerStruct1.setFieldValue("a", new IntWritable(intValue1));
+ innerStruct1.setFieldValue("b", new Text(stringValue1));
+
+ OrcStruct innerStruct2 = new OrcStruct(innerStructSchema);
+ innerStruct2.setFieldValue("a", new IntWritable(intValue2));
+ innerStruct2.setFieldValue("b", new Text(stringValue2));
+
+ TypeDescription listSchema = TypeDescription.createList(innerStructSchema);
+ OrcList orcList = new OrcList(listSchema);
+ orcList.add(innerStruct1);
+ orcList.add(innerStruct2);
+ structOfList.setFieldValue("a", orcList);
+
+ TypeDescription evolvedStructOfListSchema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:string,c:int>>>");
- OrcStruct evolved_listInStruct = (OrcStruct)
OrcUtils.createValueRecursively(evolved_listInStructSchema);
+ OrcStruct evolvedStructOfList = (OrcStruct)
OrcUtils.createValueRecursively(evolvedStructOfListSchema);
// Convert and verify contents.
- OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct,
evolved_listInStructSchema);
+ OrcUtils.upConvertOrcStruct(structOfList, evolvedStructOfList,
evolvedStructOfListSchema);
+ Assert.assertEquals(
+ ((IntWritable) ((OrcStruct) ((OrcList)
evolvedStructOfList.getFieldValue("a")).get(0)).getFieldValue("a"))
+ .get(), intValue1);
Assert.assertEquals(
- ((IntWritable) ((OrcStruct) ((OrcList)
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("a"))
- .get(), intValue);
+ ((OrcStruct) ((OrcList)
evolvedStructOfList.getFieldValue("a")).get(0)).getFieldValue("b").toString(),
+ stringValue1);
+ Assert.assertNull((((OrcStruct) ((OrcList)
evolvedStructOfList.getFieldValue("a")).get(0)).getFieldValue("c")));
Assert.assertEquals(
- ((Text) ((OrcStruct) ((OrcList)
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("b")).toString(),
- stringValue);
- Assert.assertNull((((OrcStruct) ((OrcList)
evolved_listInStruct.getFieldValue("a")).get(0)).getFieldValue("c")));
- // Add cases when original OrcStruct has its list member having different
number of elements then the destination OrcStruct.
- // original has list.size() = 2, target has list.size() = 1
- listInStruct = (OrcStruct)
OrcUtils.createValueRecursively(listInStructSchema, 2);
- OrcTestUtils.fillOrcStructWithFixedValue(listInStruct, listInStructSchema,
intValue, stringValue, boolValue);
- Assert.assertNotEquals(((OrcList)listInStruct.getFieldValue("a")).size(),
- ((OrcList)evolved_listInStruct.getFieldValue("a")).size());
- OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct,
evolved_listInStructSchema);
- Assert.assertEquals(((OrcList)
evolved_listInStruct.getFieldValue("a")).size(), 2);
- // Original has lise.size()=0, target has list.size() = 1
- ((OrcList)listInStruct.getFieldValue("a")).clear();
- OrcUtils.upConvertOrcStruct(listInStruct, evolved_listInStruct,
evolved_listInStructSchema);
- Assert.assertEquals(((OrcList)
evolved_listInStruct.getFieldValue("a")).size(), 0);
+ ((IntWritable) ((OrcStruct) ((OrcList)
evolvedStructOfList.getFieldValue("a")).get(1)).getFieldValue("a"))
+ .get(), intValue2);
+ Assert.assertEquals(
+ ((OrcStruct) ((OrcList)
evolvedStructOfList.getFieldValue("a")).get(1)).getFieldValue("b").toString(),
+ stringValue2);
+ Assert.assertNull((((OrcStruct) ((OrcList)
evolvedStructOfList.getFieldValue("a")).get(1)).getFieldValue("c")));
+
+ //Create a list in source OrcStruct with 3 elements
+ structOfList = (OrcStruct)
OrcUtils.createValueRecursively(structOfListSchema, 3);
+ OrcTestUtils.fillOrcStructWithFixedValue(structOfList, structOfListSchema,
intValue1, stringValue1, boolValue);
+ Assert.assertNotEquals(((OrcList) structOfList.getFieldValue("a")).size(),
+ ((OrcList) evolvedStructOfList.getFieldValue("a")).size());
+ OrcUtils.upConvertOrcStruct(structOfList, evolvedStructOfList,
evolvedStructOfListSchema);
+ Assert.assertEquals(((OrcList)
evolvedStructOfList.getFieldValue("a")).size(), 3);
+ // Original has list.size()=0, target currently has list.size() = 3
+ ((OrcList) structOfList.getFieldValue("a")).clear();
+ OrcUtils.upConvertOrcStruct(structOfList, evolvedStructOfList,
evolvedStructOfListSchema);
+ Assert.assertEquals(((OrcList)
evolvedStructOfList.getFieldValue("a")).size(), 0);
+ }
+ @Test
+ public void testUpConvertOrcStructOfMap() {
// Map within Struct, contains a type-widening in the map-value type.
- TypeDescription mapInStructSchema =
TypeDescription.fromString("struct<a:map<string,int>>");
- OrcStruct mapInStruct = (OrcStruct)
OrcStruct.createValue(mapInStructSchema);
+ TypeDescription structOfMapSchema =
TypeDescription.fromString("struct<a:map<string,int>>");
+ OrcStruct structOfMap = (OrcStruct)
OrcStruct.createValue(structOfMapSchema);
TypeDescription mapSchema =
TypeDescription.createMap(TypeDescription.createString(),
TypeDescription.createInt());
- OrcMap mapEntry = new OrcMap(mapSchema);
- mapEntry.put(new Text(""), new IntWritable());
- mapInStruct.setFieldValue("a", mapEntry);
- OrcTestUtils.fillOrcStructWithFixedValue(mapEntry, mapSchema, intValue,
stringValue, boolValue);
+ OrcMap testMap = new OrcMap(mapSchema);
+ //Populate the testMap with the actual test keys and values verified by the
assertions below.
+ testMap.put(new Text(stringValue1), new IntWritable(intValue1));
+ testMap.put(new Text(stringValue2), new IntWritable(intValue2));
+ structOfMap.setFieldValue("a", testMap);
// Create the target struct with evolved schema
- TypeDescription evolved_mapInStructSchema =
TypeDescription.fromString("struct<a:map<string,bigint>>");
- OrcStruct evolved_mapInStruct = (OrcStruct)
OrcStruct.createValue(evolved_mapInStructSchema);
- OrcMap evolvedMapEntry =
+ TypeDescription evolvedStructOfMapSchema =
TypeDescription.fromString("struct<a:map<string,bigint>>");
+ OrcStruct evolvedStructOfMap = (OrcStruct)
OrcStruct.createValue(evolvedStructOfMapSchema);
+ OrcMap evolvedMap =
new OrcMap(TypeDescription.createMap(TypeDescription.createString(),
TypeDescription.createInt()));
- evolvedMapEntry.put(new Text(""), new LongWritable(2L));
- evolvedMapEntry.put(new Text(""), new LongWritable(3L));
- evolved_mapInStruct.setFieldValue("a", evolvedMapEntry);
+ //Initialize a map
+ evolvedMap.put(new Text(""), new LongWritable());
+ evolvedStructOfMap.setFieldValue("a", evolvedMap);
// convert and verify: Type-widening is correct, and size of output file
is correct.
- OrcUtils.upConvertOrcStruct(mapInStruct, evolved_mapInStruct,
evolved_mapInStructSchema);
+ OrcUtils.upConvertOrcStruct(structOfMap, evolvedStructOfMap,
evolvedStructOfMapSchema);
- Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).get(new Text(stringValue)),
- new LongWritable(intValue));
- Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).size(), 1);
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue1)),
+ new LongWritable(intValue1));
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue2)),
+ new LongWritable(intValue2));
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).size(), 2);
// re-use the same object but the source struct has fewer member in the
map entry.
- mapEntry.put(new Text(""), new IntWritable(1));
+ testMap.put(new Text(stringValue3), new IntWritable(intValue3));
// sanity check
- Assert.assertEquals(((OrcMap) mapInStruct.getFieldValue("a")).size(), 2);
- OrcUtils.upConvertOrcStruct(mapInStruct, evolved_mapInStruct,
evolved_mapInStructSchema);
- Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).size(), 2);
- Assert.assertEquals(((OrcMap)
evolved_mapInStruct.getFieldValue("a")).get(new Text(stringValue)),
- new LongWritable(intValue));
+ Assert.assertEquals(((OrcMap) structOfMap.getFieldValue("a")).size(), 3);
+ OrcUtils.upConvertOrcStruct(structOfMap, evolvedStructOfMap,
evolvedStructOfMapSchema);
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).size(), 3);
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue1)),
+ new LongWritable(intValue1));
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue2)),
+ new LongWritable(intValue2));
+ Assert.assertEquals(((OrcMap)
evolvedStructOfMap.getFieldValue("a")).get(new Text(stringValue3)),
+ new LongWritable(intValue3));
+ }
+ @Test
+ public void testUpConvertOrcStructOfUnion() {
// Union in struct, type widening within the union's member field.
TypeDescription unionInStructSchema =
TypeDescription.fromString("struct<a:uniontype<int,string>>");
OrcStruct unionInStruct = (OrcStruct)
OrcStruct.createValue(unionInStructSchema);
OrcUnion placeHolderUnion = new
OrcUnion(TypeDescription.fromString("uniontype<int,string>"));
placeHolderUnion.set(0, new IntWritable(1));
unionInStruct.setFieldValue("a", placeHolderUnion);
- OrcTestUtils.fillOrcStructWithFixedValue(unionInStruct,
unionInStructSchema, intValue, stringValue, boolValue);
+ OrcTestUtils.fillOrcStructWithFixedValue(unionInStruct,
unionInStructSchema, intValue1, stringValue1, boolValue);
// Create new structWithUnion
TypeDescription evolved_unionInStructSchema =
TypeDescription.fromString("struct<a:uniontype<bigint,string>>");
OrcStruct evolvedUnionInStruct = (OrcStruct)
OrcStruct.createValue(evolved_unionInStructSchema);
@@ -208,42 +250,42 @@ public class OrcUtilsTest {
OrcUtils.upConvertOrcStruct(unionInStruct, evolvedUnionInStruct,
evolved_unionInStructSchema);
// Check in the tag 0(Default from value-filler) within
evolvedUnionInStruct, the value is becoming type-widened with correct value.
Assert.assertEquals(((OrcUnion)
evolvedUnionInStruct.getFieldValue("a")).getTag(), 0);
- Assert.assertEquals(((OrcUnion)
evolvedUnionInStruct.getFieldValue("a")).getObject(), new
LongWritable(intValue));
+ Assert.assertEquals(((OrcUnion)
evolvedUnionInStruct.getFieldValue("a")).getObject(), new
LongWritable(intValue1));
// Check the case when union field is created in different tag.
// Complex: List<Struct> within struct among others and evolution happens
on multiple places, also type-widening in deeply nested level.
TypeDescription complexOrcSchema =
TypeDescription.fromString("struct<a:array<struct<a:string,b:int>>,b:struct<a:uniontype<int,string>>>");
OrcStruct complexOrcStruct = (OrcStruct)
OrcUtils.createValueRecursively(complexOrcSchema);
- OrcTestUtils.fillOrcStructWithFixedValue(complexOrcStruct,
complexOrcSchema, intValue, stringValue, boolValue);
+ OrcTestUtils.fillOrcStructWithFixedValue(complexOrcStruct,
complexOrcSchema, intValue1, stringValue1, boolValue);
TypeDescription evolvedComplexOrcSchema = TypeDescription
.fromString("struct<a:array<struct<a:string,b:bigint,c:string>>,b:struct<a:uniontype<bigint,string>,b:int>>");
OrcStruct evolvedComplexStruct = (OrcStruct)
OrcUtils.createValueRecursively(evolvedComplexOrcSchema);
OrcTestUtils
- .fillOrcStructWithFixedValue(evolvedComplexStruct,
evolvedComplexOrcSchema, intValue, stringValue, boolValue);
+ .fillOrcStructWithFixedValue(evolvedComplexStruct,
evolvedComplexOrcSchema, intValue1, stringValue1, boolValue);
// Check if new columns are assigned with null value and type widening is
working fine.
OrcUtils.upConvertOrcStruct(complexOrcStruct, evolvedComplexStruct,
evolvedComplexOrcSchema);
Assert
-
.assertEquals(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("b"),
new LongWritable(intValue));
+
.assertEquals(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("b"),
new LongWritable(intValue1));
Assert.assertNull(((OrcStruct)((OrcList)evolvedComplexStruct.getFieldValue("a")).get(0)).getFieldValue("c"));
- Assert.assertEquals(((OrcUnion)
((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("a")).getObject(),
new LongWritable(intValue));
+ Assert.assertEquals(((OrcUnion)
((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("a")).getObject(),
new LongWritable(intValue1));
Assert.assertNull(((OrcStruct)evolvedComplexStruct.getFieldValue("b")).getFieldValue("b"));
}
@Test
- public void testNestedWithinUnionWithDiffTag() throws Exception {
+ public void testNestedWithinUnionWithDiffTag() {
// Construct union type with different tag for the src object dest object,
check if up-convert happens correctly.
TypeDescription structInUnionAsStruct =
TypeDescription.fromString("struct<a:uniontype<struct<a:int,b:string>,int>>");
OrcStruct structInUnionAsStructObject = (OrcStruct)
OrcUtils.createValueRecursively(structInUnionAsStruct);
OrcTestUtils
- .fillOrcStructWithFixedValue(structInUnionAsStructObject,
structInUnionAsStruct, 0, intValue, stringValue, boolValue);
+ .fillOrcStructWithFixedValue(structInUnionAsStructObject,
structInUnionAsStruct, 0, intValue1, stringValue1, boolValue);
Assert.assertEquals(((OrcStruct)((OrcUnion)structInUnionAsStructObject.getFieldValue("a")).getObject())
- .getFieldValue("a"), new IntWritable(intValue));
+ .getFieldValue("a"), new IntWritable(intValue1));
OrcStruct structInUnionAsStructObject_2 = (OrcStruct)
OrcUtils.createValueRecursively(structInUnionAsStruct);
OrcTestUtils
- .fillOrcStructWithFixedValue(structInUnionAsStructObject_2,
structInUnionAsStruct, 1, intValue, stringValue, boolValue);
-
Assert.assertEquals(((OrcUnion)structInUnionAsStructObject_2.getFieldValue("a")).getObject(),
new IntWritable(intValue));
+ .fillOrcStructWithFixedValue(structInUnionAsStructObject_2,
structInUnionAsStruct, 1, intValue1, stringValue1, boolValue);
+
Assert.assertEquals(((OrcUnion)structInUnionAsStructObject_2.getFieldValue("a")).getObject(),
new IntWritable(intValue1));
// Create a new record container, do up-convert twice and check if the
value is propagated properly.
OrcStruct container = (OrcStruct)
OrcUtils.createValueRecursively(structInUnionAsStruct);
@@ -260,7 +302,7 @@ public class OrcUtilsTest {
* field a was set to null by one call of "upConvertOrcStruct", but the
subsequent call should still have the nested
* field filled.
*/
- public void testNestedFieldSequenceSet() throws Exception {
+ public void testNestedFieldSequenceSet() {
TypeDescription schema =
TypeDescription.fromString("struct<a:array<struct<a:int,b:int>>>");
OrcStruct struct = (OrcStruct) OrcUtils.createValueRecursively(schema);
OrcTestUtils.fillOrcStructWithFixedValue(struct, schema, 1, "test", true);
@@ -280,22 +322,22 @@ public class OrcUtilsTest {
* Just a sanity test for column project, should be no difference from other
cases when provided reader schema.
*/
@Test
- public void testOrcStructProjection() throws Exception {
+ public void testOrcStructProjection() {
TypeDescription originalSchema =
TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
OrcStruct originalStruct = (OrcStruct)
OrcUtils.createValueRecursively(originalSchema);
- OrcTestUtils.fillOrcStructWithFixedValue(originalStruct, originalSchema,
intValue, stringValue, boolValue);
+ OrcTestUtils.fillOrcStructWithFixedValue(originalStruct, originalSchema,
intValue1, stringValue1, boolValue);
TypeDescription projectedSchema =
TypeDescription.fromString("struct<a:struct<b:int>,b:struct<c:int>>");
OrcStruct projectedStructExpectedValue = (OrcStruct)
OrcUtils.createValueRecursively(projectedSchema);
OrcTestUtils
- .fillOrcStructWithFixedValue(projectedStructExpectedValue,
projectedSchema, intValue, stringValue, boolValue);
+ .fillOrcStructWithFixedValue(projectedStructExpectedValue,
projectedSchema, intValue1, stringValue1, boolValue);
OrcStruct projectColumnStruct = (OrcStruct)
OrcUtils.createValueRecursively(projectedSchema);
OrcUtils.upConvertOrcStruct(originalStruct, projectColumnStruct,
projectedSchema);
Assert.assertEquals(projectColumnStruct, projectedStructExpectedValue);
}
@Test
- public void complexTypeEligibilityCheck() throws Exception {
+ public void complexTypeEligibilityCheck() {
TypeDescription struct_array_0 =
TypeDescription.fromString("struct<first:array<int>,second:int>");
TypeDescription struct_array_1 =
TypeDescription.fromString("struct<first:array<int>,second:int>");
Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_array_0,
struct_array_1));
@@ -311,7 +353,7 @@ public class OrcUtilsTest {
Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_map_0,
struct_map_3));
}
- public void testSchemaContains() throws Exception {
+ public void testSchemaContains() {
// Simple case.
TypeDescription struct_0 =
TypeDescription.fromString("struct<a:int,b:int>");
TypeDescription struct_1 = TypeDescription.fromString("struct<a:int>");
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 8a12f44..7c8931f 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -171,6 +171,7 @@ ext.externalDependency = [
"grok": "io.thekraken:grok:0.1.5",
"hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
"orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.3",
+ "orcTools":"org.apache.orc:orc-tools:1.6.3",
'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
'parquetAvro': 'org.apache.parquet:parquet-avro:1.10.1',
'parquetProto': 'org.apache.parquet:parquet-protobuf:1.10.1',